From: James Sedgwick Date: Sat, 21 Oct 2017 17:25:08 +0000 (-0700) Subject: move io/Compression and io/compression/* to compression/ X-Git-Tag: v2017.10.23.00~3 X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=commitdiff_plain;h=453d5ff6fdcb0a1e2c68efd90dcf40d053a5bceb move io/Compression and io/compression/* to compression/ Summary: as above (Note: this ignores all push blocking failures!) Reviewed By: yfeldblum Differential Revision: D6099826 fbshipit-source-id: 20152487135aa8eaf6d2e99369801b6dde4992aa --- diff --git a/CMakeLists.txt b/CMakeLists.txt index e41ba08d..3255fe47 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -294,6 +294,9 @@ if (BUILD_TESTS) apply_folly_compile_options_to_target(folly_test_support) folly_define_tests( + DIRECTORY compression/test/ + TEST compression_test SOURCES CompressionTest.cpp + DIRECTORY concurrency/test/ TEST cache_locality_test SOURCES CacheLocalityTest.cpp @@ -414,7 +417,6 @@ if (BUILD_TESTS) TEST spooky_hash_v2_test SOURCES SpookyHashV2Test.cpp DIRECTORY io/test/ - TEST compression_test SOURCES CompressionTest.cpp TEST iobuf_test SOURCES IOBufTest.cpp TEST iobuf_cursor_test SOURCES IOBufCursorTest.cpp TEST iobuf_queue_test SOURCES IOBufQueueTest.cpp diff --git a/folly/Makefile.am b/folly/Makefile.am index 3fc4644b..ef127982 100644 --- a/folly/Makefile.am +++ b/folly/Makefile.am @@ -53,6 +53,9 @@ nobase_follyinclude_HEADERS = \ CppAttributes.h \ CpuId.h \ CPortability.h \ + compression/Compression.h \ + compression/Utils.h \ + compression/Zlib.h \ concurrency/CacheLocality.h \ concurrency/ConcurrentHashMap.h \ concurrency/CoreCachedSharedPtr.h \ @@ -253,7 +256,6 @@ nobase_follyinclude_HEADERS = \ IndexedMemPool.h \ init/Init.h \ IntrusiveList.h \ - io/Compression.h \ io/Cursor.h \ io/Cursor-inl.h \ io/IOBuf.h \ @@ -311,8 +313,6 @@ nobase_follyinclude_HEADERS = \ io/async/test/TimeUtil.h \ io/async/test/UndelayedDestruction.h \ io/async/test/Util.h \ - io/compression/Utils.h \ - io/compression/Zlib.h \ Iterator.h \ 
json.h \ Launder.h \ @@ -488,6 +488,8 @@ libfollybase_la_SOURCES = \ libfolly_la_SOURCES = \ Assume.cpp \ ClockGettimeWrappers.cpp \ + compression/Compression.cpp \ + compression/Zlib.cpp \ concurrency/CacheLocality.cpp \ concurrency/GlobalThreadPoolList.cpp \ detail/Futex.cpp \ @@ -527,7 +529,6 @@ libfolly_la_SOURCES = \ IPAddressV6.cpp \ LifoSem.cpp \ init/Init.cpp \ - io/Compression.cpp \ io/Cursor.cpp \ io/IOBuf.cpp \ io/IOBufQueue.cpp \ @@ -558,7 +559,6 @@ libfolly_la_SOURCES = \ io/async/test/TimeUtil.cpp \ io/async/ssl/OpenSSLUtils.cpp \ io/async/ssl/SSLErrors.cpp \ - io/compression/Zlib.cpp \ json.cpp \ detail/MemoryIdler.cpp \ detail/SocketFastOpen.cpp \ diff --git a/folly/compression/Compression.cpp b/folly/compression/Compression.cpp new file mode 100644 index 00000000..05ea019c --- /dev/null +++ b/folly/compression/Compression.cpp @@ -0,0 +1,2146 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + +#if FOLLY_HAVE_LIBLZ4 +#include +#include +#if LZ4_VERSION_NUMBER >= 10301 +#include +#endif +#endif + +#include + +#if FOLLY_HAVE_LIBSNAPPY +#include +#include +#endif + +#if FOLLY_HAVE_LIBZ +#include +#endif + +#if FOLLY_HAVE_LIBLZMA +#include +#endif + +#if FOLLY_HAVE_LIBZSTD +#define ZSTD_STATIC_LINKING_ONLY +#include +#endif + +#if FOLLY_HAVE_LIBBZ2 +#include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using folly::io::compression::detail::dataStartsWithLE; +using folly::io::compression::detail::prefixToStringLE; + +namespace folly { +namespace io { + +Codec::Codec(CodecType type) : type_(type) { } + +// Ensure consistent behavior in the nullptr case +std::unique_ptr Codec::compress(const IOBuf* data) { + if (data == nullptr) { + throw std::invalid_argument("Codec: data must not be nullptr"); + } + uint64_t len = data->computeChainDataLength(); + if (len > maxUncompressedLength()) { + throw std::runtime_error("Codec: uncompressed length too large"); + } + + return doCompress(data); +} + +std::string Codec::compress(const StringPiece data) { + const uint64_t len = data.size(); + if (len > maxUncompressedLength()) { + throw std::runtime_error("Codec: uncompressed length too large"); + } + + return doCompressString(data); +} + +std::unique_ptr Codec::uncompress( + const IOBuf* data, + Optional uncompressedLength) { + if (data == nullptr) { + throw std::invalid_argument("Codec: data must not be nullptr"); + } + if (!uncompressedLength) { + if (needsUncompressedLength()) { + throw std::invalid_argument("Codec: uncompressed length required"); + } + } else if (*uncompressedLength > maxUncompressedLength()) { + throw std::runtime_error("Codec: uncompressed length too large"); + } + + if (data->empty()) { + if (uncompressedLength.value_or(0) != 0) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + return IOBuf::create(0); + } + + return doUncompress(data, 
uncompressedLength); +} + +std::string Codec::uncompress( + const StringPiece data, + Optional uncompressedLength) { + if (!uncompressedLength) { + if (needsUncompressedLength()) { + throw std::invalid_argument("Codec: uncompressed length required"); + } + } else if (*uncompressedLength > maxUncompressedLength()) { + throw std::runtime_error("Codec: uncompressed length too large"); + } + + if (data.empty()) { + if (uncompressedLength.value_or(0) != 0) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + return ""; + } + + return doUncompressString(data, uncompressedLength); +} + +bool Codec::needsUncompressedLength() const { + return doNeedsUncompressedLength(); +} + +uint64_t Codec::maxUncompressedLength() const { + return doMaxUncompressedLength(); +} + +bool Codec::doNeedsUncompressedLength() const { + return false; +} + +uint64_t Codec::doMaxUncompressedLength() const { + return UNLIMITED_UNCOMPRESSED_LENGTH; +} + +std::vector Codec::validPrefixes() const { + return {}; +} + +bool Codec::canUncompress(const IOBuf*, Optional) const { + return false; +} + +std::string Codec::doCompressString(const StringPiece data) { + const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data}; + auto outputBuffer = doCompress(&inputBuffer); + std::string output; + output.reserve(outputBuffer->computeChainDataLength()); + for (auto range : *outputBuffer) { + output.append(reinterpret_cast(range.data()), range.size()); + } + return output; +} + +std::string Codec::doUncompressString( + const StringPiece data, + Optional uncompressedLength) { + const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data}; + auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength); + std::string output; + output.reserve(outputBuffer->computeChainDataLength()); + for (auto range : *outputBuffer) { + output.append(reinterpret_cast(range.data()), range.size()); + } + return output; +} + +uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const { + return 
doMaxCompressedLength(uncompressedLength); +} + +Optional Codec::getUncompressedLength( + const folly::IOBuf* data, + Optional uncompressedLength) const { + auto const compressedLength = data->computeChainDataLength(); + if (compressedLength == 0) { + if (uncompressedLength.value_or(0) != 0) { + throw std::runtime_error("Invalid uncompressed length"); + } + return 0; + } + return doGetUncompressedLength(data, uncompressedLength); +} + +Optional Codec::doGetUncompressedLength( + const folly::IOBuf*, + Optional uncompressedLength) const { + return uncompressedLength; +} + +bool StreamCodec::needsDataLength() const { + return doNeedsDataLength(); +} + +bool StreamCodec::doNeedsDataLength() const { + return false; +} + +void StreamCodec::assertStateIs(State expected) const { + if (state_ != expected) { + throw std::logic_error(folly::to( + "Codec: state is ", state_, "; expected state ", expected)); + } +} + +void StreamCodec::resetStream(Optional uncompressedLength) { + state_ = State::RESET; + uncompressedLength_ = uncompressedLength; + progressMade_ = true; + doResetStream(); +} + +bool StreamCodec::compressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (state_ == State::RESET && input.empty() && + flushOp == StreamCodec::FlushOp::END && + uncompressedLength().value_or(0) != 0) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + + if (!uncompressedLength() && needsDataLength()) { + throw std::runtime_error("Codec: uncompressed length required"); + } + if (state_ == State::RESET && !input.empty() && + uncompressedLength() == uint64_t(0)) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + // Handle input state transitions + switch (flushOp) { + case StreamCodec::FlushOp::NONE: + if (state_ == State::RESET) { + state_ = State::COMPRESS; + } + assertStateIs(State::COMPRESS); + break; + case StreamCodec::FlushOp::FLUSH: + if (state_ == State::RESET || state_ == State::COMPRESS) { 
+ state_ = State::COMPRESS_FLUSH; + } + assertStateIs(State::COMPRESS_FLUSH); + break; + case StreamCodec::FlushOp::END: + if (state_ == State::RESET || state_ == State::COMPRESS) { + state_ = State::COMPRESS_END; + } + assertStateIs(State::COMPRESS_END); + break; + } + size_t const inputSize = input.size(); + size_t const outputSize = output.size(); + bool const done = doCompressStream(input, output, flushOp); + if (!done && inputSize == input.size() && outputSize == output.size()) { + if (!progressMade_) { + throw std::runtime_error("Codec: No forward progress made"); + } + // Throw an exception if there is no progress again next time + progressMade_ = false; + } else { + progressMade_ = true; + } + // Handle output state transitions + if (done) { + if (state_ == State::COMPRESS_FLUSH) { + state_ = State::COMPRESS; + } else if (state_ == State::COMPRESS_END) { + state_ = State::END; + } + // Check internal invariants + DCHECK(input.empty()); + DCHECK(flushOp != StreamCodec::FlushOp::NONE); + } + return done; +} + +bool StreamCodec::uncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (state_ == State::RESET && input.empty()) { + if (uncompressedLength().value_or(0) == 0) { + return true; + } + return false; + } + // Handle input state transitions + if (state_ == State::RESET) { + state_ = State::UNCOMPRESS; + } + assertStateIs(State::UNCOMPRESS); + size_t const inputSize = input.size(); + size_t const outputSize = output.size(); + bool const done = doUncompressStream(input, output, flushOp); + if (!done && inputSize == input.size() && outputSize == output.size()) { + if (!progressMade_) { + throw std::runtime_error("Codec: no forward progress made"); + } + // Throw an exception if there is no progress again next time + progressMade_ = false; + } else { + progressMade_ = true; + } + // Handle output state transitions + if (done) { + state_ = State::END; + } + return done; +} + +static std::unique_ptr 
addOutputBuffer( + MutableByteRange& output, + uint64_t size) { + DCHECK(output.empty()); + auto buffer = IOBuf::create(size); + buffer->append(buffer->capacity()); + output = {buffer->writableData(), buffer->length()}; + return buffer; +} + +std::unique_ptr StreamCodec::doCompress(IOBuf const* data) { + uint64_t const uncompressedLength = data->computeChainDataLength(); + resetStream(uncompressedLength); + uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength); + + auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB + auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB + + MutableByteRange output; + auto buffer = addOutputBuffer( + output, + maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen + : kDefaultBufferLength); + + // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer + IOBuf const* current = data; + ByteRange input{current->data(), current->length()}; + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE; + bool done = false; + while (!done) { + while (input.empty() && current->next() != data) { + current = current->next(); + input = {current->data(), current->length()}; + } + if (current->next() == data) { + // This is the last input buffer so end the stream + flushOp = StreamCodec::FlushOp::END; + } + if (output.empty()) { + buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength)); + } + done = compressStream(input, output, flushOp); + if (done) { + DCHECK(input.empty()); + DCHECK(flushOp == StreamCodec::FlushOp::END); + DCHECK_EQ(current->next(), data); + } + } + buffer->prev()->trimEnd(output.size()); + return buffer; +} + +static uint64_t computeBufferLength( + uint64_t const compressedLength, + uint64_t const blockSize) { + uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB + uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength); + return std::min(goodBufferSize, kMaxBufferLength); +} + +std::unique_ptr 
StreamCodec::doUncompress( + IOBuf const* data, + Optional uncompressedLength) { + auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB + auto constexpr kBlockSize = uint64_t(128) << 10; + auto const defaultBufferLength = + computeBufferLength(data->computeChainDataLength(), kBlockSize); + + uncompressedLength = getUncompressedLength(data, uncompressedLength); + resetStream(uncompressedLength); + + MutableByteRange output; + auto buffer = addOutputBuffer( + output, + (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength + ? *uncompressedLength + : defaultBufferLength)); + + // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer + IOBuf const* current = data; + ByteRange input{current->data(), current->length()}; + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE; + bool done = false; + while (!done) { + while (input.empty() && current->next() != data) { + current = current->next(); + input = {current->data(), current->length()}; + } + if (current->next() == data) { + // Tell the uncompressor there is no more input (it may optimize) + flushOp = StreamCodec::FlushOp::END; + } + if (output.empty()) { + buffer->prependChain(addOutputBuffer(output, defaultBufferLength)); + } + done = uncompressStream(input, output, flushOp); + } + if (!input.empty()) { + throw std::runtime_error("Codec: Junk after end of data"); + } + + buffer->prev()->trimEnd(output.size()); + if (uncompressedLength && + *uncompressedLength != buffer->computeChainDataLength()) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + + return buffer; +} + +namespace { + +/** + * No compression + */ +class NoCompressionCodec final : public Codec { + public: + static std::unique_ptr create(int level, CodecType type); + explicit NoCompressionCodec(int level, CodecType type); + + private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + std::unique_ptr doCompress(const IOBuf* data) override; + 
std::unique_ptr doUncompress( + const IOBuf* data, + Optional uncompressedLength) override; +}; + +std::unique_ptr NoCompressionCodec::create(int level, CodecType type) { + return std::make_unique(level, type); +} + +NoCompressionCodec::NoCompressionCodec(int level, CodecType type) + : Codec(type) { + DCHECK(type == CodecType::NO_COMPRESSION); + switch (level) { + case COMPRESSION_LEVEL_DEFAULT: + case COMPRESSION_LEVEL_FASTEST: + case COMPRESSION_LEVEL_BEST: + level = 0; + } + if (level != 0) { + throw std::invalid_argument(to( + "NoCompressionCodec: invalid level ", level)); + } +} + +uint64_t NoCompressionCodec::doMaxCompressedLength( + uint64_t uncompressedLength) const { + return uncompressedLength; +} + +std::unique_ptr NoCompressionCodec::doCompress( + const IOBuf* data) { + return data->clone(); +} + +std::unique_ptr NoCompressionCodec::doUncompress( + const IOBuf* data, + Optional uncompressedLength) { + if (uncompressedLength && + data->computeChainDataLength() != *uncompressedLength) { + throw std::runtime_error( + to("NoCompressionCodec: invalid uncompressed length")); + } + return data->clone(); +} + +#if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA) + +namespace { + +void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) { + DCHECK_GE(out->tailroom(), kMaxVarintLength64); + out->append(encodeVarint(val, out->writableTail())); +} + +inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) { + uint64_t val = 0; + int8_t b = 0; + for (int shift = 0; shift <= 63; shift += 7) { + b = cursor.read(); + val |= static_cast(b & 0x7f) << shift; + if (b >= 0) { + break; + } + } + if (b < 0) { + throw std::invalid_argument("Invalid varint value. 
Too big."); + } + return val; +} + +} // namespace + +#endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA + +#if FOLLY_HAVE_LIBLZ4 + +/** + * LZ4 compression + */ +class LZ4Codec final : public Codec { + public: + static std::unique_ptr create(int level, CodecType type); + explicit LZ4Codec(int level, CodecType type); + + private: + bool doNeedsUncompressedLength() const override; + uint64_t doMaxUncompressedLength() const override; + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + + bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; } + + std::unique_ptr doCompress(const IOBuf* data) override; + std::unique_ptr doUncompress( + const IOBuf* data, + Optional uncompressedLength) override; + + bool highCompression_; +}; + +std::unique_ptr LZ4Codec::create(int level, CodecType type) { + return std::make_unique(level, type); +} + +LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE); + + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + case COMPRESSION_LEVEL_DEFAULT: + level = 1; + break; + case COMPRESSION_LEVEL_BEST: + level = 2; + break; + } + if (level < 1 || level > 2) { + throw std::invalid_argument(to( + "LZ4Codec: invalid level: ", level)); + } + highCompression_ = (level > 1); +} + +bool LZ4Codec::doNeedsUncompressedLength() const { + return !encodeSize(); +} + +// The value comes from lz4.h in lz4-r117, but older versions of lz4 don't +// define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it +// here. +#ifndef LZ4_MAX_INPUT_SIZE +# define LZ4_MAX_INPUT_SIZE 0x7E000000 +#endif + +uint64_t LZ4Codec::doMaxUncompressedLength() const { + return LZ4_MAX_INPUT_SIZE; +} + +uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const { + return LZ4_compressBound(uncompressedLength) + + (encodeSize() ? 
kMaxVarintLength64 : 0); +} + +std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { + IOBuf clone; + if (data->isChained()) { + // LZ4 doesn't support streaming, so we have to coalesce + clone = data->cloneCoalescedAsValue(); + data = &clone; + } + + auto out = IOBuf::create(maxCompressedLength(data->length())); + if (encodeSize()) { + encodeVarintToIOBuf(data->length(), out.get()); + } + + int n; + auto input = reinterpret_cast(data->data()); + auto output = reinterpret_cast(out->writableTail()); + const auto inputLength = data->length(); +#if LZ4_VERSION_NUMBER >= 10700 + if (highCompression_) { + n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0); + } else { + n = LZ4_compress_default(input, output, inputLength, out->tailroom()); + } +#else + if (highCompression_) { + n = LZ4_compressHC(input, output, inputLength); + } else { + n = LZ4_compress(input, output, inputLength); + } +#endif + + CHECK_GE(n, 0); + CHECK_LE(n, out->capacity()); + + out->append(n); + return out; +} + +std::unique_ptr LZ4Codec::doUncompress( + const IOBuf* data, + Optional uncompressedLength) { + IOBuf clone; + if (data->isChained()) { + // LZ4 doesn't support streaming, so we have to coalesce + clone = data->cloneCoalescedAsValue(); + data = &clone; + } + + folly::io::Cursor cursor(data); + uint64_t actualUncompressedLength; + if (encodeSize()) { + actualUncompressedLength = decodeVarintFromCursor(cursor); + if (uncompressedLength && *uncompressedLength != actualUncompressedLength) { + throw std::runtime_error("LZ4Codec: invalid uncompressed length"); + } + } else { + // Invariants + DCHECK(uncompressedLength.hasValue()); + DCHECK(*uncompressedLength <= maxUncompressedLength()); + actualUncompressedLength = *uncompressedLength; + } + + auto sp = StringPiece{cursor.peekBytes()}; + auto out = IOBuf::create(actualUncompressedLength); + int n = LZ4_decompress_safe( + sp.data(), + reinterpret_cast(out->writableTail()), + sp.size(), + actualUncompressedLength); + + if (n < 
0 || uint64_t(n) != actualUncompressedLength) { + throw std::runtime_error(to( + "LZ4 decompression returned invalid value ", n)); + } + out->append(actualUncompressedLength); + return out; +} + +#if LZ4_VERSION_NUMBER >= 10301 + +class LZ4FrameCodec final : public Codec { + public: + static std::unique_ptr create(int level, CodecType type); + explicit LZ4FrameCodec(int level, CodecType type); + ~LZ4FrameCodec() override; + + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, Optional uncompressedLength) + const override; + + private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + + std::unique_ptr doCompress(const IOBuf* data) override; + std::unique_ptr doUncompress( + const IOBuf* data, + Optional uncompressedLength) override; + + // Reset the dctx_ if it is dirty or null. + void resetDCtx(); + + int level_; + LZ4F_decompressionContext_t dctx_{nullptr}; + bool dirty_{false}; +}; + +/* static */ std::unique_ptr LZ4FrameCodec::create( + int level, + CodecType type) { + return std::make_unique(level, type); +} + +static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204; + +std::vector LZ4FrameCodec::validPrefixes() const { + return {prefixToStringLE(kLZ4FrameMagicLE)}; +} + +bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional) const { + return dataStartsWithLE(data, kLZ4FrameMagicLE); +} + +uint64_t LZ4FrameCodec::doMaxCompressedLength( + uint64_t uncompressedLength) const { + LZ4F_preferences_t prefs{}; + prefs.compressionLevel = level_; + prefs.frameInfo.contentSize = uncompressedLength; + return LZ4F_compressFrameBound(uncompressedLength, &prefs); +} + +static size_t lz4FrameThrowOnError(size_t code) { + if (LZ4F_isError(code)) { + throw std::runtime_error( + to("LZ4Frame error: ", LZ4F_getErrorName(code))); + } + return code; +} + +void LZ4FrameCodec::resetDCtx() { + if (dctx_ && !dirty_) { + return; + } + if (dctx_) { + LZ4F_freeDecompressionContext(dctx_); + } + 
lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100)); + dirty_ = false; +} + +LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::LZ4_FRAME); + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + case COMPRESSION_LEVEL_DEFAULT: + level_ = 0; + break; + case COMPRESSION_LEVEL_BEST: + level_ = 16; + break; + default: + level_ = level; + break; + } +} + +LZ4FrameCodec::~LZ4FrameCodec() { + if (dctx_) { + LZ4F_freeDecompressionContext(dctx_); + } +} + +std::unique_ptr LZ4FrameCodec::doCompress(const IOBuf* data) { + // LZ4 Frame compression doesn't support streaming so we have to coalesce + IOBuf clone; + if (data->isChained()) { + clone = data->cloneCoalescedAsValue(); + data = &clone; + } + // Set preferences + const auto uncompressedLength = data->length(); + LZ4F_preferences_t prefs{}; + prefs.compressionLevel = level_; + prefs.frameInfo.contentSize = uncompressedLength; + // Compress + auto buf = IOBuf::create(maxCompressedLength(uncompressedLength)); + const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame( + buf->writableTail(), + buf->tailroom(), + data->data(), + data->length(), + &prefs)); + buf->append(written); + return buf; +} + +std::unique_ptr LZ4FrameCodec::doUncompress( + const IOBuf* data, + Optional uncompressedLength) { + // Reset the dctx if any errors have occurred + resetDCtx(); + // Coalesce the data + ByteRange in = *data->begin(); + IOBuf clone; + if (data->isChained()) { + clone = data->cloneCoalescedAsValue(); + in = clone.coalesce(); + } + data = nullptr; + // Select decompression options + LZ4F_decompressOptions_t options; + options.stableDst = 1; + // Select blockSize and growthSize for the IOBufQueue + IOBufQueue queue(IOBufQueue::cacheChainLength()); + auto blockSize = uint64_t{64} << 10; + auto growthSize = uint64_t{4} << 20; + if (uncompressedLength) { + // Allocate uncompressedLength in one chunk (up to 64 MB) + const auto allocateSize = 
std::min(*uncompressedLength, uint64_t{64} << 20); + queue.preallocate(allocateSize, allocateSize); + blockSize = std::min(*uncompressedLength, blockSize); + growthSize = std::min(*uncompressedLength, growthSize); + } else { + // Reduce growthSize for small data + const auto guessUncompressedLen = + 4 * std::max(blockSize, in.size()); + growthSize = std::min(guessUncompressedLen, growthSize); + } + // Once LZ4F_decompress() is called, the dctx_ cannot be reused until it + // returns 0 + dirty_ = true; + // Decompress until the frame is over + size_t code = 0; + do { + // Allocate enough space to decompress at least a block + void* out; + size_t outSize; + std::tie(out, outSize) = queue.preallocate(blockSize, growthSize); + // Decompress + size_t inSize = in.size(); + code = lz4FrameThrowOnError( + LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options)); + if (in.empty() && outSize == 0 && code != 0) { + // We passed no input, no output was produced, and the frame isn't over + // No more forward progress is possible + throw std::runtime_error("LZ4Frame error: Incomplete frame"); + } + in.uncheckedAdvance(inSize); + queue.postallocate(outSize); + } while (code != 0); + // At this point the decompression context can be reused + dirty_ = false; + if (uncompressedLength && queue.chainLength() != *uncompressedLength) { + throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength"); + } + return queue.move(); +} + +#endif // LZ4_VERSION_NUMBER >= 10301 +#endif // FOLLY_HAVE_LIBLZ4 + +#if FOLLY_HAVE_LIBSNAPPY + +/** + * Snappy compression + */ + +/** + * Implementation of snappy::Source that reads from an IOBuf chain. 
+ */ +class IOBufSnappySource final : public snappy::Source { + public: + explicit IOBufSnappySource(const IOBuf* data); + size_t Available() const override; + const char* Peek(size_t* len) override; + void Skip(size_t n) override; + private: + size_t available_; + io::Cursor cursor_; +}; + +IOBufSnappySource::IOBufSnappySource(const IOBuf* data) + : available_(data->computeChainDataLength()), + cursor_(data) { +} + +size_t IOBufSnappySource::Available() const { + return available_; +} + +const char* IOBufSnappySource::Peek(size_t* len) { + auto sp = StringPiece{cursor_.peekBytes()}; + *len = sp.size(); + return sp.data(); +} + +void IOBufSnappySource::Skip(size_t n) { + CHECK_LE(n, available_); + cursor_.skip(n); + available_ -= n; +} + +class SnappyCodec final : public Codec { + public: + static std::unique_ptr create(int level, CodecType type); + explicit SnappyCodec(int level, CodecType type); + + private: + uint64_t doMaxUncompressedLength() const override; + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + std::unique_ptr doCompress(const IOBuf* data) override; + std::unique_ptr doUncompress( + const IOBuf* data, + Optional uncompressedLength) override; +}; + +std::unique_ptr SnappyCodec::create(int level, CodecType type) { + return std::make_unique(level, type); +} + +SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::SNAPPY); + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + case COMPRESSION_LEVEL_DEFAULT: + case COMPRESSION_LEVEL_BEST: + level = 1; + } + if (level != 1) { + throw std::invalid_argument(to( + "SnappyCodec: invalid level: ", level)); + } +} + +uint64_t SnappyCodec::doMaxUncompressedLength() const { + // snappy.h uses uint32_t for lengths, so there's that. 
+ return std::numeric_limits::max(); +} + +uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const { + return snappy::MaxCompressedLength(uncompressedLength); +} + +std::unique_ptr SnappyCodec::doCompress(const IOBuf* data) { + IOBufSnappySource source(data); + auto out = IOBuf::create(maxCompressedLength(source.Available())); + + snappy::UncheckedByteArraySink sink(reinterpret_cast( + out->writableTail())); + + size_t n = snappy::Compress(&source, &sink); + + CHECK_LE(n, out->capacity()); + out->append(n); + return out; +} + +std::unique_ptr SnappyCodec::doUncompress( + const IOBuf* data, + Optional uncompressedLength) { + uint32_t actualUncompressedLength = 0; + + { + IOBufSnappySource source(data); + if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) { + throw std::runtime_error("snappy::GetUncompressedLength failed"); + } + if (uncompressedLength && *uncompressedLength != actualUncompressedLength) { + throw std::runtime_error("snappy: invalid uncompressed length"); + } + } + + auto out = IOBuf::create(actualUncompressedLength); + + { + IOBufSnappySource source(data); + if (!snappy::RawUncompress(&source, + reinterpret_cast(out->writableTail()))) { + throw std::runtime_error("snappy::RawUncompress failed"); + } + } + + out->append(actualUncompressedLength); + return out; +} + +#endif // FOLLY_HAVE_LIBSNAPPY + +#if FOLLY_HAVE_LIBLZMA + +/** + * LZMA2 compression + */ +class LZMA2StreamCodec final : public StreamCodec { + public: + static std::unique_ptr createCodec(int level, CodecType type); + static std::unique_ptr createStream(int level, CodecType type); + explicit LZMA2StreamCodec(int level, CodecType type); + ~LZMA2StreamCodec() override; + + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, Optional uncompressedLength) + const override; + + private: + bool doNeedsDataLength() const override; + uint64_t doMaxUncompressedLength() const override; + uint64_t 
doMaxCompressedLength(uint64_t uncompressedLength) const override; + + bool encodeSize() const { + return type() == CodecType::LZMA2_VARINT_SIZE; + } + + void doResetStream() override; + bool doCompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) override; + bool doUncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) override; + + void resetCStream(); + void resetDStream(); + + bool decodeAndCheckVarint(ByteRange& input); + bool flushVarintBuffer(MutableByteRange& output); + void resetVarintBuffer(); + + Optional cstream_{}; + Optional dstream_{}; + + std::array varintBuffer_; + ByteRange varintToEncode_; + size_t varintBufferPos_{0}; + + int level_; + bool needReset_{true}; + bool needDecodeSize_{false}; +}; + +static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD; +static constexpr unsigned kLZMA2MagicBytes = 6; + +std::vector LZMA2StreamCodec::validPrefixes() const { + if (type() == CodecType::LZMA2_VARINT_SIZE) { + return {}; + } + return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)}; +} + +bool LZMA2StreamCodec::doNeedsDataLength() const { + return encodeSize(); +} + +bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional) + const { + if (type() == CodecType::LZMA2_VARINT_SIZE) { + return false; + } + // Returns false for all inputs less than 8 bytes. + // This is okay, because no valid LZMA2 streams are less than 8 bytes. 
+ return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes); +} + +std::unique_ptr LZMA2StreamCodec::createCodec( + int level, + CodecType type) { + return make_unique(level, type); +} + +std::unique_ptr LZMA2StreamCodec::createStream( + int level, + CodecType type) { + return make_unique(level, type); +} + +LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type) + : StreamCodec(type) { + DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE); + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + level = 0; + break; + case COMPRESSION_LEVEL_DEFAULT: + level = LZMA_PRESET_DEFAULT; + break; + case COMPRESSION_LEVEL_BEST: + level = 9; + break; + } + if (level < 0 || level > 9) { + throw std::invalid_argument( + to("LZMA2Codec: invalid level: ", level)); + } + level_ = level; +} + +LZMA2StreamCodec::~LZMA2StreamCodec() { + if (cstream_) { + lzma_end(cstream_.get_pointer()); + cstream_.clear(); + } + if (dstream_) { + lzma_end(dstream_.get_pointer()); + dstream_.clear(); + } +} + +uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const { + // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)" + return uint64_t(1) << 63; +} + +uint64_t LZMA2StreamCodec::doMaxCompressedLength( + uint64_t uncompressedLength) const { + return lzma_stream_buffer_bound(uncompressedLength) + + (encodeSize() ? 
kMaxVarintLength64 : 0); +} + +void LZMA2StreamCodec::doResetStream() { + needReset_ = true; +} + +void LZMA2StreamCodec::resetCStream() { + if (!cstream_) { + cstream_.assign(LZMA_STREAM_INIT); + } + lzma_ret const rc = + lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE); + if (rc != LZMA_OK) { + throw std::runtime_error(folly::to( + "LZMA2StreamCodec: lzma_easy_encoder error: ", rc)); + } +} + +void LZMA2StreamCodec::resetDStream() { + if (!dstream_) { + dstream_.assign(LZMA_STREAM_INIT); + } + lzma_ret const rc = lzma_auto_decoder( + dstream_.get_pointer(), std::numeric_limits::max(), 0); + if (rc != LZMA_OK) { + throw std::runtime_error(folly::to( + "LZMA2StreamCodec: lzma_auto_decoder error: ", rc)); + } +} + +static lzma_ret lzmaThrowOnError(lzma_ret const rc) { + switch (rc) { + case LZMA_OK: + case LZMA_STREAM_END: + case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice + return rc; + default: + throw std::runtime_error( + to("LZMA2StreamCodec: error: ", rc)); + } +} + +static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) { + switch (flush) { + case StreamCodec::FlushOp::NONE: + return LZMA_RUN; + case StreamCodec::FlushOp::FLUSH: + return LZMA_SYNC_FLUSH; + case StreamCodec::FlushOp::END: + return LZMA_FINISH; + default: + throw std::invalid_argument("LZMA2StreamCodec: Invalid flush"); + } +} + +/** + * Flushes the varint buffer. + * Advances output by the number of bytes written. + * Returns true when flushing is complete. 
+ */ +bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) { + if (varintToEncode_.empty()) { + return true; + } + const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size()); + if (numBytesToCopy > 0) { + memcpy(output.data(), varintToEncode_.data(), numBytesToCopy); + } + varintToEncode_.advance(numBytesToCopy); + output.advance(numBytesToCopy); + return varintToEncode_.empty(); +} + +bool LZMA2StreamCodec::doCompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (needReset_) { + resetCStream(); + if (encodeSize()) { + varintBufferPos_ = 0; + size_t const varintSize = + encodeVarint(*uncompressedLength(), varintBuffer_.data()); + varintToEncode_ = {varintBuffer_.data(), varintSize}; + } + needReset_ = false; + } + + if (!flushVarintBuffer(output)) { + return false; + } + + cstream_->next_in = const_cast(input.data()); + cstream_->avail_in = input.size(); + cstream_->next_out = output.data(); + cstream_->avail_out = output.size(); + SCOPE_EXIT { + input.uncheckedAdvance(input.size() - cstream_->avail_in); + output.uncheckedAdvance(output.size() - cstream_->avail_out); + }; + lzma_ret const rc = lzmaThrowOnError( + lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp))); + switch (flushOp) { + case StreamCodec::FlushOp::NONE: + return false; + case StreamCodec::FlushOp::FLUSH: + return cstream_->avail_in == 0 && cstream_->avail_out != 0; + case StreamCodec::FlushOp::END: + return rc == LZMA_STREAM_END; + default: + throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp"); + } +} + +/** + * Attempts to decode a varint from input. + * The function advances input by the number of bytes read. + * + * If there are too many bytes and the varint is not valid, throw a + * runtime_error. + * + * If the uncompressed length was provided and a decoded varint does not match + * the provided length, throw a runtime_error. 
+ * + * Returns true if the varint was successfully decoded and matches the + * uncompressed length if provided, and false if more bytes are needed. + */ +bool LZMA2StreamCodec::decodeAndCheckVarint(ByteRange& input) { + if (input.empty()) { + return false; + } + size_t const numBytesToCopy = + std::min(kMaxVarintLength64 - varintBufferPos_, input.size()); + memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy); + + size_t const rangeSize = varintBufferPos_ + numBytesToCopy; + ByteRange range{varintBuffer_.data(), rangeSize}; + auto const ret = tryDecodeVarint(range); + + if (ret.hasValue()) { + size_t const varintSize = rangeSize - range.size(); + input.advance(varintSize - varintBufferPos_); + if (uncompressedLength() && *uncompressedLength() != ret.value()) { + throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length"); + } + return true; + } else if (ret.error() == DecodeVarintError::TooManyBytes) { + throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length"); + } else { + // Too few bytes + input.advance(numBytesToCopy); + varintBufferPos_ += numBytesToCopy; + return false; + } +} + +bool LZMA2StreamCodec::doUncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (needReset_) { + resetDStream(); + needReset_ = false; + needDecodeSize_ = encodeSize(); + if (encodeSize()) { + // Reset buffer + varintBufferPos_ = 0; + } + } + + if (needDecodeSize_) { + // Try decoding the varint. If the input does not contain the entire varint, + // buffer the input. If the varint can not be decoded, fail. 
+ if (!decodeAndCheckVarint(input)) { + return false; + } + needDecodeSize_ = false; + } + + dstream_->next_in = const_cast(input.data()); + dstream_->avail_in = input.size(); + dstream_->next_out = output.data(); + dstream_->avail_out = output.size(); + SCOPE_EXIT { + input.advance(input.size() - dstream_->avail_in); + output.advance(output.size() - dstream_->avail_out); + }; + + lzma_ret rc; + switch (flushOp) { + case StreamCodec::FlushOp::NONE: + case StreamCodec::FlushOp::FLUSH: + rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN)); + break; + case StreamCodec::FlushOp::END: + rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH)); + break; + default: + throw std::invalid_argument("LZMA2StreamCodec: invalid flush"); + } + return rc == LZMA_STREAM_END; +} +#endif // FOLLY_HAVE_LIBLZMA + +#ifdef FOLLY_HAVE_LIBZSTD + +namespace { +void zstdFreeCStream(ZSTD_CStream* zcs) { + ZSTD_freeCStream(zcs); +} + +void zstdFreeDStream(ZSTD_DStream* zds) { + ZSTD_freeDStream(zds); +} +} + +/** + * ZSTD compression + */ +class ZSTDStreamCodec final : public StreamCodec { + public: + static std::unique_ptr createCodec(int level, CodecType); + static std::unique_ptr createStream(int level, CodecType); + explicit ZSTDStreamCodec(int level, CodecType type); + + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, Optional uncompressedLength) + const override; + + private: + bool doNeedsUncompressedLength() const override; + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + Optional doGetUncompressedLength( + IOBuf const* data, + Optional uncompressedLength) const override; + + void doResetStream() override; + bool doCompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) override; + bool doUncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) override; + + void resetCStream(); + void resetDStream(); + + bool 
tryBlockCompress(ByteRange& input, MutableByteRange& output) const; + bool tryBlockUncompress(ByteRange& input, MutableByteRange& output) const; + + int level_; + bool needReset_{true}; + std::unique_ptr< + ZSTD_CStream, + folly::static_function_deleter> + cstream_{nullptr}; + std::unique_ptr< + ZSTD_DStream, + folly::static_function_deleter> + dstream_{nullptr}; +}; + +static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528; + +std::vector ZSTDStreamCodec::validPrefixes() const { + return {prefixToStringLE(kZSTDMagicLE)}; +} + +bool ZSTDStreamCodec::canUncompress(const IOBuf* data, Optional) + const { + return dataStartsWithLE(data, kZSTDMagicLE); +} + +std::unique_ptr ZSTDStreamCodec::createCodec(int level, CodecType type) { + return make_unique(level, type); +} + +std::unique_ptr ZSTDStreamCodec::createStream( + int level, + CodecType type) { + return make_unique(level, type); +} + +ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type) + : StreamCodec(type) { + DCHECK(type == CodecType::ZSTD); + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + level = 1; + break; + case COMPRESSION_LEVEL_DEFAULT: + level = 1; + break; + case COMPRESSION_LEVEL_BEST: + level = 19; + break; + } + if (level < 1 || level > ZSTD_maxCLevel()) { + throw std::invalid_argument( + to("ZSTD: invalid level: ", level)); + } + level_ = level; +} + +bool ZSTDStreamCodec::doNeedsUncompressedLength() const { + return false; +} + +uint64_t ZSTDStreamCodec::doMaxCompressedLength( + uint64_t uncompressedLength) const { + return ZSTD_compressBound(uncompressedLength); +} + +void zstdThrowIfError(size_t rc) { + if (!ZSTD_isError(rc)) { + return; + } + throw std::runtime_error( + to("ZSTD returned an error: ", ZSTD_getErrorName(rc))); +} + +Optional ZSTDStreamCodec::doGetUncompressedLength( + IOBuf const* data, + Optional uncompressedLength) const { + // Read decompressed size from frame if available in first IOBuf. 
+ auto const decompressedSize = + ZSTD_getDecompressedSize(data->data(), data->length()); + if (decompressedSize != 0) { + if (uncompressedLength && *uncompressedLength != decompressedSize) { + throw std::runtime_error("ZSTD: invalid uncompressed length"); + } + uncompressedLength = decompressedSize; + } + return uncompressedLength; +} + +void ZSTDStreamCodec::doResetStream() { + needReset_ = true; +} + +bool ZSTDStreamCodec::tryBlockCompress( + ByteRange& input, + MutableByteRange& output) const { + DCHECK(needReset_); + // We need to know that we have enough output space to use block compression + if (output.size() < ZSTD_compressBound(input.size())) { + return false; + } + size_t const length = ZSTD_compress( + output.data(), output.size(), input.data(), input.size(), level_); + zstdThrowIfError(length); + input.uncheckedAdvance(input.size()); + output.uncheckedAdvance(length); + return true; +} + +void ZSTDStreamCodec::resetCStream() { + if (!cstream_) { + cstream_.reset(ZSTD_createCStream()); + if (!cstream_) { + throw std::bad_alloc{}; + } + } + // Advanced API usage works for all supported versions of zstd. + // Required to set contentSizeFlag. 
+ auto params = ZSTD_getParams(level_, uncompressedLength().value_or(0), 0); + params.fParams.contentSizeFlag = uncompressedLength().hasValue(); + zstdThrowIfError(ZSTD_initCStream_advanced( + cstream_.get(), nullptr, 0, params, uncompressedLength().value_or(0))); +} + +bool ZSTDStreamCodec::doCompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (needReset_) { + // If we are given all the input in one chunk try to use block compression + if (flushOp == StreamCodec::FlushOp::END && + tryBlockCompress(input, output)) { + return true; + } + resetCStream(); + needReset_ = false; + } + ZSTD_inBuffer in = {input.data(), input.size(), 0}; + ZSTD_outBuffer out = {output.data(), output.size(), 0}; + SCOPE_EXIT { + input.uncheckedAdvance(in.pos); + output.uncheckedAdvance(out.pos); + }; + if (flushOp == StreamCodec::FlushOp::NONE || !input.empty()) { + zstdThrowIfError(ZSTD_compressStream(cstream_.get(), &out, &in)); + } + if (in.pos == in.size && flushOp != StreamCodec::FlushOp::NONE) { + size_t rc; + switch (flushOp) { + case StreamCodec::FlushOp::FLUSH: + rc = ZSTD_flushStream(cstream_.get(), &out); + break; + case StreamCodec::FlushOp::END: + rc = ZSTD_endStream(cstream_.get(), &out); + break; + default: + throw std::invalid_argument("ZSTD: invalid FlushOp"); + } + zstdThrowIfError(rc); + if (rc == 0) { + return true; + } + } + return false; +} + +bool ZSTDStreamCodec::tryBlockUncompress( + ByteRange& input, + MutableByteRange& output) const { + DCHECK(needReset_); +#if ZSTD_VERSION_NUMBER < 10104 + // We require ZSTD_findFrameCompressedSize() to perform this optimization. + return false; +#else + // We need to know the uncompressed length and have enough output space. 
+ if (!uncompressedLength() || output.size() < *uncompressedLength()) { + return false; + } + size_t const compressedLength = + ZSTD_findFrameCompressedSize(input.data(), input.size()); + zstdThrowIfError(compressedLength); + size_t const length = ZSTD_decompress( + output.data(), *uncompressedLength(), input.data(), compressedLength); + zstdThrowIfError(length); + if (length != *uncompressedLength()) { + throw std::runtime_error("ZSTDStreamCodec: Incorrect uncompressed length"); + } + input.uncheckedAdvance(compressedLength); + output.uncheckedAdvance(length); + return true; +#endif +} + +void ZSTDStreamCodec::resetDStream() { + if (!dstream_) { + dstream_.reset(ZSTD_createDStream()); + if (!dstream_) { + throw std::bad_alloc{}; + } + } + zstdThrowIfError(ZSTD_initDStream(dstream_.get())); +} + +bool ZSTDStreamCodec::doUncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (needReset_) { + // If we are given all the input in one chunk try to use block uncompression + if (flushOp == StreamCodec::FlushOp::END && + tryBlockUncompress(input, output)) { + return true; + } + resetDStream(); + needReset_ = false; + } + ZSTD_inBuffer in = {input.data(), input.size(), 0}; + ZSTD_outBuffer out = {output.data(), output.size(), 0}; + SCOPE_EXIT { + input.uncheckedAdvance(in.pos); + output.uncheckedAdvance(out.pos); + }; + size_t const rc = ZSTD_decompressStream(dstream_.get(), &out, &in); + zstdThrowIfError(rc); + return rc == 0; +} + +#endif // FOLLY_HAVE_LIBZSTD + +#if FOLLY_HAVE_LIBBZ2 + +class Bzip2Codec final : public Codec { + public: + static std::unique_ptr create(int level, CodecType type); + explicit Bzip2Codec(int level, CodecType type); + + std::vector validPrefixes() const override; + bool canUncompress(IOBuf const* data, Optional uncompressedLength) + const override; + + private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + std::unique_ptr doCompress(IOBuf const* data) override; 
+ std::unique_ptr doUncompress( + IOBuf const* data, + Optional uncompressedLength) override; + + int level_; +}; + +/* static */ std::unique_ptr Bzip2Codec::create( + int level, + CodecType type) { + return std::make_unique(level, type); +} + +Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::BZIP2); + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + level = 1; + break; + case COMPRESSION_LEVEL_DEFAULT: + level = 9; + break; + case COMPRESSION_LEVEL_BEST: + level = 9; + break; + } + if (level < 1 || level > 9) { + throw std::invalid_argument( + to("Bzip2: invalid level: ", level)); + } + level_ = level; +} + +static uint32_t constexpr kBzip2MagicLE = 0x685a42; +static uint64_t constexpr kBzip2MagicBytes = 3; + +std::vector Bzip2Codec::validPrefixes() const { + return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)}; +} + +bool Bzip2Codec::canUncompress(IOBuf const* data, Optional) const { + return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes); +} + +uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const { + // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress + // To guarantee that the compressed data will fit in its buffer, allocate an + // output buffer of size 1% larger than the uncompressed data, plus six + // hundred extra bytes. + return uncompressedLength + uncompressedLength / 100 + 600; +} + +static bz_stream createBzStream() { + bz_stream stream; + stream.bzalloc = nullptr; + stream.bzfree = nullptr; + stream.opaque = nullptr; + stream.next_in = stream.next_out = nullptr; + stream.avail_in = stream.avail_out = 0; + return stream; +} + +// Throws on error condition, otherwise returns the code. 
+static int bzCheck(int const rc) { + switch (rc) { + case BZ_OK: + case BZ_RUN_OK: + case BZ_FLUSH_OK: + case BZ_FINISH_OK: + case BZ_STREAM_END: + return rc; + default: + throw std::runtime_error(to("Bzip2 error: ", rc)); + } +} + +static std::unique_ptr addOutputBuffer( + bz_stream* stream, + uint64_t const bufferLength) { + DCHECK_LE(bufferLength, std::numeric_limits::max()); + DCHECK_EQ(stream->avail_out, 0); + + auto buf = IOBuf::create(bufferLength); + buf->append(buf->capacity()); + + stream->next_out = reinterpret_cast(buf->writableData()); + stream->avail_out = buf->length(); + + return buf; +} + +std::unique_ptr Bzip2Codec::doCompress(IOBuf const* data) { + bz_stream stream = createBzStream(); + bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0)); + SCOPE_EXIT { + bzCheck(BZ2_bzCompressEnd(&stream)); + }; + + uint64_t const uncompressedLength = data->computeChainDataLength(); + uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength); + uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB + uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20; + + auto out = addOutputBuffer( + &stream, + maxCompressedLen <= kMaxSingleStepLength ? 
maxCompressedLen + : kDefaultBufferLength); + + for (auto range : *data) { + while (!range.empty()) { + auto const inSize = std::min(range.size(), kMaxSingleStepLength); + stream.next_in = + const_cast(reinterpret_cast(range.data())); + stream.avail_in = inSize; + + if (stream.avail_out == 0) { + out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); + } + + bzCheck(BZ2_bzCompress(&stream, BZ_RUN)); + range.uncheckedAdvance(inSize - stream.avail_in); + } + } + do { + if (stream.avail_out == 0) { + out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); + } + } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END); + + out->prev()->trimEnd(stream.avail_out); + + return out; +} + +std::unique_ptr Bzip2Codec::doUncompress( + const IOBuf* data, + Optional uncompressedLength) { + bz_stream stream = createBzStream(); + bzCheck(BZ2_bzDecompressInit(&stream, 0, 0)); + SCOPE_EXIT { + bzCheck(BZ2_bzDecompressEnd(&stream)); + }; + + uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB + uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB + uint64_t const kDefaultBufferLength = + computeBufferLength(data->computeChainDataLength(), kBlockSize); + + auto out = addOutputBuffer( + &stream, + ((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength) + ? 
*uncompressedLength + : kDefaultBufferLength)); + + int rc = BZ_OK; + for (auto range : *data) { + while (!range.empty()) { + auto const inSize = std::min(range.size(), kMaxSingleStepLength); + stream.next_in = + const_cast(reinterpret_cast(range.data())); + stream.avail_in = inSize; + + if (stream.avail_out == 0) { + out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); + } + + rc = bzCheck(BZ2_bzDecompress(&stream)); + range.uncheckedAdvance(inSize - stream.avail_in); + } + } + while (rc != BZ_STREAM_END) { + if (stream.avail_out == 0) { + out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); + } + size_t const outputSize = stream.avail_out; + rc = bzCheck(BZ2_bzDecompress(&stream)); + if (outputSize == stream.avail_out) { + throw std::runtime_error("Bzip2Codec: Truncated input"); + } + } + + out->prev()->trimEnd(stream.avail_out); + + uint64_t const totalOut = + (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32; + if (uncompressedLength && uncompressedLength != totalOut) { + throw std::runtime_error("Bzip2 error: Invalid uncompressed length"); + } + + return out; +} + +#endif // FOLLY_HAVE_LIBBZ2 + +#if FOLLY_HAVE_LIBZ + +zlib::Options getZlibOptions(CodecType type) { + DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB); + return type == CodecType::GZIP ? 
zlib::defaultGzipOptions() + : zlib::defaultZlibOptions(); +} + +std::unique_ptr getZlibCodec(int level, CodecType type) { + return zlib::getCodec(getZlibOptions(type), level); +} + +std::unique_ptr getZlibStreamCodec(int level, CodecType type) { + return zlib::getStreamCodec(getZlibOptions(type), level); +} + +#endif // FOLLY_HAVE_LIBZ + +/** + * Automatic decompression + */ +class AutomaticCodec final : public Codec { + public: + static std::unique_ptr create( + std::vector> customCodecs, + std::unique_ptr terminalCodec); + explicit AutomaticCodec( + std::vector> customCodecs, + std::unique_ptr terminalCodec); + + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, Optional uncompressedLength) + const override; + + private: + bool doNeedsUncompressedLength() const override; + uint64_t doMaxUncompressedLength() const override; + + uint64_t doMaxCompressedLength(uint64_t) const override { + throw std::runtime_error( + "AutomaticCodec error: maxCompressedLength() not supported."); + } + std::unique_ptr doCompress(const IOBuf*) override { + throw std::runtime_error("AutomaticCodec error: compress() not supported."); + } + std::unique_ptr doUncompress( + const IOBuf* data, + Optional uncompressedLength) override; + + void addCodecIfSupported(CodecType type); + + // Throws iff the codecs aren't compatible (very slow) + void checkCompatibleCodecs() const; + + std::vector> codecs_; + std::unique_ptr terminalCodec_; + bool needsUncompressedLength_; + uint64_t maxUncompressedLength_; +}; + +std::vector AutomaticCodec::validPrefixes() const { + std::unordered_set prefixes; + for (const auto& codec : codecs_) { + const auto codecPrefixes = codec->validPrefixes(); + prefixes.insert(codecPrefixes.begin(), codecPrefixes.end()); + } + return std::vector{prefixes.begin(), prefixes.end()}; +} + +bool AutomaticCodec::canUncompress( + const IOBuf* data, + Optional uncompressedLength) const { + return std::any_of( + codecs_.begin(), + codecs_.end(), + 
[data, uncompressedLength](std::unique_ptr const& codec) { + return codec->canUncompress(data, uncompressedLength); + }); +} + +void AutomaticCodec::addCodecIfSupported(CodecType type) { + const bool present = std::any_of( + codecs_.begin(), + codecs_.end(), + [&type](std::unique_ptr const& codec) { + return codec->type() == type; + }); + bool const isTerminalType = terminalCodec_ && terminalCodec_->type() == type; + if (hasCodec(type) && !present && !isTerminalType) { + codecs_.push_back(getCodec(type)); + } +} + +/* static */ std::unique_ptr AutomaticCodec::create( + std::vector> customCodecs, + std::unique_ptr terminalCodec) { + return std::make_unique( + std::move(customCodecs), std::move(terminalCodec)); +} + +AutomaticCodec::AutomaticCodec( + std::vector> customCodecs, + std::unique_ptr terminalCodec) + : Codec(CodecType::USER_DEFINED), + codecs_(std::move(customCodecs)), + terminalCodec_(std::move(terminalCodec)) { + // Fastest -> slowest + std::array defaultTypes{{ + CodecType::LZ4_FRAME, + CodecType::ZSTD, + CodecType::ZLIB, + CodecType::GZIP, + CodecType::LZMA2, + CodecType::BZIP2, + }}; + + for (auto type : defaultTypes) { + addCodecIfSupported(type); + } + + if (kIsDebug) { + checkCompatibleCodecs(); + } + + // Check that none of the codecs are null + DCHECK(std::none_of( + codecs_.begin(), codecs_.end(), [](std::unique_ptr const& codec) { + return codec == nullptr; + })); + + // Check that the terminal codec's type is not duplicated (with the exception + // of USER_DEFINED). 
+ if (terminalCodec_) { + DCHECK(std::none_of( + codecs_.begin(), + codecs_.end(), + [&](std::unique_ptr const& codec) { + return codec->type() != CodecType::USER_DEFINED && + codec->type() == terminalCodec_->type(); + })); + } + + bool const terminalNeedsUncompressedLength = + terminalCodec_ && terminalCodec_->needsUncompressedLength(); + needsUncompressedLength_ = std::any_of( + codecs_.begin(), + codecs_.end(), + [](std::unique_ptr const& codec) { + return codec->needsUncompressedLength(); + }) || + terminalNeedsUncompressedLength; + + const auto it = std::max_element( + codecs_.begin(), + codecs_.end(), + [](std::unique_ptr const& lhs, std::unique_ptr const& rhs) { + return lhs->maxUncompressedLength() < rhs->maxUncompressedLength(); + }); + DCHECK(it != codecs_.end()); + auto const terminalMaxUncompressedLength = + terminalCodec_ ? terminalCodec_->maxUncompressedLength() : 0; + maxUncompressedLength_ = + std::max((*it)->maxUncompressedLength(), terminalMaxUncompressedLength); +} + +void AutomaticCodec::checkCompatibleCodecs() const { + // Keep track of all the possible headers. + std::unordered_set headers; + // The empty header is not allowed. + headers.insert(""); + // Step 1: + // Construct a set of headers and check that none of the headers occur twice. + // Eliminate edge cases. + for (auto&& codec : codecs_) { + const auto codecHeaders = codec->validPrefixes(); + // Codecs without any valid headers are not allowed. + if (codecHeaders.empty()) { + throw std::invalid_argument{ + "AutomaticCodec: validPrefixes() must not be empty."}; + } + // Insert all the headers for the current codec. + const size_t beforeSize = headers.size(); + headers.insert(codecHeaders.begin(), codecHeaders.end()); + // Codecs are not compatible if any header occurred twice. 
+ if (beforeSize + codecHeaders.size() != headers.size()) { + throw std::invalid_argument{ + "AutomaticCodec: Two valid prefixes collide."}; + } + } + // Step 2: + // Check if any strict non-empty prefix of any header is a header. + for (const auto& header : headers) { + for (size_t i = 1; i < header.size(); ++i) { + if (headers.count(header.substr(0, i))) { + throw std::invalid_argument{ + "AutomaticCodec: One valid prefix is a prefix of another valid " + "prefix."}; + } + } + } +} + +bool AutomaticCodec::doNeedsUncompressedLength() const { + return needsUncompressedLength_; +} + +uint64_t AutomaticCodec::doMaxUncompressedLength() const { + return maxUncompressedLength_; +} + +std::unique_ptr AutomaticCodec::doUncompress( + const IOBuf* data, + Optional uncompressedLength) { + try { + for (auto&& codec : codecs_) { + if (codec->canUncompress(data, uncompressedLength)) { + return codec->uncompress(data, uncompressedLength); + } + } + } catch (std::exception const&) { + if (!terminalCodec_) { + throw; // rethrow the original exception; `throw e;` would slice it to std::exception + } + } + + // Try terminal codec + if (terminalCodec_) { + return terminalCodec_->uncompress(data, uncompressedLength); + } + + throw std::runtime_error("AutomaticCodec error: Unknown compressed data"); +} + +using CodecFactory = std::unique_ptr (*)(int, CodecType); +using StreamCodecFactory = std::unique_ptr (*)(int, CodecType); +struct Factory { + CodecFactory codec; + StreamCodecFactory stream; +}; + +constexpr Factory + codecFactories[static_cast(CodecType::NUM_CODEC_TYPES)] = { + {}, // USER_DEFINED + {NoCompressionCodec::create, nullptr}, + +#if FOLLY_HAVE_LIBLZ4 + {LZ4Codec::create, nullptr}, +#else + {}, +#endif + +#if FOLLY_HAVE_LIBSNAPPY + {SnappyCodec::create, nullptr}, +#else + {}, +#endif + +#if FOLLY_HAVE_LIBZ + {getZlibCodec, getZlibStreamCodec}, +#else + {}, +#endif + +#if FOLLY_HAVE_LIBLZ4 + {LZ4Codec::create, nullptr}, +#else + {}, +#endif + +#if FOLLY_HAVE_LIBLZMA + {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream}, + 
{LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream}, +#else + {}, + {}, +#endif + +#if FOLLY_HAVE_LIBZSTD + {ZSTDStreamCodec::createCodec, ZSTDStreamCodec::createStream}, +#else + {}, +#endif + +#if FOLLY_HAVE_LIBZ + {getZlibCodec, getZlibStreamCodec}, +#else + {}, +#endif + +#if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301) + {LZ4FrameCodec::create, nullptr}, +#else + {}, +#endif + +#if FOLLY_HAVE_LIBBZ2 + {Bzip2Codec::create, nullptr}, +#else + {}, +#endif +}; + +Factory const& getFactory(CodecType type) { + size_t const idx = static_cast(type); + if (idx >= static_cast(CodecType::NUM_CODEC_TYPES)) { + throw std::invalid_argument( + to("Compression type ", idx, " invalid")); + } + return codecFactories[idx]; +} +} // namespace + +bool hasCodec(CodecType type) { + return getFactory(type).codec != nullptr; +} + +std::unique_ptr getCodec(CodecType type, int level) { + auto const factory = getFactory(type).codec; + if (!factory) { + throw std::invalid_argument( + to("Compression type ", type, " not supported")); + } + auto codec = (*factory)(level, type); + DCHECK(codec->type() == type); + return codec; +} + +bool hasStreamCodec(CodecType type) { + return getFactory(type).stream != nullptr; +} + +std::unique_ptr getStreamCodec(CodecType type, int level) { + auto const factory = getFactory(type).stream; + if (!factory) { + throw std::invalid_argument( + to("Compression type ", type, " not supported")); + } + auto codec = (*factory)(level, type); + DCHECK(codec->type() == type); + return codec; +} + +std::unique_ptr getAutoUncompressionCodec( + std::vector> customCodecs, + std::unique_ptr terminalCodec) { + return AutomaticCodec::create( + std::move(customCodecs), std::move(terminalCodec)); +} +} // namespace io +} // namespace folly diff --git a/folly/compression/Compression.h b/folly/compression/Compression.h new file mode 100644 index 00000000..345eda82 --- /dev/null +++ b/folly/compression/Compression.h @@ -0,0 +1,494 @@ +/* + * Copyright 2017 
Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include + +/** + * Compression / decompression over IOBufs + */ + +namespace folly { +namespace io { + +enum class CodecType { + /** + * This codec type is not defined; getCodec() will throw an exception + * if used. Useful if deriving your own classes from Codec without + * going through the getCodec() interface. + */ + USER_DEFINED = 0, + + /** + * Use no compression. + * Levels supported: 0 + */ + NO_COMPRESSION = 1, + + /** + * Use LZ4 compression. + * Levels supported: 1 = fast, 2 = best; default = 1 + */ + LZ4 = 2, + + /** + * Use Snappy compression. + * Levels supported: 1 + */ + SNAPPY = 3, + + /** + * Use zlib compression. + * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6 + */ + ZLIB = 4, + + /** + * Use LZ4 compression, prefixed with size (as Varint). + */ + LZ4_VARINT_SIZE = 5, + + /** + * Use LZMA2 compression. + * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6 + */ + LZMA2 = 6, + LZMA2_VARINT_SIZE = 7, + + /** + * Use ZSTD compression. + */ + ZSTD = 8, + + /** + * Use gzip compression. This is the same compression algorithm as ZLIB but + * gzip-compressed files tend to be easier to work with from the command line. 
+ * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6 + */ + GZIP = 9, + + /** + * Use LZ4 frame compression. + * Levels supported: 0 = fast, 16 = best; default = 0 + */ + LZ4_FRAME = 10, + + /** + * Use bzip2 compression. + * Levels supported: 1 = fast, 9 = best; default = 9 + */ + BZIP2 = 11, + + NUM_CODEC_TYPES = 12, +}; + +class Codec { + public: + virtual ~Codec() { } + + static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH = uint64_t(-1); + /** + * Return the maximum length of data that may be compressed with this codec. + * NO_COMPRESSION and ZLIB support arbitrary lengths; + * LZ4 supports up to 1.9GiB; SNAPPY supports up to 4GiB. + * May return UNLIMITED_UNCOMPRESSED_LENGTH if unlimited. + */ + uint64_t maxUncompressedLength() const; + + /** + * Return the codec's type. + */ + CodecType type() const { return type_; } + + /** + * Does this codec need the exact uncompressed length on decompression? + */ + bool needsUncompressedLength() const; + + /** + * Compress data, returning an IOBuf (which may share storage with data). + * Throws std::invalid_argument if data is larger than + * maxUncompressedLength(). + */ + std::unique_ptr compress(const folly::IOBuf* data); + + /** + * Compresses data. May involve additional copies compared to the overload + * that takes and returns IOBufs. Has the same error semantics as the IOBuf + * version. + */ + std::string compress(StringPiece data); + + /** + * Uncompress data. Throws std::runtime_error on decompression error. + * + * Some codecs (LZ4) require the exact uncompressed length; this is indicated + * by needsUncompressedLength(). + * + * For other codecs (zlib), knowing the exact uncompressed length ahead of + * time might be faster. + * + * Regardless of the behavior of the underlying compressor, uncompressing + * an empty IOBuf chain will return an empty IOBuf chain. 
+ */ + std::unique_ptr uncompress( + const IOBuf* data, + folly::Optional uncompressedLength = folly::none); + + /** + * Uncompresses data. May involve additional copies compared to the overload + * that takes and returns IOBufs. Has the same error semantics as the IOBuf + * version. + */ + std::string uncompress( + StringPiece data, + folly::Optional uncompressedLength = folly::none); + + /** + * Returns a bound on the maximum compressed length when compressing data with + * the given uncompressed length. + */ + uint64_t maxCompressedLength(uint64_t uncompressedLength) const; + + /** + * Extracts the uncompressed length from the compressed data if possible. + * If the codec doesn't store the uncompressed length, or the data is + * corrupted it returns the given uncompressedLength. + * If the uncompressed length is stored in the compressed data and + * uncompressedLength is not none and they do not match a std::runtime_error + * is thrown. + */ + folly::Optional getUncompressedLength( + const folly::IOBuf* data, + folly::Optional uncompressedLength = folly::none) const; + + protected: + explicit Codec(CodecType type); + + public: + /** + * Returns a superset of the set of prefixes for which canUncompress() will + * return true. A superset is allowed for optimizations in canUncompress() + * based on other knowledge such as length. None of the prefixes may be empty. + * default: No prefixes. + */ + virtual std::vector validPrefixes() const; + + /** + * Returns true if the codec thinks it can uncompress the data. + * If a codec doesn't have magic bytes at the beginning, like LZ4 and Snappy, + * it can always return false. + * default: Returns false. 
+ */ + virtual bool canUncompress( + const folly::IOBuf* data, + folly::Optional uncompressedLength = folly::none) const; + + private: + // default: no limits (i.e. UNLIMITED_UNCOMPRESSED_LENGTH) + virtual uint64_t doMaxUncompressedLength() const; + // default: doesn't need uncompressed length + virtual bool doNeedsUncompressedLength() const; + virtual std::unique_ptr doCompress(const folly::IOBuf* data) = 0; + virtual std::unique_ptr doUncompress( + const folly::IOBuf* data, + folly::Optional uncompressedLength) = 0; + // default: an implementation is provided by default to wrap the strings into + // IOBufs and delegate to the IOBuf methods. This incurs a copy of the output + // from IOBuf to string. Implementers, at their discretion, can override + // these methods to avoid the copy. + virtual std::string doCompressString(StringPiece data); + virtual std::string doUncompressString( + StringPiece data, + folly::Optional uncompressedLength); + + virtual uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const = 0; + // default: returns the passed uncompressedLength. + virtual folly::Optional doGetUncompressedLength( + const folly::IOBuf* data, + folly::Optional uncompressedLength) const; + + CodecType type_; +}; + +class StreamCodec : public Codec { + public: + ~StreamCodec() override {} + + /** + * Does the codec need the data length before compression streaming? + */ + bool needsDataLength() const; + + /***************************************************************************** + * Streaming API + ***************************************************************************** + * A low-level stateful streaming API. + * Streaming operations can be started in two ways: + * 1. From a clean Codec on which no non-const methods have been called. + * 2. A call to resetStream(), which will reset any codec to a clean state.
+ * After a streaming operation has begun, either compressStream() or + * uncompressStream() must be called until the streaming operation ends. + * compressStream() ends when it returns true with flushOp END. + * uncompressStream() ends when it returns true. At this point the codec + * may be reused by calling resetStream(). + * + * compress() and uncompress() can be called at any time, but they interrupt + * any ongoing streaming operations (state is lost and resetStream() must be + * called before another streaming operation). + */ + + /** + * Reset the state of the codec, and set the uncompressed length for the next + * streaming operation. If uncompressedLength is not none it must be exactly + * the uncompressed length. compressStream() must be passed exactly + * uncompressedLength input bytes before the stream is ended. + * uncompressStream() must be passed a compressed frame that uncompresses to + * uncompressedLength. + */ + void resetStream(folly::Optional uncompressedLength = folly::none); + + enum class FlushOp { NONE, FLUSH, END }; + + /** + * Compresses some data from the input buffer and writes the compressed data + * into the output buffer. It may read input without producing any output, + * except when forced to flush. + * + * The input buffer is advanced to point to the range of data that hasn't yet + * been read. Compression will resume at this point for the next call to + * compressStream(). The output buffer is advanced one byte past the last byte + * written. + * + * The default flushOp is NONE, which allows compressStream() complete + * discretion in how much data to gather before writing any output. + * + * If flushOp is END, all pending and input data is flushed to the output + * buffer, and the frame is ended. compressStream() must be called with the + * same input and flushOp END until it returns true. At this point the caller + * must call resetStream() to use the codec again. 
+ * + * If flushOp is FLUSH, all pending and input data is flushed to the output + * buffer, but the frame is not ended. compressStream() must be called with + * the same input and flushOp FLUSH until it returns true. At this point the + * caller can continue to compressStream() with any input data and flushOp. + * The uncompressor, if passed all the produced output data, will be able to + * uncompress all the input data passed to compressStream() so far. Excessive + * use of flushOp FLUSH will deteriorate compression ratio. This is useful for + * stateful streaming across a network. Most users don't need to use this + * flushOp. + * + * A std::logic_error is thrown on incorrect usage of the API. + * A std::runtime_error is thrown upon error conditions or if no forward + * progress could be made twice in a row. + */ + bool compressStream( + folly::ByteRange& input, + folly::MutableByteRange& output, + FlushOp flushOp = StreamCodec::FlushOp::NONE); + + /** + * Uncompresses some data from the input buffer and writes the uncompressed + * data into the output buffer. It may read input without producing any + * output. + * + * The input buffer is advanced to point to the range of data that hasn't yet + * been read. Uncompression will resume at this point for the next call to + * uncompressStream(). The output buffer is advanced one byte past the last + * byte written. + * + * The default flushOp is NONE, which allows uncompressStream() complete + * discretion in how much output data to flush. The uncompressor may not make + * maximum forward progress, but will make some forward progress when + * possible. + * + * If flushOp is END, the caller guarantees that no more input will be + * presented to uncompressStream(). uncompressStream() must be called with the + * same input and flushOp END until it returns true.
This is not mandatory, + * but if the input is all available in one buffer, and there is enough output + * space to write the entire frame, codecs can uncompress faster. + * + * If flushOp is FLUSH, uncompressStream() is guaranteed to make the maximum + * amount of forward progress possible. When using this flushOp and + * uncompressStream() returns with `!output.empty()` the caller knows that all + * pending output has been flushed. This is useful for stateful streaming + * across a network, and it should be used in conjunction with + * compressStream() with flushOp FLUSH. Most users don't need to use this + * flushOp. + * + * A std::runtime_error is thrown upon error conditions or if no forward + * progress could be made upon two consecutive calls to the function (only the + * second call will throw an exception). + * + * Returns true at the end of a frame. At this point resetStream() must be + * called to reuse the codec. + */ + bool uncompressStream( + folly::ByteRange& input, + folly::MutableByteRange& output, + FlushOp flushOp = StreamCodec::FlushOp::NONE); + + protected: + explicit StreamCodec(CodecType type) : Codec(type) {} + + // Returns the uncompressed length last passed to resetStream() or none if it + // hasn't been called yet. + folly::Optional uncompressedLength() const { + return uncompressedLength_; + } + + private: + // default: Implemented using the streaming API. 
+ std::unique_ptr doCompress(const folly::IOBuf* data) override; + std::unique_ptr doUncompress( + const folly::IOBuf* data, + folly::Optional uncompressedLength) override; + + // default: Returns false + virtual bool doNeedsDataLength() const; + virtual void doResetStream() = 0; + virtual bool doCompressStream( + folly::ByteRange& input, + folly::MutableByteRange& output, + FlushOp flushOp) = 0; + virtual bool doUncompressStream( + folly::ByteRange& input, + folly::MutableByteRange& output, + FlushOp flushOp) = 0; + + enum class State { + RESET, + COMPRESS, + COMPRESS_FLUSH, + COMPRESS_END, + UNCOMPRESS, + END, + }; + void assertStateIs(State expected) const; + + CodecType type_; + State state_{State::RESET}; + ByteRange previousInput_{}; + folly::Optional uncompressedLength_{}; + bool progressMade_{true}; +}; + +constexpr int COMPRESSION_LEVEL_FASTEST = -1; +constexpr int COMPRESSION_LEVEL_DEFAULT = -2; +constexpr int COMPRESSION_LEVEL_BEST = -3; + +/** + * Return a codec for the given type. Throws on error. The level + * is a non-negative codec-dependent integer indicating the level of + * compression desired, or one of the following constants: + * + * COMPRESSION_LEVEL_FASTEST is fastest (uses least CPU / memory, + * worst compression) + * COMPRESSION_LEVEL_DEFAULT is the default (likely a tradeoff between + * FASTEST and BEST) + * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory, + * best compression) + * + * When decompressing, the compression level is ignored. All codecs will + * decompress all data compressed with a codec of the same type, regardless + * of compression level. + */ +std::unique_ptr getCodec( + CodecType type, + int level = COMPRESSION_LEVEL_DEFAULT); + +/** + * Return a codec for the given type. Throws on error.
The level + * is a non-negative codec-dependent integer indicating the level of + * compression desired, or one of the following constants: + * + * COMPRESSION_LEVEL_FASTEST is fastest (uses least CPU / memory, + * worst compression) + * COMPRESSION_LEVEL_DEFAULT is the default (likely a tradeoff between + * FASTEST and BEST) + * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory, + * best compression) + * + * When decompressing, the compression level is ignored. All codecs will + * decompress all data compressed with a codec of the same type, regardless + * of compression level. + */ +std::unique_ptr getStreamCodec( + CodecType type, + int level = COMPRESSION_LEVEL_DEFAULT); + +/** + * Returns a codec that can uncompress any of the given codec types as well as + * {LZ4_FRAME, ZSTD, ZLIB, GZIP, LZMA2, BZIP2}. Appends each default codec to + * customCodecs in order, so long as a codec with the same type() isn't already + * present in customCodecs or as the terminalCodec. When uncompress() is called, + * each codec's canUncompress() is called in the order that they are given. + * Appended default codecs are checked last. uncompress() is called on the + * first codec whose canUncompress() returns true. + * + * In addition, an optional `terminalCodec` can be provided. This codec's + * uncompress() will be called either when no other codec canUncompress() the + * data or the chosen codec throws an exception on the data. The terminalCodec + * is intended for ambiguous headers, when canUncompress() is false for some + * data it can actually uncompress. The terminalCodec does not need to override + * validPrefixes() or canUncompress() and overriding these functions will have + * no effect on the returned codec's validPrefixes() or canUncompress() + * functions. The terminalCodec's needsUncompressedLength() and + * maxUncompressedLength() will affect the returned codec's respective + * functions.
The terminalCodec must not be duplicated in customCodecs. + * + * An exception is thrown if no codec canUncompress() the data and either no + * terminal codec was provided or a terminal codec was provided and it throws on + * the data. + * An exception is thrown if the chosen codec's uncompress() throws on the data + * and either no terminal codec was provided or a terminal codec was provided + * and it also throws on the data. + * An exception is thrown if compress() is called on the returned codec. + * + * Requirements are checked in debug mode and are as follows: + * Let headers be the concatenation of every codec's validPrefixes(). + * 1. Each codec must override validPrefixes() and canUncompress(). + * 2. No codec's validPrefixes() may be empty. + * 3. No header in headers may be empty. + * 4. headers must not contain any duplicate elements. + * 5. No strict non-empty prefix of any header in headers may be in headers. + * 6. The terminalCodec's type must not be the same as any other codec's type + * (with USER_DEFINED being the exception). + */ +std::unique_ptr getAutoUncompressionCodec( + std::vector> customCodecs = {}, + std::unique_ptr terminalCodec = {}); + +/** + * Check if a specified codec is supported. + */ +bool hasCodec(CodecType type); + +/** + * Check if a specified codec is supported and supports streaming. + */ +bool hasStreamCodec(CodecType type); +} // namespace io +} // namespace folly diff --git a/folly/compression/Utils.h b/folly/compression/Utils.h new file mode 100644 index 00000000..8d23723f --- /dev/null +++ b/folly/compression/Utils.h @@ -0,0 +1,67 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include + +/** + * Helper functions for compression codecs. + */ +namespace folly { +namespace io { +namespace compression { +namespace detail { + +/** + * Reads sizeof(T) bytes, and returns false if not enough bytes are available. + * Returns true if the first n bytes are equal to prefix when interpreted as + * a little endian T. + */ +template +typename std::enable_if::value, bool>::type +dataStartsWithLE(const IOBuf* data, T prefix, uint64_t n = sizeof(T)) { + DCHECK_GT(n, 0); + DCHECK_LE(n, sizeof(T)); + T value; + Cursor cursor{data}; + if (!cursor.tryReadLE(value)) { + return false; + } + const T mask = n == sizeof(T) ? T(-1) : (T(1) << (8 * n)) - 1; + return prefix == (value & mask); +} + +template +typename std::enable_if::value, std::string>::type +prefixToStringLE(T prefix, uint64_t n = sizeof(T)) { + DCHECK_GT(n, 0); + DCHECK_LE(n, sizeof(T)); + prefix = Endian::little(prefix); + std::string result; + result.resize(n); + memcpy(&result[0], &prefix, n); + return result; +} + +} // namespace detail +} // namespace compression +} // namespace io +} // namespace folly diff --git a/folly/compression/Zlib.cpp b/folly/compression/Zlib.cpp new file mode 100644 index 00000000..4df54d5d --- /dev/null +++ b/folly/compression/Zlib.cpp @@ -0,0 +1,416 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#if FOLLY_HAVE_LIBZ + +#include +#include +#include +#include +#include +#include +#include + +using folly::io::compression::detail::dataStartsWithLE; +using folly::io::compression::detail::prefixToStringLE; + +namespace folly { +namespace io { +namespace zlib { + +namespace { + +bool isValidStrategy(int strategy) { + std::array strategies{{ + Z_DEFAULT_STRATEGY, + Z_FILTERED, + Z_HUFFMAN_ONLY, + Z_RLE, + Z_FIXED + }}; + return std::any_of(strategies.begin(), strategies.end(), [&](int i) { + return i == strategy; + }); +} + +int getWindowBits(Options::Format format, int windowSize) { + switch (format) { + case Options::Format::ZLIB: + return windowSize; + case Options::Format::GZIP: + return windowSize + 16; + case Options::Format::RAW: + return -windowSize; + case Options::Format::AUTO: + return windowSize + 32; + default: + return windowSize; + } +} + +CodecType getCodecType(Options options) { + if (options.windowSize == 15 && options.format == Options::Format::ZLIB) { + return CodecType::ZLIB; + } else if ( + options.windowSize == 15 && options.format == Options::Format::GZIP) { + return CodecType::GZIP; + } else { + return CodecType::USER_DEFINED; + } +} + +class ZlibStreamCodec final : public StreamCodec { + public: + static std::unique_ptr createCodec(Options options, int level); + static std::unique_ptr createStream(Options options, int level); + + explicit ZlibStreamCodec(Options options, int level); + ~ZlibStreamCodec() override; + + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, 
Optional uncompressedLength) + const override; + + private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + + void doResetStream() override; + bool doCompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flush) override; + bool doUncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flush) override; + + void resetDeflateStream(); + void resetInflateStream(); + + Options options_; + + Optional deflateStream_{}; + Optional inflateStream_{}; + int level_; + bool needReset_{true}; +}; +static constexpr uint16_t kGZIPMagicLE = 0x8B1F; + +std::vector ZlibStreamCodec::validPrefixes() const { + if (type() == CodecType::ZLIB) { + // Zlib streams start with a 2 byte header. + // + // 0 1 + // +---+---+ + // |CMF|FLG| + // +---+---+ + // + // We won't restrict the values of any sub-fields except as described below. + // + // The lowest 4 bits of CMF is the compression method (CM). + // CM == 0x8 is the deflate compression method, which is currently the only + // supported compression method, so any valid prefix must have CM == 0x8. + // + // The lowest 5 bits of FLG is FCHECK. + // FCHECK must be such that the two header bytes are a multiple of 31 when + // interpreted as a big endian 16-bit number. + std::vector result; + // 16 values for the first byte, 8 values for the second byte. + // There are also 4 combinations where both 0x00 and 0x1F work as FCHECK. + result.reserve(132); + // Select all values for the CMF byte that use the deflate algorithm 0x8. + for (uint32_t first = 0x0800; first <= 0xF800; first += 0x1000) { + // Select all values for the FLG, but leave FCHECK as 0 since it's fixed. + for (uint32_t second = 0x00; second <= 0xE0; second += 0x20) { + uint16_t prefix = first | second; + // Compute FCHECK. + prefix += 31 - (prefix % 31); + result.push_back(prefixToStringLE(Endian::big(prefix))); + // zlib won't produce this, but it is a valid prefix. 
+ if ((prefix & 0x1F) == 31) { + prefix -= 31; + result.push_back(prefixToStringLE(Endian::big(prefix))); + } + } + } + return result; + } else if (type() == CodecType::GZIP) { + // The gzip frame starts with 2 magic bytes. + return {prefixToStringLE(kGZIPMagicLE)}; + } else { + return {}; + } +} + +bool ZlibStreamCodec::canUncompress(const IOBuf* data, Optional) + const { + if (type() == CodecType::ZLIB) { + uint16_t value; + Cursor cursor{data}; + if (!cursor.tryReadBE(value)) { + return false; + } + // zlib compressed if using deflate and is a multiple of 31. + return (value & 0x0F00) == 0x0800 && value % 31 == 0; + } else if (type() == CodecType::GZIP) { + return dataStartsWithLE(data, kGZIPMagicLE); + } else { + return false; + } +} + +uint64_t ZlibStreamCodec::doMaxCompressedLength( + uint64_t uncompressedLength) const { + // When passed a nullptr, deflateBound() adds 6 bytes for a zlib wrapper. A + // gzip wrapper is 18 bytes, so we add the 12 byte difference. + return deflateBound(nullptr, uncompressedLength) + + (options_.format == Options::Format::GZIP ? 
12 : 0); +} + +std::unique_ptr ZlibStreamCodec::createCodec( + Options options, + int level) { + return std::make_unique(options, level); +} + +std::unique_ptr ZlibStreamCodec::createStream( + Options options, + int level) { + return std::make_unique(options, level); +} + +ZlibStreamCodec::ZlibStreamCodec(Options options, int level) + : StreamCodec(getCodecType(options)) { + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + level = 1; + break; + case COMPRESSION_LEVEL_DEFAULT: + level = Z_DEFAULT_COMPRESSION; + break; + case COMPRESSION_LEVEL_BEST: + level = 9; + break; + } + auto inBounds = [](int value, int low, int high) { + return (value >= low) && (value <= high); + }; + + if (level != Z_DEFAULT_COMPRESSION && !inBounds(level, 0, 9)) { + throw std::invalid_argument( + to("ZlibStreamCodec: invalid level: ", level)); + } + level_ = level; + options_ = options; + + // Although zlib allows a windowSize of 8..15, a value of 8 is not + // properly supported and is treated as a value of 9. This means data deflated + // with windowSize==8 can not be re-inflated with windowSize==8. windowSize==8 + // is also not supported for gzip and raw deflation. + // Hence, the codec supports only 9..15. 
+ if (!inBounds(options_.windowSize, 9, 15)) { + throw std::invalid_argument(to( + "ZlibStreamCodec: invalid windowSize option: ", options.windowSize)); + } + if (!inBounds(options_.memLevel, 1, 9)) { + throw std::invalid_argument(to( + "ZlibStreamCodec: invalid memLevel option: ", options.memLevel)); + } + if (!isValidStrategy(options_.strategy)) { + throw std::invalid_argument(to( + "ZlibStreamCodec: invalid strategy: ", options.strategy)); + } +} + +ZlibStreamCodec::~ZlibStreamCodec() { + if (deflateStream_) { + deflateEnd(deflateStream_.get_pointer()); + deflateStream_.clear(); + } + if (inflateStream_) { + inflateEnd(inflateStream_.get_pointer()); + inflateStream_.clear(); + } +} + +void ZlibStreamCodec::doResetStream() { + needReset_ = true; +} + +void ZlibStreamCodec::resetDeflateStream() { + if (deflateStream_) { + int const rc = deflateReset(deflateStream_.get_pointer()); + if (rc != Z_OK) { + deflateStream_.clear(); + throw std::runtime_error( + to("ZlibStreamCodec: deflateReset error: ", rc)); + } + return; + } + deflateStream_ = z_stream{}; + + // The automatic header detection format is only for inflation. + // Use zlib for deflation if the format is auto. + int const windowBits = getWindowBits( + options_.format == Options::Format::AUTO ? 
Options::Format::ZLIB + : options_.format, + options_.windowSize); + + int const rc = deflateInit2( + deflateStream_.get_pointer(), + level_, + Z_DEFLATED, + windowBits, + options_.memLevel, + options_.strategy); + if (rc != Z_OK) { + deflateStream_.clear(); + throw std::runtime_error( + to("ZlibStreamCodec: deflateInit error: ", rc)); + } +} + +void ZlibStreamCodec::resetInflateStream() { + if (inflateStream_) { + int const rc = inflateReset(inflateStream_.get_pointer()); + if (rc != Z_OK) { + inflateStream_.clear(); + throw std::runtime_error( + to("ZlibStreamCodec: inflateReset error: ", rc)); + } + return; + } + inflateStream_ = z_stream{}; + int const rc = inflateInit2( + inflateStream_.get_pointer(), + getWindowBits(options_.format, options_.windowSize)); + if (rc != Z_OK) { + inflateStream_.clear(); + throw std::runtime_error( + to("ZlibStreamCodec: inflateInit error: ", rc)); + } +} + +static int zlibTranslateFlush(StreamCodec::FlushOp flush) { + switch (flush) { + case StreamCodec::FlushOp::NONE: + return Z_NO_FLUSH; + case StreamCodec::FlushOp::FLUSH: + return Z_SYNC_FLUSH; + case StreamCodec::FlushOp::END: + return Z_FINISH; + default: + throw std::invalid_argument("ZlibStreamCodec: Invalid flush"); + } +} + +static int zlibThrowOnError(int rc) { + switch (rc) { + case Z_OK: + case Z_BUF_ERROR: + case Z_STREAM_END: + return rc; + default: + throw std::runtime_error(to("ZlibStreamCodec: error: ", rc)); + } +} + +bool ZlibStreamCodec::doCompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flush) { + if (needReset_) { + resetDeflateStream(); + needReset_ = false; + } + DCHECK(deflateStream_.hasValue()); + // zlib will return Z_STREAM_ERROR if output.data() is null. 
+ if (output.data() == nullptr) { + return false; + } + deflateStream_->next_in = const_cast(input.data()); + deflateStream_->avail_in = input.size(); + deflateStream_->next_out = output.data(); + deflateStream_->avail_out = output.size(); + SCOPE_EXIT { + input.uncheckedAdvance(input.size() - deflateStream_->avail_in); + output.uncheckedAdvance(output.size() - deflateStream_->avail_out); + }; + int const rc = zlibThrowOnError( + deflate(deflateStream_.get_pointer(), zlibTranslateFlush(flush))); + switch (flush) { + case StreamCodec::FlushOp::NONE: + return false; + case StreamCodec::FlushOp::FLUSH: + return deflateStream_->avail_in == 0 && deflateStream_->avail_out != 0; + case StreamCodec::FlushOp::END: + return rc == Z_STREAM_END; + default: + throw std::invalid_argument("ZlibStreamCodec: Invalid flush"); + } +} + +bool ZlibStreamCodec::doUncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flush) { + if (needReset_) { + resetInflateStream(); + needReset_ = false; + } + DCHECK(inflateStream_.hasValue()); + // zlib will return Z_STREAM_ERROR if output.data() is null. 
+ if (output.data() == nullptr) { + return false; + } + inflateStream_->next_in = const_cast(input.data()); + inflateStream_->avail_in = input.size(); + inflateStream_->next_out = output.data(); + inflateStream_->avail_out = output.size(); + SCOPE_EXIT { + input.advance(input.size() - inflateStream_->avail_in); + output.advance(output.size() - inflateStream_->avail_out); + }; + int const rc = zlibThrowOnError( + inflate(inflateStream_.get_pointer(), zlibTranslateFlush(flush))); + return rc == Z_STREAM_END; +} + +} // namespace + +Options defaultGzipOptions() { + return Options(Options::Format::GZIP); +} + +Options defaultZlibOptions() { + return Options(Options::Format::ZLIB); +} + +std::unique_ptr getCodec(Options options, int level) { + return ZlibStreamCodec::createCodec(options, level); +} + +std::unique_ptr getStreamCodec(Options options, int level) { + return ZlibStreamCodec::createStream(options, level); +} + +} // namespace zlib +} // namespace io +} // namespace folly + +#endif // FOLLY_HAVE_LIBZ diff --git a/folly/compression/Zlib.h b/folly/compression/Zlib.h new file mode 100644 index 00000000..fb04cccc --- /dev/null +++ b/folly/compression/Zlib.h @@ -0,0 +1,129 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#if FOLLY_HAVE_LIBZ + +#include + +/** + * Interface for Zlib-specific codec initialization. 
+ */ +namespace folly { +namespace io { +namespace zlib { + +struct Options { + /** + * ZLIB: default option -- write a zlib wrapper as documented in RFC 1950. + * + * GZIP: write a simple gzip header and trailer around the compressed data + * instead of a zlib wrapper. + * + * RAW: deflate will generate raw deflate data with no zlib header or + * trailer, and will not compute a check value. + * + * AUTO: enable automatic header detection for decoding gzip or zlib data. + * For deflation, ZLIB will be used. + */ + enum class Format { ZLIB, GZIP, RAW, AUTO }; + + explicit Options( + Format format = Format::ZLIB, + int windowSize = 15, + int memLevel = 8, + int strategy = Z_DEFAULT_STRATEGY) + : format(format), + windowSize(windowSize), + memLevel(memLevel), + strategy(strategy) {} + + Format format; + + /** + * windowSize is the base two logarithm of the window size (the size of the + * history buffer). It should be in the range 9..15. Larger values of this + * parameter result in better compression at the expense of memory usage. + * + * The default value is 15. + * + * NB: when inflating/uncompressing data, the windowSize must be greater than + * or equal to the size used when deflating/compressing. + */ + int windowSize; + + /** + * "The memLevel parameter specifies how much memory should be allocated for + * the internal compression state. memLevel=1 uses minimum memory but is slow + * and reduces compression ratio; memLevel=9 uses maximum memory for optimal + * speed. The default value is 8." + */ + int memLevel; + + /** + * The strategy parameter is used to tune the compression algorithm. 
+ * Supported values: + * - Z_DEFAULT_STRATEGY: normal data + * - Z_FILTERED: data produced by a filter (or predictor) + * - Z_HUFFMAN_ONLY: force Huffman encoding only (no string match) + * - Z_RLE: limit match distances to one + * - Z_FIXED: prevents the use of dynamic Huffman codes + * + * The strategy parameter only affects the compression ratio but not the + * correctness of the compressed output. + */ + int strategy; +}; + +/** + * Get the default options for gzip compression. + * A codec created with these options will have type CodecType::GZIP. + */ +Options defaultGzipOptions(); + +/** + * Get the default options for zlib compression. + * A codec created with these options will have type CodecType::ZLIB. + */ +Options defaultZlibOptions(); + +/** + * Get a codec with the given options and compression level. + * + * If the windowSize is 15 and the format is Format::ZLIB or Format::GZIP, then + * the type of the codec will be CodecType::ZLIB or CodecType::GZIP + * respectively. Otherwise, the type will be CodecType::USER_DEFINED. + * + * Automatic uncompression is not supported with USER_DEFINED codecs. + * + * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6 + */ +std::unique_ptr getCodec( + Options options = Options(), + int level = COMPRESSION_LEVEL_DEFAULT); +std::unique_ptr getStreamCodec( + Options options = Options(), + int level = COMPRESSION_LEVEL_DEFAULT); + +} // namespace zlib +} // namespace io +} // namespace folly + +#endif // FOLLY_HAVE_LIBZ diff --git a/folly/compression/test/CompressionTest.cpp b/folly/compression/test/CompressionTest.cpp new file mode 100644 index 00000000..9c846048 --- /dev/null +++ b/folly/compression/test/CompressionTest.cpp @@ -0,0 +1,1502 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// NOTE(review): the header names on the #include lines below are missing in
+// this copy of the patch. The code in this file needs at least <algorithm>,
+// <random>, <set>, <thread>, <unordered_map>, boost::noncopyable, glog and
+// gtest, plus folly's Compression, Hash, Random, Varint and IOBufQueue
+// headers -- restore the exact list from the original commit.
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#if FOLLY_HAVE_LIBZSTD
+#include
+#endif
+
+#if FOLLY_HAVE_LIBZ
+#include
+#endif
+
+namespace zlib = folly::io::zlib;
+
+namespace folly {
+namespace io {
+namespace test {
+
+/**
+ * Owns a buffer of 2^sizeLog2 bytes that the tests use as input data.
+ *
+ * The base class only allocates the buffer (its contents are uninitialized);
+ * derived classes are responsible for filling it. Hashes of buffer prefixes
+ * are memoized in hashCache_.
+ */
+class DataHolder : private boost::noncopyable {
+ public:
+  // FNV-64 hash of the first `size` bytes; CHECK-fails if size > size_.
+  uint64_t hash(size_t size) const;
+  // View of the first `size` bytes; CHECK-fails if size > size_.
+  ByteRange data(size_t size) const;
+
+ protected:
+  explicit DataHolder(size_t sizeLog2);
+  const size_t size_;
+  std::unique_ptr<uint8_t[]> data_;
+  // prefix length -> hash of that prefix
+  mutable std::unordered_map<size_t, uint64_t> hashCache_;
+};
+
+DataHolder::DataHolder(size_t sizeLog2)
+  : size_(size_t(1) << sizeLog2),
+    data_(new uint8_t[size_]) {
+}
+
+uint64_t DataHolder::hash(size_t size) const {
+  CHECK_LE(size, size_);
+  auto p = hashCache_.find(size);
+  if (p != hashCache_.end()) {
+    return p->second;
+  }
+
+  uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
+  hashCache_[size] = h;
+  return h;
+}
+
+ByteRange DataHolder::data(size_t size) const {
+  CHECK_LE(size, size_);
+  return ByteRange(data_.get(), size);
+}
+
+// FNV-64 hash over every buffer of an IOBuf chain, in chain order, so the
+// result can be compared with DataHolder::hash() of the same bytes.
+uint64_t hashIOBuf(const IOBuf* buf) {
+  uint64_t h = folly::hash::FNV_64_HASH_START;
+  for (auto& range : *buf) {
+    h = folly::hash::fnv64_buf(range.data(), range.size(), h);
+  }
+  return h;
+}
+
+// DataHolder whose whole buffer is filled with pseudo-random bytes.
+class RandomDataHolder : public DataHolder {
+ public:
+  explicit RandomDataHolder(size_t sizeLog2);
+};
+
+RandomDataHolder::RandomDataHolder(size_t sizeLog2)
+  : DataHolder(sizeLog2) {
+  static constexpr size_t numThreadsLog2 = 3;
+  static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
+
+  // Each worker fills an equal power-of-two slice, so the buffer must hold at
+  // least one byte per worker; otherwise the subtraction below would wrap.
+  CHECK_GE(sizeLog2, numThreadsLog2);
+
+  uint32_t seed = randomNumberSeed();
+
+  std::vector<std::thread> threads;
+  threads.reserve(numThreads);
+  for (size_t t = 0; t < numThreads; ++t) {
+    threads.emplace_back([this, seed, t, sizeLog2] {
+      std::mt19937 rng(seed + t);
+      const size_t countLog2 = sizeLog2 - numThreadsLog2;
+      // Fill the whole slice: 2^countLog2 bytes, not countLog2 bytes. The
+      // previous bound left almost all of the buffer uninitialized.
+      const size_t count = size_t(1) << countLog2;
+      const size_t start = size_t(t) << countLog2;
+      for (size_t i = 0; i < count; ++i) {
+        this->data_[start + i] = static_cast<uint8_t>(rng());
+      }
+    });
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+// DataHolder whose whole buffer is filled with the byte 'a'.
+class ConstantDataHolder : public DataHolder {
+ public:
+  explicit ConstantDataHolder(size_t sizeLog2);
+};
+
+ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
+  : DataHolder(sizeLog2) {
+  memset(data_.get(), 'a', size_);
+}
+
+// Shared fixtures used by every test in this file.
+constexpr size_t dataSizeLog2 = 27;  // 128MiB
+RandomDataHolder randomDataHolder(dataSizeLog2);
+ConstantDataHolder constantDataHolder(dataSizeLog2);
+
+// The intersection of the provided codecs & those that are compiled in.
+static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
+  std::vector<CodecType> supported;
+
+  std::copy_if(
+      std::begin(v),
+      std::end(v),
+      std::back_inserter(supported),
+      hasCodec);
+
+  return supported;
+}
+
+// All compiled-in compression codecs.
+static std::vector<CodecType> availableCodecs() {
+  std::vector<CodecType> codecs;
+
+  // Walk every enumerator below NUM_CODEC_TYPES and keep the ones for which
+  // hasCodec() reports support.
+  for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
+    auto type = static_cast<CodecType>(i);
+    if (hasCodec(type)) {
+      codecs.push_back(type);
+    }
+  }
+
+  return codecs;
+}
+
+// Same enumeration as availableCodecs(), filtered by hasStreamCodec().
+static std::vector<CodecType> availableStreamCodecs() {
+  std::vector<CodecType> codecs;
+
+  for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
+    auto type = static_cast<CodecType>(i);
+    if (hasStreamCodec(type)) {
+      codecs.push_back(type);
+    }
+  }
+
+  return codecs;
+}
+
+// Pins, per codec type, whether uncompress() requires the caller to supply
+// the uncompressed length. Codecs that are not compiled in are skipped.
+TEST(CompressionTestNeedsUncompressedLength, Simple) {
+  static const struct {
+    CodecType type;
+    bool needsUncompressedLength;
+  } expectations[] = {
+      {CodecType::NO_COMPRESSION, false},
+      {CodecType::LZ4, true},
+      {CodecType::SNAPPY, false},
+      {CodecType::ZLIB, false},
+      {CodecType::LZ4_VARINT_SIZE, false},
+      {CodecType::LZMA2, false},
+      {CodecType::LZMA2_VARINT_SIZE, false},
+      {CodecType::ZSTD, false},
+      {CodecType::GZIP, false},
+      {CodecType::LZ4_FRAME, false},
+      {CodecType::BZIP2, false},
+  };
+
+  for (auto const& test : expectations) {
+    if (hasCodec(test.type)) {
+      EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
+                test.needsUncompressedLength);
+    }
+  }
+}
+
+/**
+ * Round-trip fixture parameterized on
+ * (log2 of the uncompressed length, number of chunks, codec type).
+ *
+ * A negative length exponent selects zero-length input.
+ */
+class CompressionTest
+    : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
+ protected:
+  void SetUp() override {
+    auto tup = GetParam();
+    int lengthLog = std::tr1::get<0>(tup);
+    // Small hack to test empty data
+    uncompressedLength_ = (lengthLog < 0) ? 0 : uint64_t(1) << lengthLog;
+    chunks_ = std::tr1::get<1>(tup);
+    codec_ = getCodec(std::tr1::get<2>(tup));
+  }
+
+  // Compress/uncompress through the IOBuf API and compare lengths and hashes.
+  void runSimpleIOBufTest(const DataHolder& dh);
+
+  // Compress/uncompress through the std::string API and compare contents.
+  void runSimpleStringTest(const DataHolder& dh);
+
+ private:
+  std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
+
+  uint64_t uncompressedLength_;
+  size_t chunks_;
+  std::unique_ptr<Codec> codec_;
+};
+
+void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
+  const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
+  const auto compressed = split(codec_->compress(original.get()));
+  // The codec's advertised bound must cover what it actually produced.
+  EXPECT_LE(
+      compressed->computeChainDataLength(),
+      codec_->maxCompressedLength(uncompressedLength_));
+  // Uncompress without a length hint only when the codec allows it.
+  if (!codec_->needsUncompressedLength()) {
+    auto uncompressed = codec_->uncompress(compressed.get());
+    EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
+    EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
+  }
+  // Uncompressing with the exact length must work for every codec.
+  {
+    auto uncompressed = codec_->uncompress(compressed.get(),
+                                           uncompressedLength_);
+    EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
+    EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
+  }
+}
+
+void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
+  const auto original = std::string(
+      reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
+      uncompressedLength_);
+  const auto compressed = codec_->compress(original);
+  EXPECT_LE(
+      compressed.length(), codec_->maxCompressedLength(uncompressedLength_));
+
+  // Uncompress without a length hint only when the codec allows it.
+  if (!codec_->needsUncompressedLength()) {
+    auto uncompressed = codec_->uncompress(compressed);
+    EXPECT_EQ(uncompressedLength_, uncompressed.length());
+    EXPECT_EQ(uncompressed, original);
+  }
+  // Uncompressing with the exact length must work for every codec.
+  {
+    auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
+    EXPECT_EQ(uncompressedLength_, uncompressed.length());
+    EXPECT_EQ(uncompressed, original);
+  }
+}
+
+// Uniformly split data into (potentially empty) chunks.
+std::unique_ptr CompressionTest::split( + std::unique_ptr data) const { + if (data->isChained()) { + data->coalesce(); + } + + const size_t size = data->computeChainDataLength(); + + std::multiset splits; + for (size_t i = 1; i < chunks_; ++i) { + splits.insert(Random::rand64(size)); + } + + folly::IOBufQueue result; + + size_t offset = 0; + for (size_t split : splits) { + result.append(IOBuf::copyBuffer(data->data() + offset, split - offset)); + offset = split; + } + result.append(IOBuf::copyBuffer(data->data() + offset, size - offset)); + + return result.move(); +} + +TEST_P(CompressionTest, RandomData) { + runSimpleIOBufTest(randomDataHolder); +} + +TEST_P(CompressionTest, ConstantData) { + runSimpleIOBufTest(constantDataHolder); +} + +TEST_P(CompressionTest, RandomDataString) { + runSimpleStringTest(randomDataHolder); +} + +TEST_P(CompressionTest, ConstantDataString) { + runSimpleStringTest(constantDataHolder); +} + +INSTANTIATE_TEST_CASE_P( + CompressionTest, + CompressionTest, + testing::Combine( + testing::Values(-1, 0, 1, 12, 22, 25, 27), + testing::Values(1, 2, 3, 8, 65), + testing::ValuesIn(availableCodecs()))); + +class CompressionVarintTest + : public testing::TestWithParam> { + protected: + void SetUp() override { + auto tup = GetParam(); + uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup); + codec_ = getCodec(std::tr1::get<1>(tup)); + } + + void runSimpleTest(const DataHolder& dh); + + uint64_t uncompressedLength_; + std::unique_ptr codec_; +}; + +inline uint64_t oneBasedMsbPos(uint64_t number) { + uint64_t pos = 0; + for (; number > 0; ++pos, number >>= 1) { + } + return pos; +} + +void CompressionVarintTest::runSimpleTest(const DataHolder& dh) { + auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_)); + auto compressed = codec_->compress(original.get()); + auto breakPoint = + 1UL + + Random::rand64( + std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL); + auto tinyBuf = IOBuf::copyBuffer(compressed->data(), + 
std::min(compressed->length(), breakPoint)); + compressed->trimStart(breakPoint); + tinyBuf->prependChain(std::move(compressed)); + compressed = std::move(tinyBuf); + + auto uncompressed = codec_->uncompress(compressed.get()); + + EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength()); + EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); +} + +TEST_P(CompressionVarintTest, RandomData) { + runSimpleTest(randomDataHolder); +} + +TEST_P(CompressionVarintTest, ConstantData) { + runSimpleTest(constantDataHolder); +} + +INSTANTIATE_TEST_CASE_P( + CompressionVarintTest, + CompressionVarintTest, + testing::Combine( + testing::Values(0, 1, 12, 22, 25, 27), + testing::ValuesIn(supportedCodecs({ + CodecType::LZ4_VARINT_SIZE, + CodecType::LZMA2_VARINT_SIZE, + })))); + +TEST(LZMATest, UncompressBadVarint) { + if (hasStreamCodec(CodecType::LZMA2_VARINT_SIZE)) { + std::string const str(kMaxVarintLength64 * 2, '\xff'); + ByteRange input((folly::StringPiece(str))); + auto codec = getStreamCodec(CodecType::LZMA2_VARINT_SIZE); + auto buffer = IOBuf::create(16); + buffer->append(buffer->capacity()); + MutableByteRange output{buffer->writableData(), buffer->length()}; + EXPECT_THROW(codec->uncompressStream(input, output), std::runtime_error); + } +} + +class CompressionCorruptionTest : public testing::TestWithParam { + protected: + void SetUp() override { codec_ = getCodec(GetParam()); } + + void runSimpleTest(const DataHolder& dh); + + std::unique_ptr codec_; +}; + +void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) { + constexpr uint64_t uncompressedLength = 42; + auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength)); + auto compressed = codec_->compress(original.get()); + + if (!codec_->needsUncompressedLength()) { + auto uncompressed = codec_->uncompress(compressed.get()); + EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); + EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); + } 
+ { + auto uncompressed = codec_->uncompress(compressed.get(), + uncompressedLength); + EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); + EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); + } + + EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1), + std::runtime_error); + + auto corrupted = compressed->clone(); + corrupted->unshare(); + // Truncate the last character + corrupted->prev()->trimEnd(1); + if (!codec_->needsUncompressedLength()) { + EXPECT_THROW(codec_->uncompress(corrupted.get()), + std::runtime_error); + } + + EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength), + std::runtime_error); + + corrupted = compressed->clone(); + corrupted->unshare(); + // Corrupt the first character + ++(corrupted->writableData()[0]); + + if (!codec_->needsUncompressedLength()) { + EXPECT_THROW(codec_->uncompress(corrupted.get()), + std::runtime_error); + } + + EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength), + std::runtime_error); +} + +TEST_P(CompressionCorruptionTest, RandomData) { + runSimpleTest(randomDataHolder); +} + +TEST_P(CompressionCorruptionTest, ConstantData) { + runSimpleTest(constantDataHolder); +} + +INSTANTIATE_TEST_CASE_P( + CompressionCorruptionTest, + CompressionCorruptionTest, + testing::ValuesIn( + // NO_COMPRESSION can't detect corruption + // LZ4 can't detect corruption reliably (sigh) + supportedCodecs({ + CodecType::SNAPPY, + CodecType::ZLIB, + CodecType::LZMA2, + CodecType::ZSTD, + CodecType::LZ4_FRAME, + CodecType::BZIP2, + }))); + +class StreamingUnitTest : public testing::TestWithParam { + protected: + void SetUp() override { + codec_ = getStreamCodec(GetParam()); + } + + std::unique_ptr codec_; +}; + +TEST(StreamingUnitTest, needsDataLength) { + static const struct { + CodecType type; + bool needsDataLength; + } expectations[] = { + {CodecType::ZLIB, false}, + {CodecType::GZIP, false}, + {CodecType::LZMA2, false}, + 
{CodecType::LZMA2_VARINT_SIZE, true}, + {CodecType::ZSTD, false}, + }; + + for (auto const& test : expectations) { + if (hasStreamCodec(test.type)) { + EXPECT_EQ( + getStreamCodec(test.type)->needsDataLength(), test.needsDataLength); + } + } +} + +TEST_P(StreamingUnitTest, maxCompressedLength) { + for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) { + EXPECT_GE(codec_->maxCompressedLength(length), length); + } +} + +TEST_P(StreamingUnitTest, getUncompressedLength) { + auto const empty = IOBuf::create(0); + EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get())); + EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0)); + EXPECT_ANY_THROW(codec_->getUncompressedLength(empty.get(), 1)); + + auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100)); + auto const compressed = codec_->compress(data.get()); + + if (auto const length = codec_->getUncompressedLength(data.get())) { + EXPECT_EQ(100, *length); + } + EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100)); + // If the uncompressed length is stored in the frame, then make sure it throws + // when it is given the wrong length. 
+ if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) { + EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200)); + } +} + +TEST_P(StreamingUnitTest, emptyData) { + ByteRange input{}; + auto buffer = IOBuf::create(codec_->maxCompressedLength(0)); + buffer->append(buffer->capacity()); + MutableByteRange output; + + // Test compressing empty data in one pass + if (!codec_->needsDataLength()) { + output = {buffer->writableData(), buffer->length()}; + EXPECT_TRUE( + codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + } + codec_->resetStream(0); + output = {buffer->writableData(), buffer->length()}; + EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + + // Test uncompressing the compressed empty data is equivalent to the empty + // string + { + size_t compressedSize = buffer->length() - output.size(); + auto const compressed = + IOBuf::copyBuffer(buffer->writableData(), compressedSize); + auto inputRange = compressed->coalesce(); + codec_->resetStream(0); + output = {buffer->writableData(), buffer->length()}; + EXPECT_TRUE(codec_->uncompressStream( + inputRange, output, StreamCodec::FlushOp::END)); + EXPECT_EQ(output.size(), buffer->length()); + } + + // Test compressing empty data with multiple calls to compressStream() + { + auto largeBuffer = IOBuf::create(codec_->maxCompressedLength(0) * 2); + largeBuffer->append(largeBuffer->capacity()); + codec_->resetStream(0); + output = {largeBuffer->writableData(), largeBuffer->length()}; + EXPECT_FALSE(codec_->compressStream(input, output)); + EXPECT_TRUE( + codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH)); + EXPECT_TRUE( + codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + } + + // Test uncompressing empty data + output = {}; + codec_->resetStream(); + EXPECT_TRUE(codec_->uncompressStream(input, output)); + codec_->resetStream(); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH)); + 
codec_->resetStream(); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::END)); + codec_->resetStream(0); + EXPECT_TRUE(codec_->uncompressStream(input, output)); + codec_->resetStream(0); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH)); + codec_->resetStream(0); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::END)); +} + +TEST_P(StreamingUnitTest, noForwardProgress) { + auto inBuffer = IOBuf::create(2); + inBuffer->writableData()[0] = 'a'; + inBuffer->writableData()[1] = 'a'; + inBuffer->append(2); + const auto compressed = codec_->compress(inBuffer.get()); + auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2)); + + ByteRange emptyInput; + MutableByteRange emptyOutput; + + const std::array flushOps = {{ + StreamCodec::FlushOp::NONE, + StreamCodec::FlushOp::FLUSH, + StreamCodec::FlushOp::END, + }}; + + // No progress is not okay twice in a row for all flush operations when + // compressing + for (const auto flushOp : flushOps) { + if (codec_->needsDataLength()) { + codec_->resetStream(inBuffer->computeChainDataLength()); + } else { + codec_->resetStream(); + } + auto input = inBuffer->coalesce(); + MutableByteRange output = {outBuffer->writableTail(), + outBuffer->tailroom()}; + // Compress some data to avoid empty data special casing + while (!input.empty()) { + codec_->compressStream(input, output); + } + EXPECT_FALSE(codec_->compressStream(emptyInput, emptyOutput, flushOp)); + EXPECT_THROW( + codec_->compressStream(emptyInput, emptyOutput, flushOp), + std::runtime_error); + } + + // No progress is not okay twice in a row for all flush operations when + // uncompressing + for (const auto flushOp : flushOps) { + codec_->resetStream(); + auto input = compressed->coalesce(); + // Remove the last byte so the operation is incomplete + input.uncheckedSubtract(1); + MutableByteRange output = {inBuffer->writableData(), inBuffer->length()}; + // Uncompress some data 
to avoid empty data special casing + while (!input.empty()) { + EXPECT_FALSE(codec_->uncompressStream(input, output)); + } + EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput, flushOp)); + EXPECT_THROW( + codec_->uncompressStream(emptyInput, emptyOutput, flushOp), + std::runtime_error); + } +} + +TEST_P(StreamingUnitTest, stateTransitions) { + auto inBuffer = IOBuf::create(2); + inBuffer->writableData()[0] = 'a'; + inBuffer->writableData()[1] = 'a'; + inBuffer->append(2); + auto compressed = codec_->compress(inBuffer.get()); + ByteRange const in = compressed->coalesce(); + auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size())); + MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()}; + + auto compress = [&]( + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE, + bool empty = false) { + auto input = in; + auto output = empty ? MutableByteRange{} : out; + return codec_->compressStream(input, output, flushOp); + }; + auto compress_all = [&](bool expect, + StreamCodec::FlushOp flushOp = + StreamCodec::FlushOp::NONE, + bool empty = false) { + auto input = in; + auto output = empty ? MutableByteRange{} : out; + while (!input.empty()) { + if (expect) { + EXPECT_TRUE(codec_->compressStream(input, output, flushOp)); + } else { + EXPECT_FALSE(codec_->compressStream(input, output, flushOp)); + } + } + }; + auto uncompress = [&]( + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE, + bool empty = false) { + auto input = in; + auto output = empty ? 
MutableByteRange{} : out; + return codec_->uncompressStream(input, output, flushOp); + }; + + // compression flow + if (!codec_->needsDataLength()) { + codec_->resetStream(); + EXPECT_FALSE(compress()); + EXPECT_FALSE(compress()); + EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH)); + EXPECT_FALSE(compress()); + EXPECT_TRUE(compress(StreamCodec::FlushOp::END)); + } + codec_->resetStream(in.size() * 5); + compress_all(false); + compress_all(false); + compress_all(true, StreamCodec::FlushOp::FLUSH); + compress_all(false); + compress_all(true, StreamCodec::FlushOp::END); + + // uncompression flow + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true)); + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); + codec_->resetStream(); + EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); + // compress -> uncompress + codec_->resetStream(in.size()); + EXPECT_FALSE(compress()); + EXPECT_THROW(uncompress(), std::logic_error); + // uncompress -> compress + codec_->resetStream(inBuffer->computeChainDataLength()); + EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); + EXPECT_THROW(compress(), std::logic_error); + // end -> compress + if (!codec_->needsDataLength()) { + codec_->resetStream(); + EXPECT_FALSE(compress()); + EXPECT_TRUE(compress(StreamCodec::FlushOp::END)); + EXPECT_THROW(compress(), std::logic_error); + } + codec_->resetStream(in.size() * 2); + compress_all(false); + compress_all(true, StreamCodec::FlushOp::END); + EXPECT_THROW(compress(), std::logic_error); + // end -> uncompress + codec_->resetStream(); + EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); + EXPECT_THROW(uncompress(), std::logic_error); + // flush -> compress + codec_->resetStream(in.size()); + EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true)); + 
EXPECT_THROW(compress(), std::logic_error); + // flush -> end + codec_->resetStream(in.size()); + EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true)); + EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error); + // undefined -> compress + codec_->compress(inBuffer.get()); + EXPECT_THROW(compress(), std::logic_error); + codec_->uncompress(compressed.get(), inBuffer->computeChainDataLength()); + EXPECT_THROW(compress(), std::logic_error); + // undefined -> undefined + codec_->uncompress(compressed.get()); + codec_->compress(inBuffer.get()); +} + +INSTANTIATE_TEST_CASE_P( + StreamingUnitTest, + StreamingUnitTest, + testing::ValuesIn(availableStreamCodecs())); + +class StreamingCompressionTest + : public testing::TestWithParam> { + protected: + void SetUp() override { + auto const tup = GetParam(); + uncompressedLength_ = uint64_t(1) << std::get<0>(tup); + chunkSize_ = size_t(1) << std::get<1>(tup); + codec_ = getStreamCodec(std::get<2>(tup)); + } + + void runResetStreamTest(DataHolder const& dh); + void runCompressStreamTest(DataHolder const& dh); + void runUncompressStreamTest(DataHolder const& dh); + void runFlushTest(DataHolder const& dh); + + private: + std::vector split(ByteRange data) const; + + uint64_t uncompressedLength_; + size_t chunkSize_; + std::unique_ptr codec_; +}; + +std::vector StreamingCompressionTest::split(ByteRange data) const { + size_t const pieces = std::max(1, data.size() / chunkSize_); + std::vector result; + result.reserve(pieces + 1); + while (!data.empty()) { + size_t const pieceSize = std::min(data.size(), chunkSize_); + result.push_back(data.subpiece(0, pieceSize)); + data.uncheckedAdvance(pieceSize); + } + return result; +} + +static std::unique_ptr compressSome( + StreamCodec* codec, + ByteRange data, + uint64_t bufferSize, + StreamCodec::FlushOp flush) { + bool result; + IOBufQueue queue; + do { + auto buffer = IOBuf::create(bufferSize); + buffer->append(buffer->capacity()); + MutableByteRange 
output{buffer->writableData(), buffer->length()}; + + result = codec->compressStream(data, output, flush); + buffer->trimEnd(output.size()); + queue.append(std::move(buffer)); + + } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result); + EXPECT_TRUE(data.empty()); + return queue.move(); +} + +static std::pair> uncompressSome( + StreamCodec* codec, + ByteRange& data, + uint64_t bufferSize, + StreamCodec::FlushOp flush) { + bool result; + IOBufQueue queue; + do { + auto buffer = IOBuf::create(bufferSize); + buffer->append(buffer->capacity()); + MutableByteRange output{buffer->writableData(), buffer->length()}; + + result = codec->uncompressStream(data, output, flush); + buffer->trimEnd(output.size()); + queue.append(std::move(buffer)); + + } while (queue.tailroom() == 0 && !result); + return std::make_pair(result, queue.move()); +} + +void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) { + auto const input = dh.data(uncompressedLength_); + // Compress some but leave state unclean + codec_->resetStream(uncompressedLength_); + compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE); + // Reset stream and compress all + if (codec_->needsDataLength()) { + codec_->resetStream(uncompressedLength_); + } else { + codec_->resetStream(); + } + auto compressed = + compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END); + auto const uncompressed = codec_->uncompress(compressed.get(), input.size()); + EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); +} + +TEST_P(StreamingCompressionTest, resetStream) { + runResetStreamTest(constantDataHolder); + runResetStreamTest(randomDataHolder); +} + +void StreamingCompressionTest::runCompressStreamTest( + const folly::io::test::DataHolder& dh) { + auto const inputs = split(dh.data(uncompressedLength_)); + + IOBufQueue queue; + codec_->resetStream(uncompressedLength_); + // Compress many inputs in a row + for (auto const input : inputs) { + 
queue.append(compressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE)); + } + // Finish the operation with empty input. + ByteRange empty; + queue.append( + compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END)); + + auto const uncompressed = codec_->uncompress(queue.front()); + EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); +} + +TEST_P(StreamingCompressionTest, compressStream) { + runCompressStreamTest(constantDataHolder); + runCompressStreamTest(randomDataHolder); +} + +void StreamingCompressionTest::runUncompressStreamTest( + const folly::io::test::DataHolder& dh) { + auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_)); + // Concatenate 3 compressed frames in a row + auto compressed = codec_->compress(data.get()); + compressed->prependChain(codec_->compress(data.get())); + compressed->prependChain(codec_->compress(data.get())); + // Pass all 3 compressed frames in one input buffer + auto input = compressed->coalesce(); + // Uncompress the first frame + codec_->resetStream(data->computeChainDataLength()); + { + auto const result = uncompressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); + ASSERT_TRUE(result.first); + ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } + // Uncompress the second frame + codec_->resetStream(); + { + auto const result = uncompressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END); + ASSERT_TRUE(result.first); + ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } + // Uncompress the third frame + codec_->resetStream(); + { + auto const result = uncompressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); + ASSERT_TRUE(result.first); + ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } + EXPECT_TRUE(input.empty()); +} + +TEST_P(StreamingCompressionTest, uncompressStream) { + runUncompressStreamTest(constantDataHolder); + 
runUncompressStreamTest(randomDataHolder); +} + +void StreamingCompressionTest::runFlushTest(DataHolder const& dh) { + auto const inputs = split(dh.data(uncompressedLength_)); + auto uncodec = getStreamCodec(codec_->type()); + + if (codec_->needsDataLength()) { + codec_->resetStream(uncompressedLength_); + } else { + codec_->resetStream(); + } + for (auto input : inputs) { + // Compress some data and flush the stream + auto compressed = compressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); + auto compressedRange = compressed->coalesce(); + // Uncompress the compressed data + auto result = uncompressSome( + uncodec.get(), + compressedRange, + chunkSize_, + StreamCodec::FlushOp::FLUSH); + // All compressed data should have been consumed + EXPECT_TRUE(compressedRange.empty()); + // The frame isn't complete + EXPECT_FALSE(result.first); + // The uncompressed data should be exactly the input data + EXPECT_EQ(input.size(), result.second->computeChainDataLength()); + auto const data = IOBuf::wrapBuffer(input); + EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } +} + +TEST_P(StreamingCompressionTest, testFlush) { + runFlushTest(constantDataHolder); + runFlushTest(randomDataHolder); +} + +INSTANTIATE_TEST_CASE_P( + StreamingCompressionTest, + StreamingCompressionTest, + testing::Combine( + testing::Values(0, 1, 12, 22, 27), + testing::Values(12, 17, 20), + testing::ValuesIn(availableStreamCodecs()))); + +namespace { + +// Codec types included in the codec returned by getAutoUncompressionCodec() by +// default. 
+std::vector autoUncompressionCodecTypes = {{ + CodecType::LZ4_FRAME, + CodecType::ZSTD, + CodecType::ZLIB, + CodecType::GZIP, + CodecType::LZMA2, + CodecType::BZIP2, +}}; + +} // namespace + +class AutomaticCodecTest : public testing::TestWithParam { + protected: + void SetUp() override { + codecType_ = GetParam(); + codec_ = getCodec(codecType_); + autoType_ = std::any_of( + autoUncompressionCodecTypes.begin(), + autoUncompressionCodecTypes.end(), + [&](CodecType o) { return codecType_ == o; }); + // Add the codec with type codecType_ as the terminal codec if it is not in + // autoUncompressionCodecTypes. + auto_ = getAutoUncompressionCodec({}, getTerminalCodec()); + } + + void runSimpleTest(const DataHolder& dh); + + std::unique_ptr getTerminalCodec() { + return (autoType_ ? nullptr : getCodec(codecType_)); + } + + std::unique_ptr codec_; + std::unique_ptr auto_; + CodecType codecType_; + // true if codecType_ is in autoUncompressionCodecTypes + bool autoType_; +}; + +void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) { + constexpr uint64_t uncompressedLength = 1000; + auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength)); + auto compressed = codec_->compress(original.get()); + + if (!codec_->needsUncompressedLength()) { + auto uncompressed = auto_->uncompress(compressed.get()); + EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); + EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); + } + { + auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength); + EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); + EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); + } + ASSERT_GE(compressed->computeChainDataLength(), 8); + for (size_t i = 0; i < 8; ++i) { + auto split = compressed->clone(); + auto rest = compressed->clone(); + split->trimEnd(split->length() - i); + rest->trimStart(i); + split->appendChain(std::move(rest)); + auto uncompressed = 
auto_->uncompress(split.get(), uncompressedLength); + EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); + EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); + } +} + +TEST_P(AutomaticCodecTest, RandomData) { + runSimpleTest(randomDataHolder); +} + +TEST_P(AutomaticCodecTest, ConstantData) { + runSimpleTest(constantDataHolder); +} + +TEST_P(AutomaticCodecTest, ValidPrefixes) { + const auto prefixes = codec_->validPrefixes(); + for (const auto& prefix : prefixes) { + EXPECT_FALSE(prefix.empty()); + // Ensure that all strings are at least 8 bytes for LZMA2. + // The bytes after the prefix should be ignored by `canUncompress()`. + IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8}; + data.append(8); + EXPECT_TRUE(codec_->canUncompress(&data)); + EXPECT_TRUE(auto_->canUncompress(&data)); + } +} + +TEST_P(AutomaticCodecTest, NeedsUncompressedLength) { + if (codec_->needsUncompressedLength()) { + EXPECT_TRUE(auto_->needsUncompressedLength()); + } +} + +TEST_P(AutomaticCodecTest, maxUncompressedLength) { + EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength()); +} + +TEST_P(AutomaticCodecTest, DefaultCodec) { + const uint64_t length = 42; + std::vector> codecs; + codecs.push_back(getCodec(CodecType::ZSTD)); + auto automatic = + getAutoUncompressionCodec(std::move(codecs), getTerminalCodec()); + auto original = IOBuf::wrapBuffer(constantDataHolder.data(length)); + auto compressed = codec_->compress(original.get()); + std::unique_ptr decompressed; + + if (automatic->needsUncompressedLength()) { + decompressed = automatic->uncompress(compressed.get(), length); + } else { + decompressed = automatic->uncompress(compressed.get()); + } + + EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get())); +} + +namespace { +class CustomCodec : public Codec { + public: + static std::unique_ptr create(std::string prefix, CodecType type) { + return std::make_unique(std::move(prefix), type); + } + explicit 
CustomCodec(std::string prefix, CodecType type) + : Codec(CodecType::USER_DEFINED), + prefix_(std::move(prefix)), + codec_(getCodec(type)) {} + + private: + std::vector validPrefixes() const override { + return {prefix_}; + } + + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override { + return codec_->maxCompressedLength(uncompressedLength) + prefix_.size(); + } + + bool canUncompress(const IOBuf* data, Optional) const override { + auto clone = data->cloneCoalescedAsValue(); + if (clone.length() < prefix_.size()) { + return false; + } + return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0; + } + + std::unique_ptr doCompress(const IOBuf* data) override { + auto result = IOBuf::copyBuffer(prefix_); + result->appendChain(codec_->compress(data)); + EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength())); + return result; + } + + std::unique_ptr doUncompress( + const IOBuf* data, + Optional uncompressedLength) override { + EXPECT_TRUE(canUncompress(data, uncompressedLength)); + auto clone = data->cloneCoalescedAsValue(); + clone.trimStart(prefix_.size()); + return codec_->uncompress(&clone, uncompressedLength); + } + + std::string prefix_; + std::unique_ptr codec_; +}; +} + +TEST_P(AutomaticCodecTest, CustomCodec) { + const uint64_t length = 42; + auto ab = CustomCodec::create("ab", CodecType::ZSTD); + std::vector> codecs; + codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD)); + auto automatic = + getAutoUncompressionCodec(std::move(codecs), getTerminalCodec()); + auto original = IOBuf::wrapBuffer(constantDataHolder.data(length)); + + auto abCompressed = ab->compress(original.get()); + std::unique_ptr abDecompressed; + if (automatic->needsUncompressedLength()) { + abDecompressed = automatic->uncompress(abCompressed.get(), length); + } else { + abDecompressed = automatic->uncompress(abCompressed.get()); + } + EXPECT_TRUE(automatic->canUncompress(abCompressed.get())); + 
EXPECT_FALSE(auto_->canUncompress(abCompressed.get())); + EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get())); + + auto compressed = codec_->compress(original.get()); + std::unique_ptr decompressed; + if (automatic->needsUncompressedLength()) { + decompressed = automatic->uncompress(compressed.get(), length); + } else { + decompressed = automatic->uncompress(compressed.get()); + } + EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get())); +} + +TEST_P(AutomaticCodecTest, CustomDefaultCodec) { + const uint64_t length = 42; + auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION); + std::vector> codecs; + codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION)); + codecs.push_back(getCodec(CodecType::LZ4_FRAME)); + auto automatic = + getAutoUncompressionCodec(std::move(codecs), getTerminalCodec()); + auto original = IOBuf::wrapBuffer(constantDataHolder.data(length)); + + auto noneCompressed = none->compress(original.get()); + std::unique_ptr noneDecompressed; + if (automatic->needsUncompressedLength()) { + noneDecompressed = automatic->uncompress(noneCompressed.get(), length); + } else { + noneDecompressed = automatic->uncompress(noneCompressed.get()); + } + EXPECT_TRUE(automatic->canUncompress(noneCompressed.get())); + EXPECT_FALSE(auto_->canUncompress(noneCompressed.get())); + EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get())); + + auto compressed = codec_->compress(original.get()); + std::unique_ptr decompressed; + if (automatic->needsUncompressedLength()) { + decompressed = automatic->uncompress(compressed.get(), length); + } else { + decompressed = automatic->uncompress(compressed.get()); + } + EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get())); +} + +TEST_P(AutomaticCodecTest, canUncompressOneBytes) { + // No default codec can uncompress 1 bytes. 
+ IOBuf buf{IOBuf::CREATE, 1}; + buf.append(1); + EXPECT_FALSE(codec_->canUncompress(&buf, 1)); + EXPECT_FALSE(codec_->canUncompress(&buf, folly::none)); + EXPECT_FALSE(auto_->canUncompress(&buf, 1)); + EXPECT_FALSE(auto_->canUncompress(&buf, folly::none)); +} + +INSTANTIATE_TEST_CASE_P( + AutomaticCodecTest, + AutomaticCodecTest, + testing::ValuesIn(availableCodecs())); + +namespace { + +// Codec that always "uncompresses" to the same string. +class ConstantCodec : public Codec { + public: + static std::unique_ptr create( + std::string uncompressed, + CodecType type) { + return std::make_unique(std::move(uncompressed), type); + } + explicit ConstantCodec(std::string uncompressed, CodecType type) + : Codec(type), uncompressed_(std::move(uncompressed)) {} + + private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override { + return uncompressedLength; + } + + std::unique_ptr doCompress(const IOBuf*) override { + throw std::runtime_error("ConstantCodec error: compress() not supported."); + } + + std::unique_ptr doUncompress(const IOBuf*, Optional) + override { + return IOBuf::copyBuffer(uncompressed_); + } + + std::string uncompressed_; + std::unique_ptr codec_; +}; + +} // namespace + +class TerminalCodecTest : public testing::TestWithParam { + protected: + void SetUp() override { + codecType_ = GetParam(); + codec_ = getCodec(codecType_); + auto_ = getAutoUncompressionCodec(); + } + + CodecType codecType_; + std::unique_ptr codec_; + std::unique_ptr auto_; +}; + +// Test that the terminal codec's uncompress() function is called when the +// default chosen automatic codec throws. +TEST_P(TerminalCodecTest, uncompressIfDefaultThrows) { + std::string const original = "abc"; + auto const compressed = codec_->compress(original); + + // Sanity check: the automatic codec can uncompress the original string. + auto const uncompressed = auto_->uncompress(compressed); + EXPECT_EQ(uncompressed, original); + + // Truncate the compressed string. 
+ auto const truncated = compressed.substr(0, compressed.size() - 1); + auto const truncatedBuf = + IOBuf::wrapBuffer(truncated.data(), truncated.size()); + EXPECT_TRUE(auto_->canUncompress(truncatedBuf.get())); + EXPECT_ANY_THROW(auto_->uncompress(truncated)); + + // Expect the terminal codec to successfully uncompress the string. + std::unique_ptr terminal = getAutoUncompressionCodec( + {}, ConstantCodec::create("dummyString", CodecType::USER_DEFINED)); + EXPECT_TRUE(terminal->canUncompress(truncatedBuf.get())); + EXPECT_EQ(terminal->uncompress(truncated), "dummyString"); +} + +// If the terminal codec has one of the "default types" automatically added in +// the AutomaticCodec, check that the default codec is no longer added. +TEST_P(TerminalCodecTest, terminalOverridesDefaults) { + std::unique_ptr terminal = getAutoUncompressionCodec( + {}, ConstantCodec::create("dummyString", codecType_)); + std::string const original = "abc"; + auto const compressed = codec_->compress(original); + EXPECT_EQ(terminal->uncompress(compressed), "dummyString"); +} + +INSTANTIATE_TEST_CASE_P( + TerminalCodecTest, + TerminalCodecTest, + testing::ValuesIn(autoUncompressionCodecTypes)); + +TEST(ValidPrefixesTest, CustomCodec) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION)); + const auto none = getAutoUncompressionCodec(std::move(codecs)); + const auto prefixes = none->validPrefixes(); + const auto it = std::find(prefixes.begin(), prefixes.end(), "none"); + EXPECT_TRUE(it != prefixes.end()); +} + +#define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \ + do { \ + if (kIsDebug) { \ + EXPECT_THROW((statement), expected_exception); \ + } else { \ + EXPECT_NO_THROW((statement)); \ + } \ + } while (false) + +TEST(CheckCompatibleTest, SimplePrefixSecond) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION)); + codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION)); + 
EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +TEST(CheckCompatibleTest, SimplePrefixFirst) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION)); + codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION)); + EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +TEST(CheckCompatibleTest, Empty) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION)); + EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +TEST(CheckCompatibleTest, ZstdPrefix) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD)); + EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +TEST(CheckCompatibleTest, ZstdDuplicate) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD)); + EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +TEST(CheckCompatibleTest, ZlibIsPrefix) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD)); + EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +#if FOLLY_HAVE_LIBZSTD + +TEST(ZstdTest, BackwardCompatible) { + auto codec = getCodec(CodecType::ZSTD); + { + auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20)); + auto compressed = codec->compress(data.get()); + compressed->coalesce(); + EXPECT_EQ( + data->length(), + ZSTD_getDecompressedSize(compressed->data(), compressed->length())); + } + { + auto const data = + IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20)); + auto compressed = codec->compress(data.get()); + compressed->coalesce(); + EXPECT_EQ( + data->length(), + ZSTD_getDecompressedSize(compressed->data(), 
compressed->length())); + } +} + +#endif + +#if FOLLY_HAVE_LIBZ + +using ZlibFormat = zlib::Options::Format; + +TEST(ZlibTest, Auto) { + size_t const uncompressedLength_ = (size_t)1 << 15; + auto const original = std::string( + reinterpret_cast( + randomDataHolder.data(uncompressedLength_).data()), + uncompressedLength_); + auto optionCodec = zlib::getCodec(zlib::Options(ZlibFormat::AUTO)); + + // Test the codec can uncompress zlib data. + { + auto codec = getCodec(CodecType::ZLIB); + auto const compressed = codec->compress(original); + auto const uncompressed = optionCodec->uncompress(compressed); + EXPECT_EQ(original, uncompressed); + } + + // Test the codec can uncompress gzip data. + { + auto codec = getCodec(CodecType::GZIP); + auto const compressed = codec->compress(original); + auto const uncompressed = optionCodec->uncompress(compressed); + EXPECT_EQ(original, uncompressed); + } +} + +TEST(ZlibTest, DefaultOptions) { + size_t const uncompressedLength_ = (size_t)1 << 20; + auto const original = std::string( + reinterpret_cast( + randomDataHolder.data(uncompressedLength_).data()), + uncompressedLength_); + { + auto codec = getCodec(CodecType::ZLIB); + auto optionCodec = zlib::getCodec(zlib::defaultZlibOptions()); + auto const compressed = optionCodec->compress(original); + auto uncompressed = codec->uncompress(compressed); + EXPECT_EQ(original, uncompressed); + uncompressed = optionCodec->uncompress(compressed); + EXPECT_EQ(original, uncompressed); + } + + { + auto codec = getCodec(CodecType::GZIP); + auto optionCodec = zlib::getCodec(zlib::defaultGzipOptions()); + auto const compressed = optionCodec->compress(original); + auto uncompressed = codec->uncompress(compressed); + EXPECT_EQ(original, uncompressed); + uncompressed = optionCodec->uncompress(compressed); + EXPECT_EQ(original, uncompressed); + } +} + +class ZlibOptionsTest : public testing::TestWithParam< + std::tr1::tuple> { + protected: + void SetUp() override { + auto tup = GetParam(); + 
options_.format = std::tr1::get<0>(tup); + options_.windowSize = std::tr1::get<1>(tup); + options_.memLevel = std::tr1::get<2>(tup); + options_.strategy = std::tr1::get<3>(tup); + codec_ = zlib::getStreamCodec(options_); + } + + void runSimpleRoundTripTest(const DataHolder& dh); + + private: + zlib::Options options_; + std::unique_ptr codec_; +}; + +void ZlibOptionsTest::runSimpleRoundTripTest(const DataHolder& dh) { + size_t const uncompressedLength = (size_t)1 << 16; + auto const original = std::string( + reinterpret_cast(dh.data(uncompressedLength).data()), + uncompressedLength); + + auto const compressed = codec_->compress(original); + auto const uncompressed = codec_->uncompress(compressed); + EXPECT_EQ(uncompressed, original); +} + +TEST_P(ZlibOptionsTest, simpleRoundTripTest) { + runSimpleRoundTripTest(constantDataHolder); + runSimpleRoundTripTest(randomDataHolder); +} + +INSTANTIATE_TEST_CASE_P( + ZlibOptionsTest, + ZlibOptionsTest, + testing::Combine( + testing::Values( + ZlibFormat::ZLIB, + ZlibFormat::GZIP, + ZlibFormat::RAW, + ZlibFormat::AUTO), + testing::Values(9, 12, 15), + testing::Values(1, 8, 9), + testing::Values( + Z_DEFAULT_STRATEGY, + Z_FILTERED, + Z_HUFFMAN_ONLY, + Z_RLE, + Z_FIXED))); + +#endif // FOLLY_HAVE_LIBZ + +} // namespace test +} // namespace io +} // namespace folly + +int main(int argc, char *argv[]) { + testing::InitGoogleTest(&argc, argv); + gflags::ParseCommandLineFlags(&argc, &argv, true); + + auto ret = RUN_ALL_TESTS(); + if (!ret) { + folly::runBenchmarksOnFlag(); + } + return ret; +} diff --git a/folly/io/Compression.cpp b/folly/io/Compression.cpp deleted file mode 100644 index 0d386525..00000000 --- a/folly/io/Compression.cpp +++ /dev/null @@ -1,2146 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#if FOLLY_HAVE_LIBLZ4 -#include -#include -#if LZ4_VERSION_NUMBER >= 10301 -#include -#endif -#endif - -#include - -#if FOLLY_HAVE_LIBSNAPPY -#include -#include -#endif - -#if FOLLY_HAVE_LIBZ -#include -#endif - -#if FOLLY_HAVE_LIBLZMA -#include -#endif - -#if FOLLY_HAVE_LIBZSTD -#define ZSTD_STATIC_LINKING_ONLY -#include -#endif - -#if FOLLY_HAVE_LIBBZ2 -#include -#endif - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using folly::io::compression::detail::dataStartsWithLE; -using folly::io::compression::detail::prefixToStringLE; - -namespace folly { -namespace io { - -Codec::Codec(CodecType type) : type_(type) { } - -// Ensure consistent behavior in the nullptr case -std::unique_ptr Codec::compress(const IOBuf* data) { - if (data == nullptr) { - throw std::invalid_argument("Codec: data must not be nullptr"); - } - uint64_t len = data->computeChainDataLength(); - if (len > maxUncompressedLength()) { - throw std::runtime_error("Codec: uncompressed length too large"); - } - - return doCompress(data); -} - -std::string Codec::compress(const StringPiece data) { - const uint64_t len = data.size(); - if (len > maxUncompressedLength()) { - throw std::runtime_error("Codec: uncompressed length too large"); - } - - return doCompressString(data); -} - -std::unique_ptr Codec::uncompress( - const IOBuf* data, - Optional uncompressedLength) { - if (data == nullptr) { - throw std::invalid_argument("Codec: data must not be nullptr"); - } - if (!uncompressedLength) { - if 
(needsUncompressedLength()) { - throw std::invalid_argument("Codec: uncompressed length required"); - } - } else if (*uncompressedLength > maxUncompressedLength()) { - throw std::runtime_error("Codec: uncompressed length too large"); - } - - if (data->empty()) { - if (uncompressedLength.value_or(0) != 0) { - throw std::runtime_error("Codec: invalid uncompressed length"); - } - return IOBuf::create(0); - } - - return doUncompress(data, uncompressedLength); -} - -std::string Codec::uncompress( - const StringPiece data, - Optional uncompressedLength) { - if (!uncompressedLength) { - if (needsUncompressedLength()) { - throw std::invalid_argument("Codec: uncompressed length required"); - } - } else if (*uncompressedLength > maxUncompressedLength()) { - throw std::runtime_error("Codec: uncompressed length too large"); - } - - if (data.empty()) { - if (uncompressedLength.value_or(0) != 0) { - throw std::runtime_error("Codec: invalid uncompressed length"); - } - return ""; - } - - return doUncompressString(data, uncompressedLength); -} - -bool Codec::needsUncompressedLength() const { - return doNeedsUncompressedLength(); -} - -uint64_t Codec::maxUncompressedLength() const { - return doMaxUncompressedLength(); -} - -bool Codec::doNeedsUncompressedLength() const { - return false; -} - -uint64_t Codec::doMaxUncompressedLength() const { - return UNLIMITED_UNCOMPRESSED_LENGTH; -} - -std::vector Codec::validPrefixes() const { - return {}; -} - -bool Codec::canUncompress(const IOBuf*, Optional) const { - return false; -} - -std::string Codec::doCompressString(const StringPiece data) { - const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data}; - auto outputBuffer = doCompress(&inputBuffer); - std::string output; - output.reserve(outputBuffer->computeChainDataLength()); - for (auto range : *outputBuffer) { - output.append(reinterpret_cast(range.data()), range.size()); - } - return output; -} - -std::string Codec::doUncompressString( - const StringPiece data, - Optional 
uncompressedLength) { - const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data}; - auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength); - std::string output; - output.reserve(outputBuffer->computeChainDataLength()); - for (auto range : *outputBuffer) { - output.append(reinterpret_cast(range.data()), range.size()); - } - return output; -} - -uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const { - return doMaxCompressedLength(uncompressedLength); -} - -Optional Codec::getUncompressedLength( - const folly::IOBuf* data, - Optional uncompressedLength) const { - auto const compressedLength = data->computeChainDataLength(); - if (compressedLength == 0) { - if (uncompressedLength.value_or(0) != 0) { - throw std::runtime_error("Invalid uncompressed length"); - } - return 0; - } - return doGetUncompressedLength(data, uncompressedLength); -} - -Optional Codec::doGetUncompressedLength( - const folly::IOBuf*, - Optional uncompressedLength) const { - return uncompressedLength; -} - -bool StreamCodec::needsDataLength() const { - return doNeedsDataLength(); -} - -bool StreamCodec::doNeedsDataLength() const { - return false; -} - -void StreamCodec::assertStateIs(State expected) const { - if (state_ != expected) { - throw std::logic_error(folly::to( - "Codec: state is ", state_, "; expected state ", expected)); - } -} - -void StreamCodec::resetStream(Optional uncompressedLength) { - state_ = State::RESET; - uncompressedLength_ = uncompressedLength; - progressMade_ = true; - doResetStream(); -} - -bool StreamCodec::compressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flushOp) { - if (state_ == State::RESET && input.empty() && - flushOp == StreamCodec::FlushOp::END && - uncompressedLength().value_or(0) != 0) { - throw std::runtime_error("Codec: invalid uncompressed length"); - } - - if (!uncompressedLength() && needsDataLength()) { - throw std::runtime_error("Codec: uncompressed length required"); - } - if (state_ == 
State::RESET && !input.empty() && - uncompressedLength() == uint64_t(0)) { - throw std::runtime_error("Codec: invalid uncompressed length"); - } - // Handle input state transitions - switch (flushOp) { - case StreamCodec::FlushOp::NONE: - if (state_ == State::RESET) { - state_ = State::COMPRESS; - } - assertStateIs(State::COMPRESS); - break; - case StreamCodec::FlushOp::FLUSH: - if (state_ == State::RESET || state_ == State::COMPRESS) { - state_ = State::COMPRESS_FLUSH; - } - assertStateIs(State::COMPRESS_FLUSH); - break; - case StreamCodec::FlushOp::END: - if (state_ == State::RESET || state_ == State::COMPRESS) { - state_ = State::COMPRESS_END; - } - assertStateIs(State::COMPRESS_END); - break; - } - size_t const inputSize = input.size(); - size_t const outputSize = output.size(); - bool const done = doCompressStream(input, output, flushOp); - if (!done && inputSize == input.size() && outputSize == output.size()) { - if (!progressMade_) { - throw std::runtime_error("Codec: No forward progress made"); - } - // Throw an exception if there is no progress again next time - progressMade_ = false; - } else { - progressMade_ = true; - } - // Handle output state transitions - if (done) { - if (state_ == State::COMPRESS_FLUSH) { - state_ = State::COMPRESS; - } else if (state_ == State::COMPRESS_END) { - state_ = State::END; - } - // Check internal invariants - DCHECK(input.empty()); - DCHECK(flushOp != StreamCodec::FlushOp::NONE); - } - return done; -} - -bool StreamCodec::uncompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flushOp) { - if (state_ == State::RESET && input.empty()) { - if (uncompressedLength().value_or(0) == 0) { - return true; - } - return false; - } - // Handle input state transitions - if (state_ == State::RESET) { - state_ = State::UNCOMPRESS; - } - assertStateIs(State::UNCOMPRESS); - size_t const inputSize = input.size(); - size_t const outputSize = output.size(); - bool const done = doUncompressStream(input, 
output, flushOp); - if (!done && inputSize == input.size() && outputSize == output.size()) { - if (!progressMade_) { - throw std::runtime_error("Codec: no forward progress made"); - } - // Throw an exception if there is no progress again next time - progressMade_ = false; - } else { - progressMade_ = true; - } - // Handle output state transitions - if (done) { - state_ = State::END; - } - return done; -} - -static std::unique_ptr addOutputBuffer( - MutableByteRange& output, - uint64_t size) { - DCHECK(output.empty()); - auto buffer = IOBuf::create(size); - buffer->append(buffer->capacity()); - output = {buffer->writableData(), buffer->length()}; - return buffer; -} - -std::unique_ptr StreamCodec::doCompress(IOBuf const* data) { - uint64_t const uncompressedLength = data->computeChainDataLength(); - resetStream(uncompressedLength); - uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength); - - auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB - auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB - - MutableByteRange output; - auto buffer = addOutputBuffer( - output, - maxCompressedLen <= kMaxSingleStepLength ? 
maxCompressedLen - : kDefaultBufferLength); - - // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer - IOBuf const* current = data; - ByteRange input{current->data(), current->length()}; - StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE; - bool done = false; - while (!done) { - while (input.empty() && current->next() != data) { - current = current->next(); - input = {current->data(), current->length()}; - } - if (current->next() == data) { - // This is the last input buffer so end the stream - flushOp = StreamCodec::FlushOp::END; - } - if (output.empty()) { - buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength)); - } - done = compressStream(input, output, flushOp); - if (done) { - DCHECK(input.empty()); - DCHECK(flushOp == StreamCodec::FlushOp::END); - DCHECK_EQ(current->next(), data); - } - } - buffer->prev()->trimEnd(output.size()); - return buffer; -} - -static uint64_t computeBufferLength( - uint64_t const compressedLength, - uint64_t const blockSize) { - uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB - uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength); - return std::min(goodBufferSize, kMaxBufferLength); -} - -std::unique_ptr StreamCodec::doUncompress( - IOBuf const* data, - Optional uncompressedLength) { - auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB - auto constexpr kBlockSize = uint64_t(128) << 10; - auto const defaultBufferLength = - computeBufferLength(data->computeChainDataLength(), kBlockSize); - - uncompressedLength = getUncompressedLength(data, uncompressedLength); - resetStream(uncompressedLength); - - MutableByteRange output; - auto buffer = addOutputBuffer( - output, - (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength - ? 
*uncompressedLength - : defaultBufferLength)); - - // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer - IOBuf const* current = data; - ByteRange input{current->data(), current->length()}; - StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE; - bool done = false; - while (!done) { - while (input.empty() && current->next() != data) { - current = current->next(); - input = {current->data(), current->length()}; - } - if (current->next() == data) { - // Tell the uncompressor there is no more input (it may optimize) - flushOp = StreamCodec::FlushOp::END; - } - if (output.empty()) { - buffer->prependChain(addOutputBuffer(output, defaultBufferLength)); - } - done = uncompressStream(input, output, flushOp); - } - if (!input.empty()) { - throw std::runtime_error("Codec: Junk after end of data"); - } - - buffer->prev()->trimEnd(output.size()); - if (uncompressedLength && - *uncompressedLength != buffer->computeChainDataLength()) { - throw std::runtime_error("Codec: invalid uncompressed length"); - } - - return buffer; -} - -namespace { - -/** - * No compression - */ -class NoCompressionCodec final : public Codec { - public: - static std::unique_ptr create(int level, CodecType type); - explicit NoCompressionCodec(int level, CodecType type); - - private: - uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; - std::unique_ptr doCompress(const IOBuf* data) override; - std::unique_ptr doUncompress( - const IOBuf* data, - Optional uncompressedLength) override; -}; - -std::unique_ptr NoCompressionCodec::create(int level, CodecType type) { - return std::make_unique(level, type); -} - -NoCompressionCodec::NoCompressionCodec(int level, CodecType type) - : Codec(type) { - DCHECK(type == CodecType::NO_COMPRESSION); - switch (level) { - case COMPRESSION_LEVEL_DEFAULT: - case COMPRESSION_LEVEL_FASTEST: - case COMPRESSION_LEVEL_BEST: - level = 0; - } - if (level != 0) { - throw std::invalid_argument(to( - "NoCompressionCodec: 
invalid level ", level)); - } -} - -uint64_t NoCompressionCodec::doMaxCompressedLength( - uint64_t uncompressedLength) const { - return uncompressedLength; -} - -std::unique_ptr NoCompressionCodec::doCompress( - const IOBuf* data) { - return data->clone(); -} - -std::unique_ptr NoCompressionCodec::doUncompress( - const IOBuf* data, - Optional uncompressedLength) { - if (uncompressedLength && - data->computeChainDataLength() != *uncompressedLength) { - throw std::runtime_error( - to("NoCompressionCodec: invalid uncompressed length")); - } - return data->clone(); -} - -#if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA) - -namespace { - -void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) { - DCHECK_GE(out->tailroom(), kMaxVarintLength64); - out->append(encodeVarint(val, out->writableTail())); -} - -inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) { - uint64_t val = 0; - int8_t b = 0; - for (int shift = 0; shift <= 63; shift += 7) { - b = cursor.read(); - val |= static_cast(b & 0x7f) << shift; - if (b >= 0) { - break; - } - } - if (b < 0) { - throw std::invalid_argument("Invalid varint value. 
Too big."); - } - return val; -} - -} // namespace - -#endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA - -#if FOLLY_HAVE_LIBLZ4 - -/** - * LZ4 compression - */ -class LZ4Codec final : public Codec { - public: - static std::unique_ptr create(int level, CodecType type); - explicit LZ4Codec(int level, CodecType type); - - private: - bool doNeedsUncompressedLength() const override; - uint64_t doMaxUncompressedLength() const override; - uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; - - bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; } - - std::unique_ptr doCompress(const IOBuf* data) override; - std::unique_ptr doUncompress( - const IOBuf* data, - Optional uncompressedLength) override; - - bool highCompression_; -}; - -std::unique_ptr LZ4Codec::create(int level, CodecType type) { - return std::make_unique(level, type); -} - -LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) { - DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE); - - switch (level) { - case COMPRESSION_LEVEL_FASTEST: - case COMPRESSION_LEVEL_DEFAULT: - level = 1; - break; - case COMPRESSION_LEVEL_BEST: - level = 2; - break; - } - if (level < 1 || level > 2) { - throw std::invalid_argument(to( - "LZ4Codec: invalid level: ", level)); - } - highCompression_ = (level > 1); -} - -bool LZ4Codec::doNeedsUncompressedLength() const { - return !encodeSize(); -} - -// The value comes from lz4.h in lz4-r117, but older versions of lz4 don't -// define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it -// here. -#ifndef LZ4_MAX_INPUT_SIZE -# define LZ4_MAX_INPUT_SIZE 0x7E000000 -#endif - -uint64_t LZ4Codec::doMaxUncompressedLength() const { - return LZ4_MAX_INPUT_SIZE; -} - -uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const { - return LZ4_compressBound(uncompressedLength) + - (encodeSize() ? 
kMaxVarintLength64 : 0); -} - -std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { - IOBuf clone; - if (data->isChained()) { - // LZ4 doesn't support streaming, so we have to coalesce - clone = data->cloneCoalescedAsValue(); - data = &clone; - } - - auto out = IOBuf::create(maxCompressedLength(data->length())); - if (encodeSize()) { - encodeVarintToIOBuf(data->length(), out.get()); - } - - int n; - auto input = reinterpret_cast(data->data()); - auto output = reinterpret_cast(out->writableTail()); - const auto inputLength = data->length(); -#if LZ4_VERSION_NUMBER >= 10700 - if (highCompression_) { - n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0); - } else { - n = LZ4_compress_default(input, output, inputLength, out->tailroom()); - } -#else - if (highCompression_) { - n = LZ4_compressHC(input, output, inputLength); - } else { - n = LZ4_compress(input, output, inputLength); - } -#endif - - CHECK_GE(n, 0); - CHECK_LE(n, out->capacity()); - - out->append(n); - return out; -} - -std::unique_ptr LZ4Codec::doUncompress( - const IOBuf* data, - Optional uncompressedLength) { - IOBuf clone; - if (data->isChained()) { - // LZ4 doesn't support streaming, so we have to coalesce - clone = data->cloneCoalescedAsValue(); - data = &clone; - } - - folly::io::Cursor cursor(data); - uint64_t actualUncompressedLength; - if (encodeSize()) { - actualUncompressedLength = decodeVarintFromCursor(cursor); - if (uncompressedLength && *uncompressedLength != actualUncompressedLength) { - throw std::runtime_error("LZ4Codec: invalid uncompressed length"); - } - } else { - // Invariants - DCHECK(uncompressedLength.hasValue()); - DCHECK(*uncompressedLength <= maxUncompressedLength()); - actualUncompressedLength = *uncompressedLength; - } - - auto sp = StringPiece{cursor.peekBytes()}; - auto out = IOBuf::create(actualUncompressedLength); - int n = LZ4_decompress_safe( - sp.data(), - reinterpret_cast(out->writableTail()), - sp.size(), - actualUncompressedLength); - - if (n < 
0 || uint64_t(n) != actualUncompressedLength) { - throw std::runtime_error(to( - "LZ4 decompression returned invalid value ", n)); - } - out->append(actualUncompressedLength); - return out; -} - -#if LZ4_VERSION_NUMBER >= 10301 - -class LZ4FrameCodec final : public Codec { - public: - static std::unique_ptr create(int level, CodecType type); - explicit LZ4FrameCodec(int level, CodecType type); - ~LZ4FrameCodec() override; - - std::vector validPrefixes() const override; - bool canUncompress(const IOBuf* data, Optional uncompressedLength) - const override; - - private: - uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; - - std::unique_ptr doCompress(const IOBuf* data) override; - std::unique_ptr doUncompress( - const IOBuf* data, - Optional uncompressedLength) override; - - // Reset the dctx_ if it is dirty or null. - void resetDCtx(); - - int level_; - LZ4F_decompressionContext_t dctx_{nullptr}; - bool dirty_{false}; -}; - -/* static */ std::unique_ptr LZ4FrameCodec::create( - int level, - CodecType type) { - return std::make_unique(level, type); -} - -static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204; - -std::vector LZ4FrameCodec::validPrefixes() const { - return {prefixToStringLE(kLZ4FrameMagicLE)}; -} - -bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional) const { - return dataStartsWithLE(data, kLZ4FrameMagicLE); -} - -uint64_t LZ4FrameCodec::doMaxCompressedLength( - uint64_t uncompressedLength) const { - LZ4F_preferences_t prefs{}; - prefs.compressionLevel = level_; - prefs.frameInfo.contentSize = uncompressedLength; - return LZ4F_compressFrameBound(uncompressedLength, &prefs); -} - -static size_t lz4FrameThrowOnError(size_t code) { - if (LZ4F_isError(code)) { - throw std::runtime_error( - to("LZ4Frame error: ", LZ4F_getErrorName(code))); - } - return code; -} - -void LZ4FrameCodec::resetDCtx() { - if (dctx_ && !dirty_) { - return; - } - if (dctx_) { - LZ4F_freeDecompressionContext(dctx_); - } - 
lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100)); - dirty_ = false; -} - -LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) { - DCHECK(type == CodecType::LZ4_FRAME); - switch (level) { - case COMPRESSION_LEVEL_FASTEST: - case COMPRESSION_LEVEL_DEFAULT: - level_ = 0; - break; - case COMPRESSION_LEVEL_BEST: - level_ = 16; - break; - default: - level_ = level; - break; - } -} - -LZ4FrameCodec::~LZ4FrameCodec() { - if (dctx_) { - LZ4F_freeDecompressionContext(dctx_); - } -} - -std::unique_ptr LZ4FrameCodec::doCompress(const IOBuf* data) { - // LZ4 Frame compression doesn't support streaming so we have to coalesce - IOBuf clone; - if (data->isChained()) { - clone = data->cloneCoalescedAsValue(); - data = &clone; - } - // Set preferences - const auto uncompressedLength = data->length(); - LZ4F_preferences_t prefs{}; - prefs.compressionLevel = level_; - prefs.frameInfo.contentSize = uncompressedLength; - // Compress - auto buf = IOBuf::create(maxCompressedLength(uncompressedLength)); - const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame( - buf->writableTail(), - buf->tailroom(), - data->data(), - data->length(), - &prefs)); - buf->append(written); - return buf; -} - -std::unique_ptr LZ4FrameCodec::doUncompress( - const IOBuf* data, - Optional uncompressedLength) { - // Reset the dctx if any errors have occurred - resetDCtx(); - // Coalesce the data - ByteRange in = *data->begin(); - IOBuf clone; - if (data->isChained()) { - clone = data->cloneCoalescedAsValue(); - in = clone.coalesce(); - } - data = nullptr; - // Select decompression options - LZ4F_decompressOptions_t options; - options.stableDst = 1; - // Select blockSize and growthSize for the IOBufQueue - IOBufQueue queue(IOBufQueue::cacheChainLength()); - auto blockSize = uint64_t{64} << 10; - auto growthSize = uint64_t{4} << 20; - if (uncompressedLength) { - // Allocate uncompressedLength in one chunk (up to 64 MB) - const auto allocateSize = 
std::min(*uncompressedLength, uint64_t{64} << 20); - queue.preallocate(allocateSize, allocateSize); - blockSize = std::min(*uncompressedLength, blockSize); - growthSize = std::min(*uncompressedLength, growthSize); - } else { - // Reduce growthSize for small data - const auto guessUncompressedLen = - 4 * std::max(blockSize, in.size()); - growthSize = std::min(guessUncompressedLen, growthSize); - } - // Once LZ4_decompress() is called, the dctx_ cannot be reused until it - // returns 0 - dirty_ = true; - // Decompress until the frame is over - size_t code = 0; - do { - // Allocate enough space to decompress at least a block - void* out; - size_t outSize; - std::tie(out, outSize) = queue.preallocate(blockSize, growthSize); - // Decompress - size_t inSize = in.size(); - code = lz4FrameThrowOnError( - LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options)); - if (in.empty() && outSize == 0 && code != 0) { - // We passed no input, no output was produced, and the frame isn't over - // No more forward progress is possible - throw std::runtime_error("LZ4Frame error: Incomplete frame"); - } - in.uncheckedAdvance(inSize); - queue.postallocate(outSize); - } while (code != 0); - // At this point the decompression context can be reused - dirty_ = false; - if (uncompressedLength && queue.chainLength() != *uncompressedLength) { - throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength"); - } - return queue.move(); -} - -#endif // LZ4_VERSION_NUMBER >= 10301 -#endif // FOLLY_HAVE_LIBLZ4 - -#if FOLLY_HAVE_LIBSNAPPY - -/** - * Snappy compression - */ - -/** - * Implementation of snappy::Source that reads from a IOBuf chain. 
- */ -class IOBufSnappySource final : public snappy::Source { - public: - explicit IOBufSnappySource(const IOBuf* data); - size_t Available() const override; - const char* Peek(size_t* len) override; - void Skip(size_t n) override; - private: - size_t available_; - io::Cursor cursor_; -}; - -IOBufSnappySource::IOBufSnappySource(const IOBuf* data) - : available_(data->computeChainDataLength()), - cursor_(data) { -} - -size_t IOBufSnappySource::Available() const { - return available_; -} - -const char* IOBufSnappySource::Peek(size_t* len) { - auto sp = StringPiece{cursor_.peekBytes()}; - *len = sp.size(); - return sp.data(); -} - -void IOBufSnappySource::Skip(size_t n) { - CHECK_LE(n, available_); - cursor_.skip(n); - available_ -= n; -} - -class SnappyCodec final : public Codec { - public: - static std::unique_ptr create(int level, CodecType type); - explicit SnappyCodec(int level, CodecType type); - - private: - uint64_t doMaxUncompressedLength() const override; - uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; - std::unique_ptr doCompress(const IOBuf* data) override; - std::unique_ptr doUncompress( - const IOBuf* data, - Optional uncompressedLength) override; -}; - -std::unique_ptr SnappyCodec::create(int level, CodecType type) { - return std::make_unique(level, type); -} - -SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) { - DCHECK(type == CodecType::SNAPPY); - switch (level) { - case COMPRESSION_LEVEL_FASTEST: - case COMPRESSION_LEVEL_DEFAULT: - case COMPRESSION_LEVEL_BEST: - level = 1; - } - if (level != 1) { - throw std::invalid_argument(to( - "SnappyCodec: invalid level: ", level)); - } -} - -uint64_t SnappyCodec::doMaxUncompressedLength() const { - // snappy.h uses uint32_t for lengths, so there's that. 
- return std::numeric_limits::max(); -} - -uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const { - return snappy::MaxCompressedLength(uncompressedLength); -} - -std::unique_ptr SnappyCodec::doCompress(const IOBuf* data) { - IOBufSnappySource source(data); - auto out = IOBuf::create(maxCompressedLength(source.Available())); - - snappy::UncheckedByteArraySink sink(reinterpret_cast( - out->writableTail())); - - size_t n = snappy::Compress(&source, &sink); - - CHECK_LE(n, out->capacity()); - out->append(n); - return out; -} - -std::unique_ptr SnappyCodec::doUncompress( - const IOBuf* data, - Optional uncompressedLength) { - uint32_t actualUncompressedLength = 0; - - { - IOBufSnappySource source(data); - if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) { - throw std::runtime_error("snappy::GetUncompressedLength failed"); - } - if (uncompressedLength && *uncompressedLength != actualUncompressedLength) { - throw std::runtime_error("snappy: invalid uncompressed length"); - } - } - - auto out = IOBuf::create(actualUncompressedLength); - - { - IOBufSnappySource source(data); - if (!snappy::RawUncompress(&source, - reinterpret_cast(out->writableTail()))) { - throw std::runtime_error("snappy::RawUncompress failed"); - } - } - - out->append(actualUncompressedLength); - return out; -} - -#endif // FOLLY_HAVE_LIBSNAPPY - -#if FOLLY_HAVE_LIBLZMA - -/** - * LZMA2 compression - */ -class LZMA2StreamCodec final : public StreamCodec { - public: - static std::unique_ptr createCodec(int level, CodecType type); - static std::unique_ptr createStream(int level, CodecType type); - explicit LZMA2StreamCodec(int level, CodecType type); - ~LZMA2StreamCodec() override; - - std::vector validPrefixes() const override; - bool canUncompress(const IOBuf* data, Optional uncompressedLength) - const override; - - private: - bool doNeedsDataLength() const override; - uint64_t doMaxUncompressedLength() const override; - uint64_t 
doMaxCompressedLength(uint64_t uncompressedLength) const override; - - bool encodeSize() const { - return type() == CodecType::LZMA2_VARINT_SIZE; - } - - void doResetStream() override; - bool doCompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flushOp) override; - bool doUncompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flushOp) override; - - void resetCStream(); - void resetDStream(); - - bool decodeAndCheckVarint(ByteRange& input); - bool flushVarintBuffer(MutableByteRange& output); - void resetVarintBuffer(); - - Optional cstream_{}; - Optional dstream_{}; - - std::array varintBuffer_; - ByteRange varintToEncode_; - size_t varintBufferPos_{0}; - - int level_; - bool needReset_{true}; - bool needDecodeSize_{false}; -}; - -static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD; -static constexpr unsigned kLZMA2MagicBytes = 6; - -std::vector LZMA2StreamCodec::validPrefixes() const { - if (type() == CodecType::LZMA2_VARINT_SIZE) { - return {}; - } - return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)}; -} - -bool LZMA2StreamCodec::doNeedsDataLength() const { - return encodeSize(); -} - -bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional) - const { - if (type() == CodecType::LZMA2_VARINT_SIZE) { - return false; - } - // Returns false for all inputs less than 8 bytes. - // This is okay, because no valid LZMA2 streams are less than 8 bytes. 
- return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes); -} - -std::unique_ptr LZMA2StreamCodec::createCodec( - int level, - CodecType type) { - return make_unique(level, type); -} - -std::unique_ptr LZMA2StreamCodec::createStream( - int level, - CodecType type) { - return make_unique(level, type); -} - -LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type) - : StreamCodec(type) { - DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE); - switch (level) { - case COMPRESSION_LEVEL_FASTEST: - level = 0; - break; - case COMPRESSION_LEVEL_DEFAULT: - level = LZMA_PRESET_DEFAULT; - break; - case COMPRESSION_LEVEL_BEST: - level = 9; - break; - } - if (level < 0 || level > 9) { - throw std::invalid_argument( - to("LZMA2Codec: invalid level: ", level)); - } - level_ = level; -} - -LZMA2StreamCodec::~LZMA2StreamCodec() { - if (cstream_) { - lzma_end(cstream_.get_pointer()); - cstream_.clear(); - } - if (dstream_) { - lzma_end(dstream_.get_pointer()); - dstream_.clear(); - } -} - -uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const { - // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)" - return uint64_t(1) << 63; -} - -uint64_t LZMA2StreamCodec::doMaxCompressedLength( - uint64_t uncompressedLength) const { - return lzma_stream_buffer_bound(uncompressedLength) + - (encodeSize() ? 
kMaxVarintLength64 : 0); -} - -void LZMA2StreamCodec::doResetStream() { - needReset_ = true; -} - -void LZMA2StreamCodec::resetCStream() { - if (!cstream_) { - cstream_.assign(LZMA_STREAM_INIT); - } - lzma_ret const rc = - lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE); - if (rc != LZMA_OK) { - throw std::runtime_error(folly::to( - "LZMA2StreamCodec: lzma_easy_encoder error: ", rc)); - } -} - -void LZMA2StreamCodec::resetDStream() { - if (!dstream_) { - dstream_.assign(LZMA_STREAM_INIT); - } - lzma_ret const rc = lzma_auto_decoder( - dstream_.get_pointer(), std::numeric_limits::max(), 0); - if (rc != LZMA_OK) { - throw std::runtime_error(folly::to( - "LZMA2StreamCodec: lzma_auto_decoder error: ", rc)); - } -} - -static lzma_ret lzmaThrowOnError(lzma_ret const rc) { - switch (rc) { - case LZMA_OK: - case LZMA_STREAM_END: - case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice - return rc; - default: - throw std::runtime_error( - to("LZMA2StreamCodec: error: ", rc)); - } -} - -static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) { - switch (flush) { - case StreamCodec::FlushOp::NONE: - return LZMA_RUN; - case StreamCodec::FlushOp::FLUSH: - return LZMA_SYNC_FLUSH; - case StreamCodec::FlushOp::END: - return LZMA_FINISH; - default: - throw std::invalid_argument("LZMA2StreamCodec: Invalid flush"); - } -} - -/** - * Flushes the varint buffer. - * Advances output by the number of bytes written. - * Returns true when flushing is complete. 
- */ -bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) { - if (varintToEncode_.empty()) { - return true; - } - const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size()); - if (numBytesToCopy > 0) { - memcpy(output.data(), varintToEncode_.data(), numBytesToCopy); - } - varintToEncode_.advance(numBytesToCopy); - output.advance(numBytesToCopy); - return varintToEncode_.empty(); -} - -bool LZMA2StreamCodec::doCompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flushOp) { - if (needReset_) { - resetCStream(); - if (encodeSize()) { - varintBufferPos_ = 0; - size_t const varintSize = - encodeVarint(*uncompressedLength(), varintBuffer_.data()); - varintToEncode_ = {varintBuffer_.data(), varintSize}; - } - needReset_ = false; - } - - if (!flushVarintBuffer(output)) { - return false; - } - - cstream_->next_in = const_cast(input.data()); - cstream_->avail_in = input.size(); - cstream_->next_out = output.data(); - cstream_->avail_out = output.size(); - SCOPE_EXIT { - input.uncheckedAdvance(input.size() - cstream_->avail_in); - output.uncheckedAdvance(output.size() - cstream_->avail_out); - }; - lzma_ret const rc = lzmaThrowOnError( - lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp))); - switch (flushOp) { - case StreamCodec::FlushOp::NONE: - return false; - case StreamCodec::FlushOp::FLUSH: - return cstream_->avail_in == 0 && cstream_->avail_out != 0; - case StreamCodec::FlushOp::END: - return rc == LZMA_STREAM_END; - default: - throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp"); - } -} - -/** - * Attempts to decode a varint from input. - * The function advances input by the number of bytes read. - * - * If there are too many bytes and the varint is not valid, throw a - * runtime_error. - * - * If the uncompressed length was provided and a decoded varint does not match - * the provided length, throw a runtime_error. 
- * - * Returns true if the varint was successfully decoded and matches the - * uncompressed length if provided, and false if more bytes are needed. - */ -bool LZMA2StreamCodec::decodeAndCheckVarint(ByteRange& input) { - if (input.empty()) { - return false; - } - size_t const numBytesToCopy = - std::min(kMaxVarintLength64 - varintBufferPos_, input.size()); - memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy); - - size_t const rangeSize = varintBufferPos_ + numBytesToCopy; - ByteRange range{varintBuffer_.data(), rangeSize}; - auto const ret = tryDecodeVarint(range); - - if (ret.hasValue()) { - size_t const varintSize = rangeSize - range.size(); - input.advance(varintSize - varintBufferPos_); - if (uncompressedLength() && *uncompressedLength() != ret.value()) { - throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length"); - } - return true; - } else if (ret.error() == DecodeVarintError::TooManyBytes) { - throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length"); - } else { - // Too few bytes - input.advance(numBytesToCopy); - varintBufferPos_ += numBytesToCopy; - return false; - } -} - -bool LZMA2StreamCodec::doUncompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flushOp) { - if (needReset_) { - resetDStream(); - needReset_ = false; - needDecodeSize_ = encodeSize(); - if (encodeSize()) { - // Reset buffer - varintBufferPos_ = 0; - } - } - - if (needDecodeSize_) { - // Try decoding the varint. If the input does not contain the entire varint, - // buffer the input. If the varint can not be decoded, fail. 
- if (!decodeAndCheckVarint(input)) { - return false; - } - needDecodeSize_ = false; - } - - dstream_->next_in = const_cast(input.data()); - dstream_->avail_in = input.size(); - dstream_->next_out = output.data(); - dstream_->avail_out = output.size(); - SCOPE_EXIT { - input.advance(input.size() - dstream_->avail_in); - output.advance(output.size() - dstream_->avail_out); - }; - - lzma_ret rc; - switch (flushOp) { - case StreamCodec::FlushOp::NONE: - case StreamCodec::FlushOp::FLUSH: - rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN)); - break; - case StreamCodec::FlushOp::END: - rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH)); - break; - default: - throw std::invalid_argument("LZMA2StreamCodec: invalid flush"); - } - return rc == LZMA_STREAM_END; -} -#endif // FOLLY_HAVE_LIBLZMA - -#ifdef FOLLY_HAVE_LIBZSTD - -namespace { -void zstdFreeCStream(ZSTD_CStream* zcs) { - ZSTD_freeCStream(zcs); -} - -void zstdFreeDStream(ZSTD_DStream* zds) { - ZSTD_freeDStream(zds); -} -} - -/** - * ZSTD compression - */ -class ZSTDStreamCodec final : public StreamCodec { - public: - static std::unique_ptr createCodec(int level, CodecType); - static std::unique_ptr createStream(int level, CodecType); - explicit ZSTDStreamCodec(int level, CodecType type); - - std::vector validPrefixes() const override; - bool canUncompress(const IOBuf* data, Optional uncompressedLength) - const override; - - private: - bool doNeedsUncompressedLength() const override; - uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; - Optional doGetUncompressedLength( - IOBuf const* data, - Optional uncompressedLength) const override; - - void doResetStream() override; - bool doCompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flushOp) override; - bool doUncompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flushOp) override; - - void resetCStream(); - void resetDStream(); - - bool 
tryBlockCompress(ByteRange& input, MutableByteRange& output) const; - bool tryBlockUncompress(ByteRange& input, MutableByteRange& output) const; - - int level_; - bool needReset_{true}; - std::unique_ptr< - ZSTD_CStream, - folly::static_function_deleter> - cstream_{nullptr}; - std::unique_ptr< - ZSTD_DStream, - folly::static_function_deleter> - dstream_{nullptr}; -}; - -static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528; - -std::vector ZSTDStreamCodec::validPrefixes() const { - return {prefixToStringLE(kZSTDMagicLE)}; -} - -bool ZSTDStreamCodec::canUncompress(const IOBuf* data, Optional) - const { - return dataStartsWithLE(data, kZSTDMagicLE); -} - -std::unique_ptr ZSTDStreamCodec::createCodec(int level, CodecType type) { - return make_unique(level, type); -} - -std::unique_ptr ZSTDStreamCodec::createStream( - int level, - CodecType type) { - return make_unique(level, type); -} - -ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type) - : StreamCodec(type) { - DCHECK(type == CodecType::ZSTD); - switch (level) { - case COMPRESSION_LEVEL_FASTEST: - level = 1; - break; - case COMPRESSION_LEVEL_DEFAULT: - level = 1; - break; - case COMPRESSION_LEVEL_BEST: - level = 19; - break; - } - if (level < 1 || level > ZSTD_maxCLevel()) { - throw std::invalid_argument( - to("ZSTD: invalid level: ", level)); - } - level_ = level; -} - -bool ZSTDStreamCodec::doNeedsUncompressedLength() const { - return false; -} - -uint64_t ZSTDStreamCodec::doMaxCompressedLength( - uint64_t uncompressedLength) const { - return ZSTD_compressBound(uncompressedLength); -} - -void zstdThrowIfError(size_t rc) { - if (!ZSTD_isError(rc)) { - return; - } - throw std::runtime_error( - to("ZSTD returned an error: ", ZSTD_getErrorName(rc))); -} - -Optional ZSTDStreamCodec::doGetUncompressedLength( - IOBuf const* data, - Optional uncompressedLength) const { - // Read decompressed size from frame if available in first IOBuf. 
- auto const decompressedSize = - ZSTD_getDecompressedSize(data->data(), data->length()); - if (decompressedSize != 0) { - if (uncompressedLength && *uncompressedLength != decompressedSize) { - throw std::runtime_error("ZSTD: invalid uncompressed length"); - } - uncompressedLength = decompressedSize; - } - return uncompressedLength; -} - -void ZSTDStreamCodec::doResetStream() { - needReset_ = true; -} - -bool ZSTDStreamCodec::tryBlockCompress( - ByteRange& input, - MutableByteRange& output) const { - DCHECK(needReset_); - // We need to know that we have enough output space to use block compression - if (output.size() < ZSTD_compressBound(input.size())) { - return false; - } - size_t const length = ZSTD_compress( - output.data(), output.size(), input.data(), input.size(), level_); - zstdThrowIfError(length); - input.uncheckedAdvance(input.size()); - output.uncheckedAdvance(length); - return true; -} - -void ZSTDStreamCodec::resetCStream() { - if (!cstream_) { - cstream_.reset(ZSTD_createCStream()); - if (!cstream_) { - throw std::bad_alloc{}; - } - } - // Advanced API usage works for all supported versions of zstd. - // Required to set contentSizeFlag. 
- auto params = ZSTD_getParams(level_, uncompressedLength().value_or(0), 0); - params.fParams.contentSizeFlag = uncompressedLength().hasValue(); - zstdThrowIfError(ZSTD_initCStream_advanced( - cstream_.get(), nullptr, 0, params, uncompressedLength().value_or(0))); -} - -bool ZSTDStreamCodec::doCompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flushOp) { - if (needReset_) { - // If we are given all the input in one chunk try to use block compression - if (flushOp == StreamCodec::FlushOp::END && - tryBlockCompress(input, output)) { - return true; - } - resetCStream(); - needReset_ = false; - } - ZSTD_inBuffer in = {input.data(), input.size(), 0}; - ZSTD_outBuffer out = {output.data(), output.size(), 0}; - SCOPE_EXIT { - input.uncheckedAdvance(in.pos); - output.uncheckedAdvance(out.pos); - }; - if (flushOp == StreamCodec::FlushOp::NONE || !input.empty()) { - zstdThrowIfError(ZSTD_compressStream(cstream_.get(), &out, &in)); - } - if (in.pos == in.size && flushOp != StreamCodec::FlushOp::NONE) { - size_t rc; - switch (flushOp) { - case StreamCodec::FlushOp::FLUSH: - rc = ZSTD_flushStream(cstream_.get(), &out); - break; - case StreamCodec::FlushOp::END: - rc = ZSTD_endStream(cstream_.get(), &out); - break; - default: - throw std::invalid_argument("ZSTD: invalid FlushOp"); - } - zstdThrowIfError(rc); - if (rc == 0) { - return true; - } - } - return false; -} - -bool ZSTDStreamCodec::tryBlockUncompress( - ByteRange& input, - MutableByteRange& output) const { - DCHECK(needReset_); -#if ZSTD_VERSION_NUMBER < 10104 - // We require ZSTD_findFrameCompressedSize() to perform this optimization. - return false; -#else - // We need to know the uncompressed length and have enough output space. 
- if (!uncompressedLength() || output.size() < *uncompressedLength()) { - return false; - } - size_t const compressedLength = - ZSTD_findFrameCompressedSize(input.data(), input.size()); - zstdThrowIfError(compressedLength); - size_t const length = ZSTD_decompress( - output.data(), *uncompressedLength(), input.data(), compressedLength); - zstdThrowIfError(length); - if (length != *uncompressedLength()) { - throw std::runtime_error("ZSTDStreamCodec: Incorrect uncompressed length"); - } - input.uncheckedAdvance(compressedLength); - output.uncheckedAdvance(length); - return true; -#endif -} - -void ZSTDStreamCodec::resetDStream() { - if (!dstream_) { - dstream_.reset(ZSTD_createDStream()); - if (!dstream_) { - throw std::bad_alloc{}; - } - } - zstdThrowIfError(ZSTD_initDStream(dstream_.get())); -} - -bool ZSTDStreamCodec::doUncompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flushOp) { - if (needReset_) { - // If we are given all the input in one chunk try to use block uncompression - if (flushOp == StreamCodec::FlushOp::END && - tryBlockUncompress(input, output)) { - return true; - } - resetDStream(); - needReset_ = false; - } - ZSTD_inBuffer in = {input.data(), input.size(), 0}; - ZSTD_outBuffer out = {output.data(), output.size(), 0}; - SCOPE_EXIT { - input.uncheckedAdvance(in.pos); - output.uncheckedAdvance(out.pos); - }; - size_t const rc = ZSTD_decompressStream(dstream_.get(), &out, &in); - zstdThrowIfError(rc); - return rc == 0; -} - -#endif // FOLLY_HAVE_LIBZSTD - -#if FOLLY_HAVE_LIBBZ2 - -class Bzip2Codec final : public Codec { - public: - static std::unique_ptr create(int level, CodecType type); - explicit Bzip2Codec(int level, CodecType type); - - std::vector validPrefixes() const override; - bool canUncompress(IOBuf const* data, Optional uncompressedLength) - const override; - - private: - uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; - std::unique_ptr doCompress(IOBuf const* data) override; 
- std::unique_ptr doUncompress( - IOBuf const* data, - Optional uncompressedLength) override; - - int level_; -}; - -/* static */ std::unique_ptr Bzip2Codec::create( - int level, - CodecType type) { - return std::make_unique(level, type); -} - -Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) { - DCHECK(type == CodecType::BZIP2); - switch (level) { - case COMPRESSION_LEVEL_FASTEST: - level = 1; - break; - case COMPRESSION_LEVEL_DEFAULT: - level = 9; - break; - case COMPRESSION_LEVEL_BEST: - level = 9; - break; - } - if (level < 1 || level > 9) { - throw std::invalid_argument( - to("Bzip2: invalid level: ", level)); - } - level_ = level; -} - -static uint32_t constexpr kBzip2MagicLE = 0x685a42; -static uint64_t constexpr kBzip2MagicBytes = 3; - -std::vector Bzip2Codec::validPrefixes() const { - return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)}; -} - -bool Bzip2Codec::canUncompress(IOBuf const* data, Optional) const { - return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes); -} - -uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const { - // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress - // To guarantee that the compressed data will fit in its buffer, allocate an - // output buffer of size 1% larger than the uncompressed data, plus six - // hundred extra bytes. - return uncompressedLength + uncompressedLength / 100 + 600; -} - -static bz_stream createBzStream() { - bz_stream stream; - stream.bzalloc = nullptr; - stream.bzfree = nullptr; - stream.opaque = nullptr; - stream.next_in = stream.next_out = nullptr; - stream.avail_in = stream.avail_out = 0; - return stream; -} - -// Throws on error condition, otherwise returns the code. 
-static int bzCheck(int const rc) { - switch (rc) { - case BZ_OK: - case BZ_RUN_OK: - case BZ_FLUSH_OK: - case BZ_FINISH_OK: - case BZ_STREAM_END: - return rc; - default: - throw std::runtime_error(to("Bzip2 error: ", rc)); - } -} - -static std::unique_ptr addOutputBuffer( - bz_stream* stream, - uint64_t const bufferLength) { - DCHECK_LE(bufferLength, std::numeric_limits::max()); - DCHECK_EQ(stream->avail_out, 0); - - auto buf = IOBuf::create(bufferLength); - buf->append(buf->capacity()); - - stream->next_out = reinterpret_cast(buf->writableData()); - stream->avail_out = buf->length(); - - return buf; -} - -std::unique_ptr Bzip2Codec::doCompress(IOBuf const* data) { - bz_stream stream = createBzStream(); - bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0)); - SCOPE_EXIT { - bzCheck(BZ2_bzCompressEnd(&stream)); - }; - - uint64_t const uncompressedLength = data->computeChainDataLength(); - uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength); - uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB - uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20; - - auto out = addOutputBuffer( - &stream, - maxCompressedLen <= kMaxSingleStepLength ? 
maxCompressedLen - : kDefaultBufferLength); - - for (auto range : *data) { - while (!range.empty()) { - auto const inSize = std::min(range.size(), kMaxSingleStepLength); - stream.next_in = - const_cast(reinterpret_cast(range.data())); - stream.avail_in = inSize; - - if (stream.avail_out == 0) { - out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); - } - - bzCheck(BZ2_bzCompress(&stream, BZ_RUN)); - range.uncheckedAdvance(inSize - stream.avail_in); - } - } - do { - if (stream.avail_out == 0) { - out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); - } - } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END); - - out->prev()->trimEnd(stream.avail_out); - - return out; -} - -std::unique_ptr Bzip2Codec::doUncompress( - const IOBuf* data, - Optional uncompressedLength) { - bz_stream stream = createBzStream(); - bzCheck(BZ2_bzDecompressInit(&stream, 0, 0)); - SCOPE_EXIT { - bzCheck(BZ2_bzDecompressEnd(&stream)); - }; - - uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB - uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB - uint64_t const kDefaultBufferLength = - computeBufferLength(data->computeChainDataLength(), kBlockSize); - - auto out = addOutputBuffer( - &stream, - ((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength) - ? 
*uncompressedLength - : kDefaultBufferLength)); - - int rc = BZ_OK; - for (auto range : *data) { - while (!range.empty()) { - auto const inSize = std::min(range.size(), kMaxSingleStepLength); - stream.next_in = - const_cast(reinterpret_cast(range.data())); - stream.avail_in = inSize; - - if (stream.avail_out == 0) { - out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); - } - - rc = bzCheck(BZ2_bzDecompress(&stream)); - range.uncheckedAdvance(inSize - stream.avail_in); - } - } - while (rc != BZ_STREAM_END) { - if (stream.avail_out == 0) { - out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength)); - } - size_t const outputSize = stream.avail_out; - rc = bzCheck(BZ2_bzDecompress(&stream)); - if (outputSize == stream.avail_out) { - throw std::runtime_error("Bzip2Codec: Truncated input"); - } - } - - out->prev()->trimEnd(stream.avail_out); - - uint64_t const totalOut = - (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32; - if (uncompressedLength && uncompressedLength != totalOut) { - throw std::runtime_error("Bzip2 error: Invalid uncompressed length"); - } - - return out; -} - -#endif // FOLLY_HAVE_LIBBZ2 - -#if FOLLY_HAVE_LIBZ - -zlib::Options getZlibOptions(CodecType type) { - DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB); - return type == CodecType::GZIP ? 
zlib::defaultGzipOptions() - : zlib::defaultZlibOptions(); -} - -std::unique_ptr getZlibCodec(int level, CodecType type) { - return zlib::getCodec(getZlibOptions(type), level); -} - -std::unique_ptr getZlibStreamCodec(int level, CodecType type) { - return zlib::getStreamCodec(getZlibOptions(type), level); -} - -#endif // FOLLY_HAVE_LIBZ - -/** - * Automatic decompression - */ -class AutomaticCodec final : public Codec { - public: - static std::unique_ptr create( - std::vector> customCodecs, - std::unique_ptr terminalCodec); - explicit AutomaticCodec( - std::vector> customCodecs, - std::unique_ptr terminalCodec); - - std::vector validPrefixes() const override; - bool canUncompress(const IOBuf* data, Optional uncompressedLength) - const override; - - private: - bool doNeedsUncompressedLength() const override; - uint64_t doMaxUncompressedLength() const override; - - uint64_t doMaxCompressedLength(uint64_t) const override { - throw std::runtime_error( - "AutomaticCodec error: maxCompressedLength() not supported."); - } - std::unique_ptr doCompress(const IOBuf*) override { - throw std::runtime_error("AutomaticCodec error: compress() not supported."); - } - std::unique_ptr doUncompress( - const IOBuf* data, - Optional uncompressedLength) override; - - void addCodecIfSupported(CodecType type); - - // Throws iff the codecs aren't compatible (very slow) - void checkCompatibleCodecs() const; - - std::vector> codecs_; - std::unique_ptr terminalCodec_; - bool needsUncompressedLength_; - uint64_t maxUncompressedLength_; -}; - -std::vector AutomaticCodec::validPrefixes() const { - std::unordered_set prefixes; - for (const auto& codec : codecs_) { - const auto codecPrefixes = codec->validPrefixes(); - prefixes.insert(codecPrefixes.begin(), codecPrefixes.end()); - } - return std::vector{prefixes.begin(), prefixes.end()}; -} - -bool AutomaticCodec::canUncompress( - const IOBuf* data, - Optional uncompressedLength) const { - return std::any_of( - codecs_.begin(), - codecs_.end(), - 
[data, uncompressedLength](std::unique_ptr const& codec) { - return codec->canUncompress(data, uncompressedLength); - }); -} - -void AutomaticCodec::addCodecIfSupported(CodecType type) { - const bool present = std::any_of( - codecs_.begin(), - codecs_.end(), - [&type](std::unique_ptr const& codec) { - return codec->type() == type; - }); - bool const isTerminalType = terminalCodec_ && terminalCodec_->type() == type; - if (hasCodec(type) && !present && !isTerminalType) { - codecs_.push_back(getCodec(type)); - } -} - -/* static */ std::unique_ptr AutomaticCodec::create( - std::vector> customCodecs, - std::unique_ptr terminalCodec) { - return std::make_unique( - std::move(customCodecs), std::move(terminalCodec)); -} - -AutomaticCodec::AutomaticCodec( - std::vector> customCodecs, - std::unique_ptr terminalCodec) - : Codec(CodecType::USER_DEFINED), - codecs_(std::move(customCodecs)), - terminalCodec_(std::move(terminalCodec)) { - // Fastest -> slowest - std::array defaultTypes{{ - CodecType::LZ4_FRAME, - CodecType::ZSTD, - CodecType::ZLIB, - CodecType::GZIP, - CodecType::LZMA2, - CodecType::BZIP2, - }}; - - for (auto type : defaultTypes) { - addCodecIfSupported(type); - } - - if (kIsDebug) { - checkCompatibleCodecs(); - } - - // Check that none of the codecs are null - DCHECK(std::none_of( - codecs_.begin(), codecs_.end(), [](std::unique_ptr const& codec) { - return codec == nullptr; - })); - - // Check that the terminal codec's type is not duplicated (with the exception - // of USER_DEFINED). 
- if (terminalCodec_) { - DCHECK(std::none_of( - codecs_.begin(), - codecs_.end(), - [&](std::unique_ptr const& codec) { - return codec->type() != CodecType::USER_DEFINED && - codec->type() == terminalCodec_->type(); - })); - } - - bool const terminalNeedsUncompressedLength = - terminalCodec_ && terminalCodec_->needsUncompressedLength(); - needsUncompressedLength_ = std::any_of( - codecs_.begin(), - codecs_.end(), - [](std::unique_ptr const& codec) { - return codec->needsUncompressedLength(); - }) || - terminalNeedsUncompressedLength; - - const auto it = std::max_element( - codecs_.begin(), - codecs_.end(), - [](std::unique_ptr const& lhs, std::unique_ptr const& rhs) { - return lhs->maxUncompressedLength() < rhs->maxUncompressedLength(); - }); - DCHECK(it != codecs_.end()); - auto const terminalMaxUncompressedLength = - terminalCodec_ ? terminalCodec_->maxUncompressedLength() : 0; - maxUncompressedLength_ = - std::max((*it)->maxUncompressedLength(), terminalMaxUncompressedLength); -} - -void AutomaticCodec::checkCompatibleCodecs() const { - // Keep track of all the possible headers. - std::unordered_set headers; - // The empty header is not allowed. - headers.insert(""); - // Step 1: - // Construct a set of headers and check that none of the headers occur twice. - // Eliminate edge cases. - for (auto&& codec : codecs_) { - const auto codecHeaders = codec->validPrefixes(); - // Codecs without any valid headers are not allowed. - if (codecHeaders.empty()) { - throw std::invalid_argument{ - "AutomaticCodec: validPrefixes() must not be empty."}; - } - // Insert all the headers for the current codec. - const size_t beforeSize = headers.size(); - headers.insert(codecHeaders.begin(), codecHeaders.end()); - // Codecs are not compatible if any header occurred twice. 
- if (beforeSize + codecHeaders.size() != headers.size()) { - throw std::invalid_argument{ - "AutomaticCodec: Two valid prefixes collide."}; - } - } - // Step 2: - // Check if any strict non-empty prefix of any header is a header. - for (const auto& header : headers) { - for (size_t i = 1; i < header.size(); ++i) { - if (headers.count(header.substr(0, i))) { - throw std::invalid_argument{ - "AutomaticCodec: One valid prefix is a prefix of another valid " - "prefix."}; - } - } - } -} - -bool AutomaticCodec::doNeedsUncompressedLength() const { - return needsUncompressedLength_; -} - -uint64_t AutomaticCodec::doMaxUncompressedLength() const { - return maxUncompressedLength_; -} - -std::unique_ptr AutomaticCodec::doUncompress( - const IOBuf* data, - Optional uncompressedLength) { - try { - for (auto&& codec : codecs_) { - if (codec->canUncompress(data, uncompressedLength)) { - return codec->uncompress(data, uncompressedLength); - } - } - } catch (std::exception const& e) { - if (!terminalCodec_) { - throw e; - } - } - - // Try terminal codec - if (terminalCodec_) { - return terminalCodec_->uncompress(data, uncompressedLength); - } - - throw std::runtime_error("AutomaticCodec error: Unknown compressed data"); -} - -using CodecFactory = std::unique_ptr (*)(int, CodecType); -using StreamCodecFactory = std::unique_ptr (*)(int, CodecType); -struct Factory { - CodecFactory codec; - StreamCodecFactory stream; -}; - -constexpr Factory - codecFactories[static_cast(CodecType::NUM_CODEC_TYPES)] = { - {}, // USER_DEFINED - {NoCompressionCodec::create, nullptr}, - -#if FOLLY_HAVE_LIBLZ4 - {LZ4Codec::create, nullptr}, -#else - {}, -#endif - -#if FOLLY_HAVE_LIBSNAPPY - {SnappyCodec::create, nullptr}, -#else - {}, -#endif - -#if FOLLY_HAVE_LIBZ - {getZlibCodec, getZlibStreamCodec}, -#else - {}, -#endif - -#if FOLLY_HAVE_LIBLZ4 - {LZ4Codec::create, nullptr}, -#else - {}, -#endif - -#if FOLLY_HAVE_LIBLZMA - {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream}, - 
{LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream}, -#else - {}, - {}, -#endif - -#if FOLLY_HAVE_LIBZSTD - {ZSTDStreamCodec::createCodec, ZSTDStreamCodec::createStream}, -#else - {}, -#endif - -#if FOLLY_HAVE_LIBZ - {getZlibCodec, getZlibStreamCodec}, -#else - {}, -#endif - -#if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301) - {LZ4FrameCodec::create, nullptr}, -#else - {}, -#endif - -#if FOLLY_HAVE_LIBBZ2 - {Bzip2Codec::create, nullptr}, -#else - {}, -#endif -}; - -Factory const& getFactory(CodecType type) { - size_t const idx = static_cast(type); - if (idx >= static_cast(CodecType::NUM_CODEC_TYPES)) { - throw std::invalid_argument( - to("Compression type ", idx, " invalid")); - } - return codecFactories[idx]; -} -} // namespace - -bool hasCodec(CodecType type) { - return getFactory(type).codec != nullptr; -} - -std::unique_ptr getCodec(CodecType type, int level) { - auto const factory = getFactory(type).codec; - if (!factory) { - throw std::invalid_argument( - to("Compression type ", type, " not supported")); - } - auto codec = (*factory)(level, type); - DCHECK(codec->type() == type); - return codec; -} - -bool hasStreamCodec(CodecType type) { - return getFactory(type).stream != nullptr; -} - -std::unique_ptr getStreamCodec(CodecType type, int level) { - auto const factory = getFactory(type).stream; - if (!factory) { - throw std::invalid_argument( - to("Compression type ", type, " not supported")); - } - auto codec = (*factory)(level, type); - DCHECK(codec->type() == type); - return codec; -} - -std::unique_ptr getAutoUncompressionCodec( - std::vector> customCodecs, - std::unique_ptr terminalCodec) { - return AutomaticCodec::create( - std::move(customCodecs), std::move(terminalCodec)); -} -} // namespace io -} // namespace folly diff --git a/folly/io/Compression.h b/folly/io/Compression.h deleted file mode 100644 index 345eda82..00000000 --- a/folly/io/Compression.h +++ /dev/null @@ -1,494 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include -#include -#include - -#include -#include -#include - -/** - * Compression / decompression over IOBufs - */ - -namespace folly { -namespace io { - -enum class CodecType { - /** - * This codec type is not defined; getCodec() will throw an exception - * if used. Useful if deriving your own classes from Codec without - * going through the getCodec() interface. - */ - USER_DEFINED = 0, - - /** - * Use no compression. - * Levels supported: 0 - */ - NO_COMPRESSION = 1, - - /** - * Use LZ4 compression. - * Levels supported: 1 = fast, 2 = best; default = 1 - */ - LZ4 = 2, - - /** - * Use Snappy compression. - * Levels supported: 1 - */ - SNAPPY = 3, - - /** - * Use zlib compression. - * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6 - */ - ZLIB = 4, - - /** - * Use LZ4 compression, prefixed with size (as Varint). - */ - LZ4_VARINT_SIZE = 5, - - /** - * Use LZMA2 compression. - * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6 - */ - LZMA2 = 6, - LZMA2_VARINT_SIZE = 7, - - /** - * Use ZSTD compression. - */ - ZSTD = 8, - - /** - * Use gzip compression. This is the same compression algorithm as ZLIB but - * gzip-compressed files tend to be easier to work with from the command line. - * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6 - */ - GZIP = 9, - - /** - * Use LZ4 frame compression. 
- * Levels supported: 0 = fast, 16 = best; default = 0 - */ - LZ4_FRAME = 10, - - /** - * Use bzip2 compression. - * Levels supported: 1 = fast, 9 = best; default = 9 - */ - BZIP2 = 11, - - NUM_CODEC_TYPES = 12, -}; - -class Codec { - public: - virtual ~Codec() { } - - static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH = uint64_t(-1); - /** - * Return the maximum length of data that may be compressed with this codec. - * NO_COMPRESSION and ZLIB support arbitrary lengths; - * LZ4 supports up to 1.9GiB; SNAPPY supports up to 4GiB. - * May return UNLIMITED_UNCOMPRESSED_LENGTH if unlimited. - */ - uint64_t maxUncompressedLength() const; - - /** - * Return the codec's type. - */ - CodecType type() const { return type_; } - - /** - * Does this codec need the exact uncompressed length on decompression? - */ - bool needsUncompressedLength() const; - - /** - * Compress data, returning an IOBuf (which may share storage with data). - * Throws std::invalid_argument if data is larger than - * maxUncompressedLength(). - */ - std::unique_ptr compress(const folly::IOBuf* data); - - /** - * Compresses data. May involve additional copies compared to the overload - * that takes and returns IOBufs. Has the same error semantics as the IOBuf - * version. - */ - std::string compress(StringPiece data); - - /** - * Uncompress data. Throws std::runtime_error on decompression error. - * - * Some codecs (LZ4) require the exact uncompressed length; this is indicated - * by needsUncompressedLength(). - * - * For other codes (zlib), knowing the exact uncompressed length ahead of - * time might be faster. - * - * Regardless of the behavior of the underlying compressor, uncompressing - * an empty IOBuf chain will return an empty IOBuf chain. - */ - std::unique_ptr uncompress( - const IOBuf* data, - folly::Optional uncompressedLength = folly::none); - - /** - * Uncompresses data. May involve additional copies compared to the overload - * that takes and returns IOBufs. 
Has the same error semantics as the IOBuf - * version. - */ - std::string uncompress( - StringPiece data, - folly::Optional uncompressedLength = folly::none); - - /** - * Returns a bound on the maximum compressed length when compressing data with - * the given uncompressed length. - */ - uint64_t maxCompressedLength(uint64_t uncompressedLength) const; - - /** - * Extracts the uncompressed length from the compressed data if possible. - * If the codec doesn't store the uncompressed length, or the data is - * corrupted it returns the given uncompressedLength. - * If the uncompressed length is stored in the compressed data and - * uncompressedLength is not none and they do not match a std::runtime_error - * is thrown. - */ - folly::Optional getUncompressedLength( - const folly::IOBuf* data, - folly::Optional uncompressedLength = folly::none) const; - - protected: - explicit Codec(CodecType type); - - public: - /** - * Returns a superset of the set of prefixes for which canUncompress() will - * return true. A superset is allowed for optimizations in canUncompress() - * based on other knowledge such as length. None of the prefixes may be empty. - * default: No prefixes. - */ - virtual std::vector validPrefixes() const; - - /** - * Returns true if the codec thinks it can uncompress the data. - * If a codec doesn't have magic bytes at the beginning, like LZ4 and Snappy, - * it can always return false. - * default: Returns false. 
- */ - virtual bool canUncompress( - const folly::IOBuf* data, - folly::Optional uncompressedLength = folly::none) const; - - private: - // default: no limits (save for special value UNKNOWN_UNCOMPRESSED_LENGTH) - virtual uint64_t doMaxUncompressedLength() const; - // default: doesn't need uncompressed length - virtual bool doNeedsUncompressedLength() const; - virtual std::unique_ptr doCompress(const folly::IOBuf* data) = 0; - virtual std::unique_ptr doUncompress( - const folly::IOBuf* data, - folly::Optional uncompressedLength) = 0; - // default: an implementation is provided by default to wrap the strings into - // IOBufs and delegate to the IOBuf methods. This incurs a copy of the output - // from IOBuf to string. Implementers, at their discretion, can override - // these methods to avoid the copy. - virtual std::string doCompressString(StringPiece data); - virtual std::string doUncompressString( - StringPiece data, - folly::Optional uncompressedLength); - - virtual uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const = 0; - // default: returns the passed uncompressedLength. - virtual folly::Optional doGetUncompressedLength( - const folly::IOBuf* data, - folly::Optional uncompressedLength) const; - - CodecType type_; -}; - -class StreamCodec : public Codec { - public: - ~StreamCodec() override {} - - /** - * Does the codec need the data length before compression streaming? - */ - bool needsDataLength() const; - - /***************************************************************************** - * Streaming API - ***************************************************************************** - * A low-level stateful streaming API. - * Streaming operations can be started in two ways: - * 1. From a clean Codec on which no non-const methods have been called. - * 2. A call to resetStream(), which will reset any codec to a clean state. 
- * After a streaming operation has begun, either compressStream() or - * uncompressStream() must be called until the streaming operation ends. - * compressStream() ends when it returns true with flushOp END. - * uncompressStream() ends when it returns true. At this point the codec - * may be reused by calling resetStream(). - * - * compress() and uncompress() can be called at any time, but they interrupt - * any ongoing streaming operations (state is lost and resetStream() must be - * called before another streaming operation). - */ - - /** - * Reset the state of the codec, and set the uncompressed length for the next - * streaming operation. If uncompressedLength is not none it must be exactly - * the uncompressed length. compressStream() must be passed exactly - * uncompressedLength input bytes before the stream is ended. - * uncompressStream() must be passed a compressed frame that uncompresses to - * uncompressedLength. - */ - void resetStream(folly::Optional uncompressedLength = folly::none); - - enum class FlushOp { NONE, FLUSH, END }; - - /** - * Compresses some data from the input buffer and writes the compressed data - * into the output buffer. It may read input without producing any output, - * except when forced to flush. - * - * The input buffer is advanced to point to the range of data that hasn't yet - * been read. Compression will resume at this point for the next call to - * compressStream(). The output buffer is advanced one byte past the last byte - * written. - * - * The default flushOp is NONE, which allows compressStream() complete - * discretion in how much data to gather before writing any output. - * - * If flushOp is END, all pending and input data is flushed to the output - * buffer, and the frame is ended. compressStream() must be called with the - * same input and flushOp END until it returns true. At this point the caller - * must call resetStream() to use the codec again. 
- * - * If flushOp is FLUSH, all pending and input data is flushed to the output - * buffer, but the frame is not ended. compressStream() must be called with - * the same input and flushOp END until it returns true. At this point the - * caller can continue to compressStream() with any input data and flushOp. - * The uncompressor, if passed all the produced output data, will be able to - * uncompress all the input data passed to compressStream() so far. Excessive - * use of flushOp FLUSH will deteriorate compression ratio. This is useful for - * stateful streaming across a network. Most users don't need to use this - * flushOp. - * - * A std::logic_error is thrown on incorrect usage of the API. - * A std::runtime_error is thrown upon error conditions or if no forward - * progress could be made twice in a row. - */ - bool compressStream( - folly::ByteRange& input, - folly::MutableByteRange& output, - FlushOp flushOp = StreamCodec::FlushOp::NONE); - - /** - * Uncompresses some data from the input buffer and writes the uncompressed - * data into the output buffer. It may read input without producing any - * output. - * - * The input buffer is advanced to point to the range of data that hasn't yet - * been read. Uncompression will resume at this point for the next call to - * uncompressStream(). The output buffer is advanced one byte past the last - * byte written. - * - * The default flushOp is NONE, which allows uncompressStream() complete - * discretion in how much output data to flush. The uncompressor may not make - * maximum forward progress, but will make some forward progress when - * possible. - * - * If flushOp is END, the caller guarantees that no more input will be - * presented to uncompressStream(). uncompressStream() must be called with the - * same input and flushOp END until it returns true. 
This is not mandatory, - * but if the input is all available in one buffer, and there is enough output - * space to write the entire frame, codecs can uncompress faster. - * - * If flushOp is FLUSH, uncompressStream() is guaranteed to make the maximum - * amount of forward progress possible. When using this flushOp and - * uncompressStream() returns with `!output.empty()` the caller knows that all - * pending output has been flushed. This is useful for stateful streaming - * across a network, and it should be used in conjunction with - * compressStream() with flushOp FLUSH. Most users don't need to use this - * flushOp. - * - * A std::runtime_error is thrown upon error conditions or if no forward - * progress could be made upon two consecutive calls to the function (only the - * second call will throw an exception). - * - * Returns true at the end of a frame. At this point resetStream() must be - * called to reuse the codec. - */ - bool uncompressStream( - folly::ByteRange& input, - folly::MutableByteRange& output, - FlushOp flushOp = StreamCodec::FlushOp::NONE); - - protected: - explicit StreamCodec(CodecType type) : Codec(type) {} - - // Returns the uncompressed length last passed to resetStream() or none if it - // hasn't been called yet. - folly::Optional uncompressedLength() const { - return uncompressedLength_; - } - - private: - // default: Implemented using the streaming API. 
- std::unique_ptr doCompress(const folly::IOBuf* data) override; - std::unique_ptr doUncompress( - const folly::IOBuf* data, - folly::Optional uncompressedLength) override; - - // default: Returns false - virtual bool doNeedsDataLength() const; - virtual void doResetStream() = 0; - virtual bool doCompressStream( - folly::ByteRange& input, - folly::MutableByteRange& output, - FlushOp flushOp) = 0; - virtual bool doUncompressStream( - folly::ByteRange& input, - folly::MutableByteRange& output, - FlushOp flushOp) = 0; - - enum class State { - RESET, - COMPRESS, - COMPRESS_FLUSH, - COMPRESS_END, - UNCOMPRESS, - END, - }; - void assertStateIs(State expected) const; - - CodecType type_; - State state_{State::RESET}; - ByteRange previousInput_{}; - folly::Optional uncompressedLength_{}; - bool progressMade_{true}; -}; - -constexpr int COMPRESSION_LEVEL_FASTEST = -1; -constexpr int COMPRESSION_LEVEL_DEFAULT = -2; -constexpr int COMPRESSION_LEVEL_BEST = -3; - -/** - * Return a codec for the given type. Throws on error. The level - * is a non-negative codec-dependent integer indicating the level of - * compression desired, or one of the following constants: - * - * COMPRESSION_LEVEL_FASTEST is fastest (uses least CPU / memory, - * worst compression) - * COMPRESSION_LEVEL_DEFAULT is the default (likely a tradeoff between - * FASTEST and BEST) - * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory, - * best compression) - * - * When decompressing, the compression level is ignored. All codecs will - * decompress all data compressed with the a codec of the same type, regardless - * of compression level. - */ -std::unique_ptr getCodec( - CodecType type, - int level = COMPRESSION_LEVEL_DEFAULT); - -/** - * Return a codec for the given type. Throws on error. 
The level - * is a non-negative codec-dependent integer indicating the level of - * compression desired, or one of the following constants: - * - * COMPRESSION_LEVEL_FASTEST is fastest (uses least CPU / memory, - * worst compression) - * COMPRESSION_LEVEL_DEFAULT is the default (likely a tradeoff between - * FASTEST and BEST) - * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory, - * best compression) - * - * When decompressing, the compression level is ignored. All codecs will - * decompress all data compressed with the a codec of the same type, regardless - * of compression level. - */ -std::unique_ptr getStreamCodec( - CodecType type, - int level = COMPRESSION_LEVEL_DEFAULT); - -/** - * Returns a codec that can uncompress any of the given codec types as well as - * {LZ4_FRAME, ZSTD, ZLIB, GZIP, LZMA2, BZIP2}. Appends each default codec to - * customCodecs in order, so long as a codec with the same type() isn't already - * present in customCodecs or as the terminalCodec. When uncompress() is called, - * each codec's canUncompress() is called in the order that they are given. - * Appended default codecs are checked last. uncompress() is called on the - * first codec whose canUncompress() returns true. - * - * In addition, an optional `terminalCodec` can be provided. This codec's - * uncompress() will be called either when no other codec canUncompress() the - * data or the chosen codec throws an exception on the data. The terminalCodec - * is intended for ambiguous headers, when canUncompress() is false for some - * data it can actually uncompress. The terminalCodec does not need to override - * validPrefixes() or canUncompress() and overriding these functions will have - * no effect on the returned codec's validPrefixes() or canUncompress() - * functions. The terminalCodec's needsUncompressedLength() and - * maxUncompressedLength() will affect the returned codec's respective - * functions. 
The terminalCodec must not be duplicated in customCodecs. - * - * An exception is thrown if no codec canUncompress() the data and either no - * terminal codec was provided or a terminal codec was provided and it throws on - * the data. - * An exception is thrown if the chosen codec's uncompress() throws on the data - * and either no terminal codec was provided or a terminal codec was provided - * and it also throws on the data. - * An exception is thrown if compress() is called on the returned codec. - * - * Requirements are checked in debug mode and are as follows: - * Let headers be the concatenation of every codec's validPrefixes(). - * 1. Each codec must override validPrefixes() and canUncompress(). - * 2. No codec's validPrefixes() may be empty. - * 3. No header in headers may be empty. - * 4. headers must not contain any duplicate elements. - * 5. No strict non-empty prefix of any header in headers may be in headers. - * 6. The terminalCodec's type must not be the same as any other codec's type - * (with USER_DEFINED being the exception). - */ -std::unique_ptr getAutoUncompressionCodec( - std::vector> customCodecs = {}, - std::unique_ptr terminalCodec = {}); - -/** - * Check if a specified codec is supported. - */ -bool hasCodec(CodecType type); - -/** - * Check if a specified codec is supported and supports streaming. - */ -bool hasStreamCodec(CodecType type); -} // namespace io -} // namespace folly diff --git a/folly/io/compression/Utils.h b/folly/io/compression/Utils.h deleted file mode 100644 index 8d23723f..00000000 --- a/folly/io/compression/Utils.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include -#include -#include - -/** - * Helper functions for compression codecs. - */ -namespace folly { -namespace io { -namespace compression { -namespace detail { - -/** - * Reads sizeof(T) bytes, and returns false if not enough bytes are available. - * Returns true if the first n bytes are equal to prefix when interpreted as - * a little endian T. - */ -template -typename std::enable_if::value, bool>::type -dataStartsWithLE(const IOBuf* data, T prefix, uint64_t n = sizeof(T)) { - DCHECK_GT(n, 0); - DCHECK_LE(n, sizeof(T)); - T value; - Cursor cursor{data}; - if (!cursor.tryReadLE(value)) { - return false; - } - const T mask = n == sizeof(T) ? T(-1) : (T(1) << (8 * n)) - 1; - return prefix == (value & mask); -} - -template -typename std::enable_if::value, std::string>::type -prefixToStringLE(T prefix, uint64_t n = sizeof(T)) { - DCHECK_GT(n, 0); - DCHECK_LE(n, sizeof(T)); - prefix = Endian::little(prefix); - std::string result; - result.resize(n); - memcpy(&result[0], &prefix, n); - return result; -} - -} // namespace detail -} // namespace compression -} // namespace io -} // namespace folly diff --git a/folly/io/compression/Zlib.cpp b/folly/io/compression/Zlib.cpp deleted file mode 100644 index 38cfa1ad..00000000 --- a/folly/io/compression/Zlib.cpp +++ /dev/null @@ -1,416 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#if FOLLY_HAVE_LIBZ - -#include -#include -#include -#include -#include -#include -#include - -using folly::io::compression::detail::dataStartsWithLE; -using folly::io::compression::detail::prefixToStringLE; - -namespace folly { -namespace io { -namespace zlib { - -namespace { - -bool isValidStrategy(int strategy) { - std::array strategies{{ - Z_DEFAULT_STRATEGY, - Z_FILTERED, - Z_HUFFMAN_ONLY, - Z_RLE, - Z_FIXED - }}; - return std::any_of(strategies.begin(), strategies.end(), [&](int i) { - return i == strategy; - }); -} - -int getWindowBits(Options::Format format, int windowSize) { - switch (format) { - case Options::Format::ZLIB: - return windowSize; - case Options::Format::GZIP: - return windowSize + 16; - case Options::Format::RAW: - return -windowSize; - case Options::Format::AUTO: - return windowSize + 32; - default: - return windowSize; - } -} - -CodecType getCodecType(Options options) { - if (options.windowSize == 15 && options.format == Options::Format::ZLIB) { - return CodecType::ZLIB; - } else if ( - options.windowSize == 15 && options.format == Options::Format::GZIP) { - return CodecType::GZIP; - } else { - return CodecType::USER_DEFINED; - } -} - -class ZlibStreamCodec final : public StreamCodec { - public: - static std::unique_ptr createCodec(Options options, int level); - static std::unique_ptr createStream(Options options, int level); - - explicit ZlibStreamCodec(Options options, int level); - ~ZlibStreamCodec() override; - - std::vector validPrefixes() const override; - bool canUncompress(const IOBuf* data, 
Optional uncompressedLength) - const override; - - private: - uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; - - void doResetStream() override; - bool doCompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flush) override; - bool doUncompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flush) override; - - void resetDeflateStream(); - void resetInflateStream(); - - Options options_; - - Optional deflateStream_{}; - Optional inflateStream_{}; - int level_; - bool needReset_{true}; -}; -static constexpr uint16_t kGZIPMagicLE = 0x8B1F; - -std::vector ZlibStreamCodec::validPrefixes() const { - if (type() == CodecType::ZLIB) { - // Zlib streams start with a 2 byte header. - // - // 0 1 - // +---+---+ - // |CMF|FLG| - // +---+---+ - // - // We won't restrict the values of any sub-fields except as described below. - // - // The lowest 4 bits of CMF is the compression method (CM). - // CM == 0x8 is the deflate compression method, which is currently the only - // supported compression method, so any valid prefix must have CM == 0x8. - // - // The lowest 5 bits of FLG is FCHECK. - // FCHECK must be such that the two header bytes are a multiple of 31 when - // interpreted as a big endian 16-bit number. - std::vector result; - // 16 values for the first byte, 8 values for the second byte. - // There are also 4 combinations where both 0x00 and 0x1F work as FCHECK. - result.reserve(132); - // Select all values for the CMF byte that use the deflate algorithm 0x8. - for (uint32_t first = 0x0800; first <= 0xF800; first += 0x1000) { - // Select all values for the FLG, but leave FCHECK as 0 since it's fixed. - for (uint32_t second = 0x00; second <= 0xE0; second += 0x20) { - uint16_t prefix = first | second; - // Compute FCHECK. - prefix += 31 - (prefix % 31); - result.push_back(prefixToStringLE(Endian::big(prefix))); - // zlib won't produce this, but it is a valid prefix. 
- if ((prefix & 0x1F) == 31) { - prefix -= 31; - result.push_back(prefixToStringLE(Endian::big(prefix))); - } - } - } - return result; - } else if (type() == CodecType::GZIP) { - // The gzip frame starts with 2 magic bytes. - return {prefixToStringLE(kGZIPMagicLE)}; - } else { - return {}; - } -} - -bool ZlibStreamCodec::canUncompress(const IOBuf* data, Optional) - const { - if (type() == CodecType::ZLIB) { - uint16_t value; - Cursor cursor{data}; - if (!cursor.tryReadBE(value)) { - return false; - } - // zlib compressed if using deflate and is a multiple of 31. - return (value & 0x0F00) == 0x0800 && value % 31 == 0; - } else if (type() == CodecType::GZIP) { - return dataStartsWithLE(data, kGZIPMagicLE); - } else { - return false; - } -} - -uint64_t ZlibStreamCodec::doMaxCompressedLength( - uint64_t uncompressedLength) const { - // When passed a nullptr, deflateBound() adds 6 bytes for a zlib wrapper. A - // gzip wrapper is 18 bytes, so we add the 12 byte difference. - return deflateBound(nullptr, uncompressedLength) + - (options_.format == Options::Format::GZIP ? 
12 : 0); -} - -std::unique_ptr ZlibStreamCodec::createCodec( - Options options, - int level) { - return std::make_unique(options, level); -} - -std::unique_ptr ZlibStreamCodec::createStream( - Options options, - int level) { - return std::make_unique(options, level); -} - -ZlibStreamCodec::ZlibStreamCodec(Options options, int level) - : StreamCodec(getCodecType(options)) { - switch (level) { - case COMPRESSION_LEVEL_FASTEST: - level = 1; - break; - case COMPRESSION_LEVEL_DEFAULT: - level = Z_DEFAULT_COMPRESSION; - break; - case COMPRESSION_LEVEL_BEST: - level = 9; - break; - } - auto inBounds = [](int value, int low, int high) { - return (value >= low) && (value <= high); - }; - - if (level != Z_DEFAULT_COMPRESSION && !inBounds(level, 0, 9)) { - throw std::invalid_argument( - to("ZlibStreamCodec: invalid level: ", level)); - } - level_ = level; - options_ = options; - - // Although zlib allows a windowSize of 8..15, a value of 8 is not - // properly supported and is treated as a value of 9. This means data deflated - // with windowSize==8 can not be re-inflated with windowSize==8. windowSize==8 - // is also not supported for gzip and raw deflation. - // Hence, the codec supports only 9..15. 
- if (!inBounds(options_.windowSize, 9, 15)) { - throw std::invalid_argument(to( - "ZlibStreamCodec: invalid windowSize option: ", options.windowSize)); - } - if (!inBounds(options_.memLevel, 1, 9)) { - throw std::invalid_argument(to( - "ZlibStreamCodec: invalid memLevel option: ", options.memLevel)); - } - if (!isValidStrategy(options_.strategy)) { - throw std::invalid_argument(to( - "ZlibStreamCodec: invalid strategy: ", options.strategy)); - } -} - -ZlibStreamCodec::~ZlibStreamCodec() { - if (deflateStream_) { - deflateEnd(deflateStream_.get_pointer()); - deflateStream_.clear(); - } - if (inflateStream_) { - inflateEnd(inflateStream_.get_pointer()); - inflateStream_.clear(); - } -} - -void ZlibStreamCodec::doResetStream() { - needReset_ = true; -} - -void ZlibStreamCodec::resetDeflateStream() { - if (deflateStream_) { - int const rc = deflateReset(deflateStream_.get_pointer()); - if (rc != Z_OK) { - deflateStream_.clear(); - throw std::runtime_error( - to("ZlibStreamCodec: deflateReset error: ", rc)); - } - return; - } - deflateStream_ = z_stream{}; - - // The automatic header detection format is only for inflation. - // Use zlib for deflation if the format is auto. - int const windowBits = getWindowBits( - options_.format == Options::Format::AUTO ? 
Options::Format::ZLIB - : options_.format, - options_.windowSize); - - int const rc = deflateInit2( - deflateStream_.get_pointer(), - level_, - Z_DEFLATED, - windowBits, - options_.memLevel, - options_.strategy); - if (rc != Z_OK) { - deflateStream_.clear(); - throw std::runtime_error( - to("ZlibStreamCodec: deflateInit error: ", rc)); - } -} - -void ZlibStreamCodec::resetInflateStream() { - if (inflateStream_) { - int const rc = inflateReset(inflateStream_.get_pointer()); - if (rc != Z_OK) { - inflateStream_.clear(); - throw std::runtime_error( - to("ZlibStreamCodec: inflateReset error: ", rc)); - } - return; - } - inflateStream_ = z_stream{}; - int const rc = inflateInit2( - inflateStream_.get_pointer(), - getWindowBits(options_.format, options_.windowSize)); - if (rc != Z_OK) { - inflateStream_.clear(); - throw std::runtime_error( - to("ZlibStreamCodec: inflateInit error: ", rc)); - } -} - -static int zlibTranslateFlush(StreamCodec::FlushOp flush) { - switch (flush) { - case StreamCodec::FlushOp::NONE: - return Z_NO_FLUSH; - case StreamCodec::FlushOp::FLUSH: - return Z_SYNC_FLUSH; - case StreamCodec::FlushOp::END: - return Z_FINISH; - default: - throw std::invalid_argument("ZlibStreamCodec: Invalid flush"); - } -} - -static int zlibThrowOnError(int rc) { - switch (rc) { - case Z_OK: - case Z_BUF_ERROR: - case Z_STREAM_END: - return rc; - default: - throw std::runtime_error(to("ZlibStreamCodec: error: ", rc)); - } -} - -bool ZlibStreamCodec::doCompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flush) { - if (needReset_) { - resetDeflateStream(); - needReset_ = false; - } - DCHECK(deflateStream_.hasValue()); - // zlib will return Z_STREAM_ERROR if output.data() is null. 
- if (output.data() == nullptr) { - return false; - } - deflateStream_->next_in = const_cast(input.data()); - deflateStream_->avail_in = input.size(); - deflateStream_->next_out = output.data(); - deflateStream_->avail_out = output.size(); - SCOPE_EXIT { - input.uncheckedAdvance(input.size() - deflateStream_->avail_in); - output.uncheckedAdvance(output.size() - deflateStream_->avail_out); - }; - int const rc = zlibThrowOnError( - deflate(deflateStream_.get_pointer(), zlibTranslateFlush(flush))); - switch (flush) { - case StreamCodec::FlushOp::NONE: - return false; - case StreamCodec::FlushOp::FLUSH: - return deflateStream_->avail_in == 0 && deflateStream_->avail_out != 0; - case StreamCodec::FlushOp::END: - return rc == Z_STREAM_END; - default: - throw std::invalid_argument("ZlibStreamCodec: Invalid flush"); - } -} - -bool ZlibStreamCodec::doUncompressStream( - ByteRange& input, - MutableByteRange& output, - StreamCodec::FlushOp flush) { - if (needReset_) { - resetInflateStream(); - needReset_ = false; - } - DCHECK(inflateStream_.hasValue()); - // zlib will return Z_STREAM_ERROR if output.data() is null. 
- if (output.data() == nullptr) { - return false; - } - inflateStream_->next_in = const_cast(input.data()); - inflateStream_->avail_in = input.size(); - inflateStream_->next_out = output.data(); - inflateStream_->avail_out = output.size(); - SCOPE_EXIT { - input.advance(input.size() - inflateStream_->avail_in); - output.advance(output.size() - inflateStream_->avail_out); - }; - int const rc = zlibThrowOnError( - inflate(inflateStream_.get_pointer(), zlibTranslateFlush(flush))); - return rc == Z_STREAM_END; -} - -} // namespace - -Options defaultGzipOptions() { - return Options(Options::Format::GZIP); -} - -Options defaultZlibOptions() { - return Options(Options::Format::ZLIB); -} - -std::unique_ptr getCodec(Options options, int level) { - return ZlibStreamCodec::createCodec(options, level); -} - -std::unique_ptr getStreamCodec(Options options, int level) { - return ZlibStreamCodec::createStream(options, level); -} - -} // namespace zlib -} // namespace io -} // namespace folly - -#endif // FOLLY_HAVE_LIBZ diff --git a/folly/io/compression/Zlib.h b/folly/io/compression/Zlib.h deleted file mode 100644 index 74dfe032..00000000 --- a/folly/io/compression/Zlib.h +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -#if FOLLY_HAVE_LIBZ - -#include - -/** - * Interface for Zlib-specific codec initialization. 
- */ -namespace folly { -namespace io { -namespace zlib { - -struct Options { - /** - * ZLIB: default option -- write a zlib wrapper as documented in RFC 1950. - * - * GZIP: write a simple gzip header and trailer around the compressed data - * instead of a zlib wrapper. - * - * RAW: deflate will generate raw deflate data with no zlib header or - * trailer, and will not compute a check value. - * - * AUTO: enable automatic header detection for decoding gzip or zlib data. - * For deflation, ZLIB will be used. - */ - enum class Format { ZLIB, GZIP, RAW, AUTO }; - - explicit Options( - Format format = Format::ZLIB, - int windowSize = 15, - int memLevel = 8, - int strategy = Z_DEFAULT_STRATEGY) - : format(format), - windowSize(windowSize), - memLevel(memLevel), - strategy(strategy) {} - - Format format; - - /** - * windowSize is the base two logarithm of the window size (the size of the - * history buffer). It should be in the range 9..15. Larger values of this - * parameter result in better compression at the expense of memory usage. - * - * The default value is 15. - * - * NB: when inflating/uncompressing data, the windowSize must be greater than - * or equal to the size used when deflating/compressing. - */ - int windowSize; - - /** - * "The memLevel parameter specifies how much memory should be allocated for - * the internal compression state. memLevel=1 uses minimum memory but is slow - * and reduces compression ratio; memLevel=9 uses maximum memory for optimal - * speed. The default value is 8." - */ - int memLevel; - - /** - * The strategy parameter is used to tune the compression algorithm. 
- * Supported values: - * - Z_DEFAULT_STRATEGY: normal data - * - Z_FILTERED: data produced by a filter (or predictor) - * - Z_HUFFMAN_ONLY: force Huffman encoding only (no string match) - * - Z_RLE: limit match distances to one - * - Z_FIXED: prevents the use of dynamic Huffman codes - * - * The strategy parameter only affects the compression ratio but not the - * correctness of the compressed output. - */ - int strategy; -}; - -/** - * Get the default options for gzip compression. - * A codec created with these options will have type CodecType::GZIP. - */ -Options defaultGzipOptions(); - -/** - * Get the default options for zlib compression. - * A codec created with these options will have type CodecType::ZLIB. - */ -Options defaultZlibOptions(); - -/** - * Get a codec with the given options and compression level. - * - * If the windowSize is 15 and the format is Format::ZLIB or Format::GZIP, then - * the type of the codec will be CodecType::ZLIB or CodecType::GZIP - * respectively. Otherwise, the type will be CodecType::USER_DEFINED. - * - * Automatic uncompression is not supported with USER_DEFINED codecs. - * - * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6 - */ -std::unique_ptr getCodec( - Options options = Options(), - int level = COMPRESSION_LEVEL_DEFAULT); -std::unique_ptr getStreamCodec( - Options options = Options(), - int level = COMPRESSION_LEVEL_DEFAULT); - -} // namespace zlib -} // namespace io -} // namespace folly - -#endif // FOLLY_HAVE_LIBZ diff --git a/folly/io/test/CompressionTest.cpp b/folly/io/test/CompressionTest.cpp deleted file mode 100644 index 69a037d8..00000000 --- a/folly/io/test/CompressionTest.cpp +++ /dev/null @@ -1,1502 +0,0 @@ -/* - * Copyright 2017 Facebook, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -#include -#include -#include -#include -#include -#include - -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -#if FOLLY_HAVE_LIBZSTD -#include -#endif - -#if FOLLY_HAVE_LIBZ -#include -#endif - -namespace zlib = folly::io::zlib; - -namespace folly { -namespace io { -namespace test { - -class DataHolder : private boost::noncopyable { - public: - uint64_t hash(size_t size) const; - ByteRange data(size_t size) const; - - protected: - explicit DataHolder(size_t sizeLog2); - const size_t size_; - std::unique_ptr data_; - mutable std::unordered_map hashCache_; -}; - -DataHolder::DataHolder(size_t sizeLog2) - : size_(size_t(1) << sizeLog2), - data_(new uint8_t[size_]) { -} - -uint64_t DataHolder::hash(size_t size) const { - CHECK_LE(size, size_); - auto p = hashCache_.find(size); - if (p != hashCache_.end()) { - return p->second; - } - - uint64_t h = folly::hash::fnv64_buf(data_.get(), size); - hashCache_[size] = h; - return h; -} - -ByteRange DataHolder::data(size_t size) const { - CHECK_LE(size, size_); - return ByteRange(data_.get(), size); -} - -uint64_t hashIOBuf(const IOBuf* buf) { - uint64_t h = folly::hash::FNV_64_HASH_START; - for (auto& range : *buf) { - h = folly::hash::fnv64_buf(range.data(), range.size(), h); - } - return h; -} - -class RandomDataHolder : public DataHolder { - public: - explicit RandomDataHolder(size_t sizeLog2); -}; - -RandomDataHolder::RandomDataHolder(size_t sizeLog2) - : DataHolder(sizeLog2) { - static constexpr size_t numThreadsLog2 = 3; - static constexpr 
size_t numThreads = size_t(1) << numThreadsLog2; - - uint32_t seed = randomNumberSeed(); - - std::vector threads; - threads.reserve(numThreads); - for (size_t t = 0; t < numThreads; ++t) { - threads.emplace_back([this, seed, t, sizeLog2] { - std::mt19937 rng(seed + t); - size_t countLog2 = sizeLog2 - numThreadsLog2; - size_t start = size_t(t) << countLog2; - for (size_t i = 0; i < countLog2; ++i) { - this->data_[start + i] = rng(); - } - }); - } - - for (auto& t : threads) { - t.join(); - } -} - -class ConstantDataHolder : public DataHolder { - public: - explicit ConstantDataHolder(size_t sizeLog2); -}; - -ConstantDataHolder::ConstantDataHolder(size_t sizeLog2) - : DataHolder(sizeLog2) { - memset(data_.get(), 'a', size_); -} - -constexpr size_t dataSizeLog2 = 27; // 128MiB -RandomDataHolder randomDataHolder(dataSizeLog2); -ConstantDataHolder constantDataHolder(dataSizeLog2); - -// The intersection of the provided codecs & those that are compiled in. -static std::vector supportedCodecs(std::vector const& v) { - std::vector supported; - - std::copy_if( - std::begin(v), - std::end(v), - std::back_inserter(supported), - hasCodec); - - return supported; -} - -// All compiled-in compression codecs. 
-static std::vector availableCodecs() { - std::vector codecs; - - for (size_t i = 0; i < static_cast(CodecType::NUM_CODEC_TYPES); ++i) { - auto type = static_cast(i); - if (hasCodec(type)) { - codecs.push_back(type); - } - } - - return codecs; -} - -static std::vector availableStreamCodecs() { - std::vector codecs; - - for (size_t i = 0; i < static_cast(CodecType::NUM_CODEC_TYPES); ++i) { - auto type = static_cast(i); - if (hasStreamCodec(type)) { - codecs.push_back(type); - } - } - - return codecs; -} - -TEST(CompressionTestNeedsUncompressedLength, Simple) { - static const struct { - CodecType type; - bool needsUncompressedLength; - } expectations[] = { - {CodecType::NO_COMPRESSION, false}, - {CodecType::LZ4, true}, - {CodecType::SNAPPY, false}, - {CodecType::ZLIB, false}, - {CodecType::LZ4_VARINT_SIZE, false}, - {CodecType::LZMA2, false}, - {CodecType::LZMA2_VARINT_SIZE, false}, - {CodecType::ZSTD, false}, - {CodecType::GZIP, false}, - {CodecType::LZ4_FRAME, false}, - {CodecType::BZIP2, false}, - }; - - for (auto const& test : expectations) { - if (hasCodec(test.type)) { - EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(), - test.needsUncompressedLength); - } - } -} - -class CompressionTest - : public testing::TestWithParam> { - protected: - void SetUp() override { - auto tup = GetParam(); - int lengthLog = std::tr1::get<0>(tup); - // Small hack to test empty data - uncompressedLength_ = - (lengthLog < 0) ? 
0 : uint64_t(1) << std::tr1::get<0>(tup); - chunks_ = std::tr1::get<1>(tup); - codec_ = getCodec(std::tr1::get<2>(tup)); - } - - void runSimpleIOBufTest(const DataHolder& dh); - - void runSimpleStringTest(const DataHolder& dh); - - private: - std::unique_ptr split(std::unique_ptr data) const; - - uint64_t uncompressedLength_; - size_t chunks_; - std::unique_ptr codec_; -}; - -void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) { - const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_))); - const auto compressed = split(codec_->compress(original.get())); - EXPECT_LE( - compressed->computeChainDataLength(), - codec_->maxCompressedLength(uncompressedLength_)); - if (!codec_->needsUncompressedLength()) { - auto uncompressed = codec_->uncompress(compressed.get()); - EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength()); - EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); - } - { - auto uncompressed = codec_->uncompress(compressed.get(), - uncompressedLength_); - EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength()); - EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); - } -} - -void CompressionTest::runSimpleStringTest(const DataHolder& dh) { - const auto original = std::string( - reinterpret_cast(dh.data(uncompressedLength_).data()), - uncompressedLength_); - const auto compressed = codec_->compress(original); - EXPECT_LE( - compressed.length(), codec_->maxCompressedLength(uncompressedLength_)); - - if (!codec_->needsUncompressedLength()) { - auto uncompressed = codec_->uncompress(compressed); - EXPECT_EQ(uncompressedLength_, uncompressed.length()); - EXPECT_EQ(uncompressed, original); - } - { - auto uncompressed = codec_->uncompress(compressed, uncompressedLength_); - EXPECT_EQ(uncompressedLength_, uncompressed.length()); - EXPECT_EQ(uncompressed, original); - } -} - -// Uniformly split data into (potentially empty) chunks. 
-std::unique_ptr CompressionTest::split( - std::unique_ptr data) const { - if (data->isChained()) { - data->coalesce(); - } - - const size_t size = data->computeChainDataLength(); - - std::multiset splits; - for (size_t i = 1; i < chunks_; ++i) { - splits.insert(Random::rand64(size)); - } - - folly::IOBufQueue result; - - size_t offset = 0; - for (size_t split : splits) { - result.append(IOBuf::copyBuffer(data->data() + offset, split - offset)); - offset = split; - } - result.append(IOBuf::copyBuffer(data->data() + offset, size - offset)); - - return result.move(); -} - -TEST_P(CompressionTest, RandomData) { - runSimpleIOBufTest(randomDataHolder); -} - -TEST_P(CompressionTest, ConstantData) { - runSimpleIOBufTest(constantDataHolder); -} - -TEST_P(CompressionTest, RandomDataString) { - runSimpleStringTest(randomDataHolder); -} - -TEST_P(CompressionTest, ConstantDataString) { - runSimpleStringTest(constantDataHolder); -} - -INSTANTIATE_TEST_CASE_P( - CompressionTest, - CompressionTest, - testing::Combine( - testing::Values(-1, 0, 1, 12, 22, 25, 27), - testing::Values(1, 2, 3, 8, 65), - testing::ValuesIn(availableCodecs()))); - -class CompressionVarintTest - : public testing::TestWithParam> { - protected: - void SetUp() override { - auto tup = GetParam(); - uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup); - codec_ = getCodec(std::tr1::get<1>(tup)); - } - - void runSimpleTest(const DataHolder& dh); - - uint64_t uncompressedLength_; - std::unique_ptr codec_; -}; - -inline uint64_t oneBasedMsbPos(uint64_t number) { - uint64_t pos = 0; - for (; number > 0; ++pos, number >>= 1) { - } - return pos; -} - -void CompressionVarintTest::runSimpleTest(const DataHolder& dh) { - auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_)); - auto compressed = codec_->compress(original.get()); - auto breakPoint = - 1UL + - Random::rand64( - std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL); - auto tinyBuf = IOBuf::copyBuffer(compressed->data(), - 
std::min(compressed->length(), breakPoint)); - compressed->trimStart(breakPoint); - tinyBuf->prependChain(std::move(compressed)); - compressed = std::move(tinyBuf); - - auto uncompressed = codec_->uncompress(compressed.get()); - - EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength()); - EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); -} - -TEST_P(CompressionVarintTest, RandomData) { - runSimpleTest(randomDataHolder); -} - -TEST_P(CompressionVarintTest, ConstantData) { - runSimpleTest(constantDataHolder); -} - -INSTANTIATE_TEST_CASE_P( - CompressionVarintTest, - CompressionVarintTest, - testing::Combine( - testing::Values(0, 1, 12, 22, 25, 27), - testing::ValuesIn(supportedCodecs({ - CodecType::LZ4_VARINT_SIZE, - CodecType::LZMA2_VARINT_SIZE, - })))); - -TEST(LZMATest, UncompressBadVarint) { - if (hasStreamCodec(CodecType::LZMA2_VARINT_SIZE)) { - std::string const str(kMaxVarintLength64 * 2, '\xff'); - ByteRange input((folly::StringPiece(str))); - auto codec = getStreamCodec(CodecType::LZMA2_VARINT_SIZE); - auto buffer = IOBuf::create(16); - buffer->append(buffer->capacity()); - MutableByteRange output{buffer->writableData(), buffer->length()}; - EXPECT_THROW(codec->uncompressStream(input, output), std::runtime_error); - } -} - -class CompressionCorruptionTest : public testing::TestWithParam { - protected: - void SetUp() override { codec_ = getCodec(GetParam()); } - - void runSimpleTest(const DataHolder& dh); - - std::unique_ptr codec_; -}; - -void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) { - constexpr uint64_t uncompressedLength = 42; - auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength)); - auto compressed = codec_->compress(original.get()); - - if (!codec_->needsUncompressedLength()) { - auto uncompressed = codec_->uncompress(compressed.get()); - EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); - EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); - } 
- { - auto uncompressed = codec_->uncompress(compressed.get(), - uncompressedLength); - EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); - EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); - } - - EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1), - std::runtime_error); - - auto corrupted = compressed->clone(); - corrupted->unshare(); - // Truncate the last character - corrupted->prev()->trimEnd(1); - if (!codec_->needsUncompressedLength()) { - EXPECT_THROW(codec_->uncompress(corrupted.get()), - std::runtime_error); - } - - EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength), - std::runtime_error); - - corrupted = compressed->clone(); - corrupted->unshare(); - // Corrupt the first character - ++(corrupted->writableData()[0]); - - if (!codec_->needsUncompressedLength()) { - EXPECT_THROW(codec_->uncompress(corrupted.get()), - std::runtime_error); - } - - EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength), - std::runtime_error); -} - -TEST_P(CompressionCorruptionTest, RandomData) { - runSimpleTest(randomDataHolder); -} - -TEST_P(CompressionCorruptionTest, ConstantData) { - runSimpleTest(constantDataHolder); -} - -INSTANTIATE_TEST_CASE_P( - CompressionCorruptionTest, - CompressionCorruptionTest, - testing::ValuesIn( - // NO_COMPRESSION can't detect corruption - // LZ4 can't detect corruption reliably (sigh) - supportedCodecs({ - CodecType::SNAPPY, - CodecType::ZLIB, - CodecType::LZMA2, - CodecType::ZSTD, - CodecType::LZ4_FRAME, - CodecType::BZIP2, - }))); - -class StreamingUnitTest : public testing::TestWithParam { - protected: - void SetUp() override { - codec_ = getStreamCodec(GetParam()); - } - - std::unique_ptr codec_; -}; - -TEST(StreamingUnitTest, needsDataLength) { - static const struct { - CodecType type; - bool needsDataLength; - } expectations[] = { - {CodecType::ZLIB, false}, - {CodecType::GZIP, false}, - {CodecType::LZMA2, false}, - 
{CodecType::LZMA2_VARINT_SIZE, true}, - {CodecType::ZSTD, false}, - }; - - for (auto const& test : expectations) { - if (hasStreamCodec(test.type)) { - EXPECT_EQ( - getStreamCodec(test.type)->needsDataLength(), test.needsDataLength); - } - } -} - -TEST_P(StreamingUnitTest, maxCompressedLength) { - for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) { - EXPECT_GE(codec_->maxCompressedLength(length), length); - } -} - -TEST_P(StreamingUnitTest, getUncompressedLength) { - auto const empty = IOBuf::create(0); - EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get())); - EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0)); - EXPECT_ANY_THROW(codec_->getUncompressedLength(empty.get(), 1)); - - auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100)); - auto const compressed = codec_->compress(data.get()); - - if (auto const length = codec_->getUncompressedLength(data.get())) { - EXPECT_EQ(100, *length); - } - EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100)); - // If the uncompressed length is stored in the frame, then make sure it throws - // when it is given the wrong length. 
- if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) { - EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200)); - } -} - -TEST_P(StreamingUnitTest, emptyData) { - ByteRange input{}; - auto buffer = IOBuf::create(codec_->maxCompressedLength(0)); - buffer->append(buffer->capacity()); - MutableByteRange output; - - // Test compressing empty data in one pass - if (!codec_->needsDataLength()) { - output = {buffer->writableData(), buffer->length()}; - EXPECT_TRUE( - codec_->compressStream(input, output, StreamCodec::FlushOp::END)); - } - codec_->resetStream(0); - output = {buffer->writableData(), buffer->length()}; - EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); - - // Test uncompressing the compressed empty data is equivalent to the empty - // string - { - size_t compressedSize = buffer->length() - output.size(); - auto const compressed = - IOBuf::copyBuffer(buffer->writableData(), compressedSize); - auto inputRange = compressed->coalesce(); - codec_->resetStream(0); - output = {buffer->writableData(), buffer->length()}; - EXPECT_TRUE(codec_->uncompressStream( - inputRange, output, StreamCodec::FlushOp::END)); - EXPECT_EQ(output.size(), buffer->length()); - } - - // Test compressing empty data with multiple calls to compressStream() - { - auto largeBuffer = IOBuf::create(codec_->maxCompressedLength(0) * 2); - largeBuffer->append(largeBuffer->capacity()); - codec_->resetStream(0); - output = {largeBuffer->writableData(), largeBuffer->length()}; - EXPECT_FALSE(codec_->compressStream(input, output)); - EXPECT_TRUE( - codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH)); - EXPECT_TRUE( - codec_->compressStream(input, output, StreamCodec::FlushOp::END)); - } - - // Test uncompressing empty data - output = {}; - codec_->resetStream(); - EXPECT_TRUE(codec_->uncompressStream(input, output)); - codec_->resetStream(); - EXPECT_TRUE( - codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH)); - 
codec_->resetStream(); - EXPECT_TRUE( - codec_->uncompressStream(input, output, StreamCodec::FlushOp::END)); - codec_->resetStream(0); - EXPECT_TRUE(codec_->uncompressStream(input, output)); - codec_->resetStream(0); - EXPECT_TRUE( - codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH)); - codec_->resetStream(0); - EXPECT_TRUE( - codec_->uncompressStream(input, output, StreamCodec::FlushOp::END)); -} - -TEST_P(StreamingUnitTest, noForwardProgress) { - auto inBuffer = IOBuf::create(2); - inBuffer->writableData()[0] = 'a'; - inBuffer->writableData()[1] = 'a'; - inBuffer->append(2); - const auto compressed = codec_->compress(inBuffer.get()); - auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2)); - - ByteRange emptyInput; - MutableByteRange emptyOutput; - - const std::array flushOps = {{ - StreamCodec::FlushOp::NONE, - StreamCodec::FlushOp::FLUSH, - StreamCodec::FlushOp::END, - }}; - - // No progress is not okay twice in a row for all flush operations when - // compressing - for (const auto flushOp : flushOps) { - if (codec_->needsDataLength()) { - codec_->resetStream(inBuffer->computeChainDataLength()); - } else { - codec_->resetStream(); - } - auto input = inBuffer->coalesce(); - MutableByteRange output = {outBuffer->writableTail(), - outBuffer->tailroom()}; - // Compress some data to avoid empty data special casing - while (!input.empty()) { - codec_->compressStream(input, output); - } - EXPECT_FALSE(codec_->compressStream(emptyInput, emptyOutput, flushOp)); - EXPECT_THROW( - codec_->compressStream(emptyInput, emptyOutput, flushOp), - std::runtime_error); - } - - // No progress is not okay twice in a row for all flush operations when - // uncompressing - for (const auto flushOp : flushOps) { - codec_->resetStream(); - auto input = compressed->coalesce(); - // Remove the last byte so the operation is incomplete - input.uncheckedSubtract(1); - MutableByteRange output = {inBuffer->writableData(), inBuffer->length()}; - // Uncompress some data 
to avoid empty data special casing - while (!input.empty()) { - EXPECT_FALSE(codec_->uncompressStream(input, output)); - } - EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput, flushOp)); - EXPECT_THROW( - codec_->uncompressStream(emptyInput, emptyOutput, flushOp), - std::runtime_error); - } -} - -TEST_P(StreamingUnitTest, stateTransitions) { - auto inBuffer = IOBuf::create(2); - inBuffer->writableData()[0] = 'a'; - inBuffer->writableData()[1] = 'a'; - inBuffer->append(2); - auto compressed = codec_->compress(inBuffer.get()); - ByteRange const in = compressed->coalesce(); - auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size())); - MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()}; - - auto compress = [&]( - StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE, - bool empty = false) { - auto input = in; - auto output = empty ? MutableByteRange{} : out; - return codec_->compressStream(input, output, flushOp); - }; - auto compress_all = [&](bool expect, - StreamCodec::FlushOp flushOp = - StreamCodec::FlushOp::NONE, - bool empty = false) { - auto input = in; - auto output = empty ? MutableByteRange{} : out; - while (!input.empty()) { - if (expect) { - EXPECT_TRUE(codec_->compressStream(input, output, flushOp)); - } else { - EXPECT_FALSE(codec_->compressStream(input, output, flushOp)); - } - } - }; - auto uncompress = [&]( - StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE, - bool empty = false) { - auto input = in; - auto output = empty ? 
MutableByteRange{} : out; - return codec_->uncompressStream(input, output, flushOp); - }; - - // compression flow - if (!codec_->needsDataLength()) { - codec_->resetStream(); - EXPECT_FALSE(compress()); - EXPECT_FALSE(compress()); - EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH)); - EXPECT_FALSE(compress()); - EXPECT_TRUE(compress(StreamCodec::FlushOp::END)); - } - codec_->resetStream(in.size() * 5); - compress_all(false); - compress_all(false); - compress_all(true, StreamCodec::FlushOp::FLUSH); - compress_all(false); - compress_all(true, StreamCodec::FlushOp::END); - - // uncompression flow - codec_->resetStream(); - EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); - codec_->resetStream(); - EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true)); - codec_->resetStream(); - EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); - codec_->resetStream(); - EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); - codec_->resetStream(); - EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); - // compress -> uncompress - codec_->resetStream(in.size()); - EXPECT_FALSE(compress()); - EXPECT_THROW(uncompress(), std::logic_error); - // uncompress -> compress - codec_->resetStream(inBuffer->computeChainDataLength()); - EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); - EXPECT_THROW(compress(), std::logic_error); - // end -> compress - if (!codec_->needsDataLength()) { - codec_->resetStream(); - EXPECT_FALSE(compress()); - EXPECT_TRUE(compress(StreamCodec::FlushOp::END)); - EXPECT_THROW(compress(), std::logic_error); - } - codec_->resetStream(in.size() * 2); - compress_all(false); - compress_all(true, StreamCodec::FlushOp::END); - EXPECT_THROW(compress(), std::logic_error); - // end -> uncompress - codec_->resetStream(); - EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); - EXPECT_THROW(uncompress(), std::logic_error); - // flush -> compress - codec_->resetStream(in.size()); - EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true)); - 
EXPECT_THROW(compress(), std::logic_error); - // flush -> end - codec_->resetStream(in.size()); - EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true)); - EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error); - // undefined -> compress - codec_->compress(inBuffer.get()); - EXPECT_THROW(compress(), std::logic_error); - codec_->uncompress(compressed.get(), inBuffer->computeChainDataLength()); - EXPECT_THROW(compress(), std::logic_error); - // undefined -> undefined - codec_->uncompress(compressed.get()); - codec_->compress(inBuffer.get()); -} - -INSTANTIATE_TEST_CASE_P( - StreamingUnitTest, - StreamingUnitTest, - testing::ValuesIn(availableStreamCodecs())); - -class StreamingCompressionTest - : public testing::TestWithParam> { - protected: - void SetUp() override { - auto const tup = GetParam(); - uncompressedLength_ = uint64_t(1) << std::get<0>(tup); - chunkSize_ = size_t(1) << std::get<1>(tup); - codec_ = getStreamCodec(std::get<2>(tup)); - } - - void runResetStreamTest(DataHolder const& dh); - void runCompressStreamTest(DataHolder const& dh); - void runUncompressStreamTest(DataHolder const& dh); - void runFlushTest(DataHolder const& dh); - - private: - std::vector split(ByteRange data) const; - - uint64_t uncompressedLength_; - size_t chunkSize_; - std::unique_ptr codec_; -}; - -std::vector StreamingCompressionTest::split(ByteRange data) const { - size_t const pieces = std::max(1, data.size() / chunkSize_); - std::vector result; - result.reserve(pieces + 1); - while (!data.empty()) { - size_t const pieceSize = std::min(data.size(), chunkSize_); - result.push_back(data.subpiece(0, pieceSize)); - data.uncheckedAdvance(pieceSize); - } - return result; -} - -static std::unique_ptr compressSome( - StreamCodec* codec, - ByteRange data, - uint64_t bufferSize, - StreamCodec::FlushOp flush) { - bool result; - IOBufQueue queue; - do { - auto buffer = IOBuf::create(bufferSize); - buffer->append(buffer->capacity()); - MutableByteRange 
output{buffer->writableData(), buffer->length()}; - - result = codec->compressStream(data, output, flush); - buffer->trimEnd(output.size()); - queue.append(std::move(buffer)); - - } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result); - EXPECT_TRUE(data.empty()); - return queue.move(); -} - -static std::pair> uncompressSome( - StreamCodec* codec, - ByteRange& data, - uint64_t bufferSize, - StreamCodec::FlushOp flush) { - bool result; - IOBufQueue queue; - do { - auto buffer = IOBuf::create(bufferSize); - buffer->append(buffer->capacity()); - MutableByteRange output{buffer->writableData(), buffer->length()}; - - result = codec->uncompressStream(data, output, flush); - buffer->trimEnd(output.size()); - queue.append(std::move(buffer)); - - } while (queue.tailroom() == 0 && !result); - return std::make_pair(result, queue.move()); -} - -void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) { - auto const input = dh.data(uncompressedLength_); - // Compress some but leave state unclean - codec_->resetStream(uncompressedLength_); - compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE); - // Reset stream and compress all - if (codec_->needsDataLength()) { - codec_->resetStream(uncompressedLength_); - } else { - codec_->resetStream(); - } - auto compressed = - compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END); - auto const uncompressed = codec_->uncompress(compressed.get(), input.size()); - EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); -} - -TEST_P(StreamingCompressionTest, resetStream) { - runResetStreamTest(constantDataHolder); - runResetStreamTest(randomDataHolder); -} - -void StreamingCompressionTest::runCompressStreamTest( - const folly::io::test::DataHolder& dh) { - auto const inputs = split(dh.data(uncompressedLength_)); - - IOBufQueue queue; - codec_->resetStream(uncompressedLength_); - // Compress many inputs in a row - for (auto const input : inputs) { - 
queue.append(compressSome( - codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE)); - } - // Finish the operation with empty input. - ByteRange empty; - queue.append( - compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END)); - - auto const uncompressed = codec_->uncompress(queue.front()); - EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); -} - -TEST_P(StreamingCompressionTest, compressStream) { - runCompressStreamTest(constantDataHolder); - runCompressStreamTest(randomDataHolder); -} - -void StreamingCompressionTest::runUncompressStreamTest( - const folly::io::test::DataHolder& dh) { - auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_)); - // Concatenate 3 compressed frames in a row - auto compressed = codec_->compress(data.get()); - compressed->prependChain(codec_->compress(data.get())); - compressed->prependChain(codec_->compress(data.get())); - // Pass all 3 compressed frames in one input buffer - auto input = compressed->coalesce(); - // Uncompress the first frame - codec_->resetStream(data->computeChainDataLength()); - { - auto const result = uncompressSome( - codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); - ASSERT_TRUE(result.first); - ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); - } - // Uncompress the second frame - codec_->resetStream(); - { - auto const result = uncompressSome( - codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END); - ASSERT_TRUE(result.first); - ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); - } - // Uncompress the third frame - codec_->resetStream(); - { - auto const result = uncompressSome( - codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); - ASSERT_TRUE(result.first); - ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); - } - EXPECT_TRUE(input.empty()); -} - -TEST_P(StreamingCompressionTest, uncompressStream) { - runUncompressStreamTest(constantDataHolder); - 
runUncompressStreamTest(randomDataHolder); -} - -void StreamingCompressionTest::runFlushTest(DataHolder const& dh) { - auto const inputs = split(dh.data(uncompressedLength_)); - auto uncodec = getStreamCodec(codec_->type()); - - if (codec_->needsDataLength()) { - codec_->resetStream(uncompressedLength_); - } else { - codec_->resetStream(); - } - for (auto input : inputs) { - // Compress some data and flush the stream - auto compressed = compressSome( - codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); - auto compressedRange = compressed->coalesce(); - // Uncompress the compressed data - auto result = uncompressSome( - uncodec.get(), - compressedRange, - chunkSize_, - StreamCodec::FlushOp::FLUSH); - // All compressed data should have been consumed - EXPECT_TRUE(compressedRange.empty()); - // The frame isn't complete - EXPECT_FALSE(result.first); - // The uncompressed data should be exactly the input data - EXPECT_EQ(input.size(), result.second->computeChainDataLength()); - auto const data = IOBuf::wrapBuffer(input); - EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); - } -} - -TEST_P(StreamingCompressionTest, testFlush) { - runFlushTest(constantDataHolder); - runFlushTest(randomDataHolder); -} - -INSTANTIATE_TEST_CASE_P( - StreamingCompressionTest, - StreamingCompressionTest, - testing::Combine( - testing::Values(0, 1, 12, 22, 27), - testing::Values(12, 17, 20), - testing::ValuesIn(availableStreamCodecs()))); - -namespace { - -// Codec types included in the codec returned by getAutoUncompressionCodec() by -// default. 
-std::vector autoUncompressionCodecTypes = {{ - CodecType::LZ4_FRAME, - CodecType::ZSTD, - CodecType::ZLIB, - CodecType::GZIP, - CodecType::LZMA2, - CodecType::BZIP2, -}}; - -} // namespace - -class AutomaticCodecTest : public testing::TestWithParam { - protected: - void SetUp() override { - codecType_ = GetParam(); - codec_ = getCodec(codecType_); - autoType_ = std::any_of( - autoUncompressionCodecTypes.begin(), - autoUncompressionCodecTypes.end(), - [&](CodecType o) { return codecType_ == o; }); - // Add the codec with type codecType_ as the terminal codec if it is not in - // autoUncompressionCodecTypes. - auto_ = getAutoUncompressionCodec({}, getTerminalCodec()); - } - - void runSimpleTest(const DataHolder& dh); - - std::unique_ptr getTerminalCodec() { - return (autoType_ ? nullptr : getCodec(codecType_)); - } - - std::unique_ptr codec_; - std::unique_ptr auto_; - CodecType codecType_; - // true if codecType_ is in autoUncompressionCodecTypes - bool autoType_; -}; - -void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) { - constexpr uint64_t uncompressedLength = 1000; - auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength)); - auto compressed = codec_->compress(original.get()); - - if (!codec_->needsUncompressedLength()) { - auto uncompressed = auto_->uncompress(compressed.get()); - EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); - EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); - } - { - auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength); - EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); - EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); - } - ASSERT_GE(compressed->computeChainDataLength(), 8); - for (size_t i = 0; i < 8; ++i) { - auto split = compressed->clone(); - auto rest = compressed->clone(); - split->trimEnd(split->length() - i); - rest->trimStart(i); - split->appendChain(std::move(rest)); - auto uncompressed = 
auto_->uncompress(split.get(), uncompressedLength); - EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); - EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); - } -} - -TEST_P(AutomaticCodecTest, RandomData) { - runSimpleTest(randomDataHolder); -} - -TEST_P(AutomaticCodecTest, ConstantData) { - runSimpleTest(constantDataHolder); -} - -TEST_P(AutomaticCodecTest, ValidPrefixes) { - const auto prefixes = codec_->validPrefixes(); - for (const auto& prefix : prefixes) { - EXPECT_FALSE(prefix.empty()); - // Ensure that all strings are at least 8 bytes for LZMA2. - // The bytes after the prefix should be ignored by `canUncompress()`. - IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8}; - data.append(8); - EXPECT_TRUE(codec_->canUncompress(&data)); - EXPECT_TRUE(auto_->canUncompress(&data)); - } -} - -TEST_P(AutomaticCodecTest, NeedsUncompressedLength) { - if (codec_->needsUncompressedLength()) { - EXPECT_TRUE(auto_->needsUncompressedLength()); - } -} - -TEST_P(AutomaticCodecTest, maxUncompressedLength) { - EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength()); -} - -TEST_P(AutomaticCodecTest, DefaultCodec) { - const uint64_t length = 42; - std::vector> codecs; - codecs.push_back(getCodec(CodecType::ZSTD)); - auto automatic = - getAutoUncompressionCodec(std::move(codecs), getTerminalCodec()); - auto original = IOBuf::wrapBuffer(constantDataHolder.data(length)); - auto compressed = codec_->compress(original.get()); - std::unique_ptr decompressed; - - if (automatic->needsUncompressedLength()) { - decompressed = automatic->uncompress(compressed.get(), length); - } else { - decompressed = automatic->uncompress(compressed.get()); - } - - EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get())); -} - -namespace { -class CustomCodec : public Codec { - public: - static std::unique_ptr create(std::string prefix, CodecType type) { - return std::make_unique(std::move(prefix), type); - } - explicit 
CustomCodec(std::string prefix, CodecType type) - : Codec(CodecType::USER_DEFINED), - prefix_(std::move(prefix)), - codec_(getCodec(type)) {} - - private: - std::vector validPrefixes() const override { - return {prefix_}; - } - - uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override { - return codec_->maxCompressedLength(uncompressedLength) + prefix_.size(); - } - - bool canUncompress(const IOBuf* data, Optional) const override { - auto clone = data->cloneCoalescedAsValue(); - if (clone.length() < prefix_.size()) { - return false; - } - return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0; - } - - std::unique_ptr doCompress(const IOBuf* data) override { - auto result = IOBuf::copyBuffer(prefix_); - result->appendChain(codec_->compress(data)); - EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength())); - return result; - } - - std::unique_ptr doUncompress( - const IOBuf* data, - Optional uncompressedLength) override { - EXPECT_TRUE(canUncompress(data, uncompressedLength)); - auto clone = data->cloneCoalescedAsValue(); - clone.trimStart(prefix_.size()); - return codec_->uncompress(&clone, uncompressedLength); - } - - std::string prefix_; - std::unique_ptr codec_; -}; -} - -TEST_P(AutomaticCodecTest, CustomCodec) { - const uint64_t length = 42; - auto ab = CustomCodec::create("ab", CodecType::ZSTD); - std::vector> codecs; - codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD)); - auto automatic = - getAutoUncompressionCodec(std::move(codecs), getTerminalCodec()); - auto original = IOBuf::wrapBuffer(constantDataHolder.data(length)); - - auto abCompressed = ab->compress(original.get()); - std::unique_ptr abDecompressed; - if (automatic->needsUncompressedLength()) { - abDecompressed = automatic->uncompress(abCompressed.get(), length); - } else { - abDecompressed = automatic->uncompress(abCompressed.get()); - } - EXPECT_TRUE(automatic->canUncompress(abCompressed.get())); - 
EXPECT_FALSE(auto_->canUncompress(abCompressed.get())); - EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get())); - - auto compressed = codec_->compress(original.get()); - std::unique_ptr decompressed; - if (automatic->needsUncompressedLength()) { - decompressed = automatic->uncompress(compressed.get(), length); - } else { - decompressed = automatic->uncompress(compressed.get()); - } - EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get())); -} - -TEST_P(AutomaticCodecTest, CustomDefaultCodec) { - const uint64_t length = 42; - auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION); - std::vector> codecs; - codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION)); - codecs.push_back(getCodec(CodecType::LZ4_FRAME)); - auto automatic = - getAutoUncompressionCodec(std::move(codecs), getTerminalCodec()); - auto original = IOBuf::wrapBuffer(constantDataHolder.data(length)); - - auto noneCompressed = none->compress(original.get()); - std::unique_ptr noneDecompressed; - if (automatic->needsUncompressedLength()) { - noneDecompressed = automatic->uncompress(noneCompressed.get(), length); - } else { - noneDecompressed = automatic->uncompress(noneCompressed.get()); - } - EXPECT_TRUE(automatic->canUncompress(noneCompressed.get())); - EXPECT_FALSE(auto_->canUncompress(noneCompressed.get())); - EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get())); - - auto compressed = codec_->compress(original.get()); - std::unique_ptr decompressed; - if (automatic->needsUncompressedLength()) { - decompressed = automatic->uncompress(compressed.get(), length); - } else { - decompressed = automatic->uncompress(compressed.get()); - } - EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get())); -} - -TEST_P(AutomaticCodecTest, canUncompressOneBytes) { - // No default codec can uncompress 1 bytes. 
- IOBuf buf{IOBuf::CREATE, 1}; - buf.append(1); - EXPECT_FALSE(codec_->canUncompress(&buf, 1)); - EXPECT_FALSE(codec_->canUncompress(&buf, folly::none)); - EXPECT_FALSE(auto_->canUncompress(&buf, 1)); - EXPECT_FALSE(auto_->canUncompress(&buf, folly::none)); -} - -INSTANTIATE_TEST_CASE_P( - AutomaticCodecTest, - AutomaticCodecTest, - testing::ValuesIn(availableCodecs())); - -namespace { - -// Codec that always "uncompresses" to the same string. -class ConstantCodec : public Codec { - public: - static std::unique_ptr create( - std::string uncompressed, - CodecType type) { - return std::make_unique(std::move(uncompressed), type); - } - explicit ConstantCodec(std::string uncompressed, CodecType type) - : Codec(type), uncompressed_(std::move(uncompressed)) {} - - private: - uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override { - return uncompressedLength; - } - - std::unique_ptr doCompress(const IOBuf*) override { - throw std::runtime_error("ConstantCodec error: compress() not supported."); - } - - std::unique_ptr doUncompress(const IOBuf*, Optional) - override { - return IOBuf::copyBuffer(uncompressed_); - } - - std::string uncompressed_; - std::unique_ptr codec_; -}; - -} // namespace - -class TerminalCodecTest : public testing::TestWithParam { - protected: - void SetUp() override { - codecType_ = GetParam(); - codec_ = getCodec(codecType_); - auto_ = getAutoUncompressionCodec(); - } - - CodecType codecType_; - std::unique_ptr codec_; - std::unique_ptr auto_; -}; - -// Test that the terminal codec's uncompress() function is called when the -// default chosen automatic codec throws. -TEST_P(TerminalCodecTest, uncompressIfDefaultThrows) { - std::string const original = "abc"; - auto const compressed = codec_->compress(original); - - // Sanity check: the automatic codec can uncompress the original string. - auto const uncompressed = auto_->uncompress(compressed); - EXPECT_EQ(uncompressed, original); - - // Truncate the compressed string. 
- auto const truncated = compressed.substr(0, compressed.size() - 1); - auto const truncatedBuf = - IOBuf::wrapBuffer(truncated.data(), truncated.size()); - EXPECT_TRUE(auto_->canUncompress(truncatedBuf.get())); - EXPECT_ANY_THROW(auto_->uncompress(truncated)); - - // Expect the terminal codec to successfully uncompress the string. - std::unique_ptr terminal = getAutoUncompressionCodec( - {}, ConstantCodec::create("dummyString", CodecType::USER_DEFINED)); - EXPECT_TRUE(terminal->canUncompress(truncatedBuf.get())); - EXPECT_EQ(terminal->uncompress(truncated), "dummyString"); -} - -// If the terminal codec has one of the "default types" automatically added in -// the AutomaticCodec, check that the default codec is no longer added. -TEST_P(TerminalCodecTest, terminalOverridesDefaults) { - std::unique_ptr terminal = getAutoUncompressionCodec( - {}, ConstantCodec::create("dummyString", codecType_)); - std::string const original = "abc"; - auto const compressed = codec_->compress(original); - EXPECT_EQ(terminal->uncompress(compressed), "dummyString"); -} - -INSTANTIATE_TEST_CASE_P( - TerminalCodecTest, - TerminalCodecTest, - testing::ValuesIn(autoUncompressionCodecTypes)); - -TEST(ValidPrefixesTest, CustomCodec) { - std::vector> codecs; - codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION)); - const auto none = getAutoUncompressionCodec(std::move(codecs)); - const auto prefixes = none->validPrefixes(); - const auto it = std::find(prefixes.begin(), prefixes.end(), "none"); - EXPECT_TRUE(it != prefixes.end()); -} - -#define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \ - do { \ - if (kIsDebug) { \ - EXPECT_THROW((statement), expected_exception); \ - } else { \ - EXPECT_NO_THROW((statement)); \ - } \ - } while (false) - -TEST(CheckCompatibleTest, SimplePrefixSecond) { - std::vector> codecs; - codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION)); - codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION)); - 
EXPECT_THROW_IF_DEBUG( - getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); -} - -TEST(CheckCompatibleTest, SimplePrefixFirst) { - std::vector> codecs; - codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION)); - codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION)); - EXPECT_THROW_IF_DEBUG( - getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); -} - -TEST(CheckCompatibleTest, Empty) { - std::vector> codecs; - codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION)); - EXPECT_THROW_IF_DEBUG( - getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); -} - -TEST(CheckCompatibleTest, ZstdPrefix) { - std::vector> codecs; - codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD)); - EXPECT_THROW_IF_DEBUG( - getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); -} - -TEST(CheckCompatibleTest, ZstdDuplicate) { - std::vector> codecs; - codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD)); - EXPECT_THROW_IF_DEBUG( - getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); -} - -TEST(CheckCompatibleTest, ZlibIsPrefix) { - std::vector> codecs; - codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD)); - EXPECT_THROW_IF_DEBUG( - getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); -} - -#if FOLLY_HAVE_LIBZSTD - -TEST(ZstdTest, BackwardCompatible) { - auto codec = getCodec(CodecType::ZSTD); - { - auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20)); - auto compressed = codec->compress(data.get()); - compressed->coalesce(); - EXPECT_EQ( - data->length(), - ZSTD_getDecompressedSize(compressed->data(), compressed->length())); - } - { - auto const data = - IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20)); - auto compressed = codec->compress(data.get()); - compressed->coalesce(); - EXPECT_EQ( - data->length(), - ZSTD_getDecompressedSize(compressed->data(), 
compressed->length())); - } -} - -#endif - -#if FOLLY_HAVE_LIBZ - -using ZlibFormat = zlib::Options::Format; - -TEST(ZlibTest, Auto) { - size_t const uncompressedLength_ = (size_t)1 << 15; - auto const original = std::string( - reinterpret_cast( - randomDataHolder.data(uncompressedLength_).data()), - uncompressedLength_); - auto optionCodec = zlib::getCodec(zlib::Options(ZlibFormat::AUTO)); - - // Test the codec can uncompress zlib data. - { - auto codec = getCodec(CodecType::ZLIB); - auto const compressed = codec->compress(original); - auto const uncompressed = optionCodec->uncompress(compressed); - EXPECT_EQ(original, uncompressed); - } - - // Test the codec can uncompress gzip data. - { - auto codec = getCodec(CodecType::GZIP); - auto const compressed = codec->compress(original); - auto const uncompressed = optionCodec->uncompress(compressed); - EXPECT_EQ(original, uncompressed); - } -} - -TEST(ZlibTest, DefaultOptions) { - size_t const uncompressedLength_ = (size_t)1 << 20; - auto const original = std::string( - reinterpret_cast( - randomDataHolder.data(uncompressedLength_).data()), - uncompressedLength_); - { - auto codec = getCodec(CodecType::ZLIB); - auto optionCodec = zlib::getCodec(zlib::defaultZlibOptions()); - auto const compressed = optionCodec->compress(original); - auto uncompressed = codec->uncompress(compressed); - EXPECT_EQ(original, uncompressed); - uncompressed = optionCodec->uncompress(compressed); - EXPECT_EQ(original, uncompressed); - } - - { - auto codec = getCodec(CodecType::GZIP); - auto optionCodec = zlib::getCodec(zlib::defaultGzipOptions()); - auto const compressed = optionCodec->compress(original); - auto uncompressed = codec->uncompress(compressed); - EXPECT_EQ(original, uncompressed); - uncompressed = optionCodec->uncompress(compressed); - EXPECT_EQ(original, uncompressed); - } -} - -class ZlibOptionsTest : public testing::TestWithParam< - std::tr1::tuple> { - protected: - void SetUp() override { - auto tup = GetParam(); - 
options_.format = std::tr1::get<0>(tup); - options_.windowSize = std::tr1::get<1>(tup); - options_.memLevel = std::tr1::get<2>(tup); - options_.strategy = std::tr1::get<3>(tup); - codec_ = zlib::getStreamCodec(options_); - } - - void runSimpleRoundTripTest(const DataHolder& dh); - - private: - zlib::Options options_; - std::unique_ptr codec_; -}; - -void ZlibOptionsTest::runSimpleRoundTripTest(const DataHolder& dh) { - size_t const uncompressedLength = (size_t)1 << 16; - auto const original = std::string( - reinterpret_cast(dh.data(uncompressedLength).data()), - uncompressedLength); - - auto const compressed = codec_->compress(original); - auto const uncompressed = codec_->uncompress(compressed); - EXPECT_EQ(uncompressed, original); -} - -TEST_P(ZlibOptionsTest, simpleRoundTripTest) { - runSimpleRoundTripTest(constantDataHolder); - runSimpleRoundTripTest(randomDataHolder); -} - -INSTANTIATE_TEST_CASE_P( - ZlibOptionsTest, - ZlibOptionsTest, - testing::Combine( - testing::Values( - ZlibFormat::ZLIB, - ZlibFormat::GZIP, - ZlibFormat::RAW, - ZlibFormat::AUTO), - testing::Values(9, 12, 15), - testing::Values(1, 8, 9), - testing::Values( - Z_DEFAULT_STRATEGY, - Z_FILTERED, - Z_HUFFMAN_ONLY, - Z_RLE, - Z_FIXED))); - -#endif // FOLLY_HAVE_LIBZ - -} // namespace test -} // namespace io -} // namespace folly - -int main(int argc, char *argv[]) { - testing::InitGoogleTest(&argc, argv); - gflags::ParseCommandLineFlags(&argc, &argv, true); - - auto ret = RUN_ALL_TESTS(); - if (!ret) { - folly::runBenchmarksOnFlag(); - } - return ret; -}