X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2FCompression.cpp;h=ae75b2a6a38a958c39ca4f6f32051eeeca21c6da;hb=03ce292a000fb616c12e2aca2a38b7b26e6ef050;hp=aaed2374b83d545497d695356a7ea5abf22d5794;hpb=b82fb4f77c83c14dbb34a1b5189f67018bba7a65;p=folly.git diff --git a/folly/io/Compression.cpp b/folly/io/Compression.cpp index aaed2374..ae75b2a6 100644 --- a/folly/io/Compression.cpp +++ b/folly/io/Compression.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,9 @@ #if FOLLY_HAVE_LIBLZ4 #include #include +#if LZ4_VERSION_NUMBER >= 10301 +#include +#endif #endif #include @@ -40,12 +43,15 @@ #include #endif +#include #include #include #include #include #include #include +#include +#include namespace folly { namespace io { @@ -56,13 +62,26 @@ std::unique_ptr Codec::compress(const IOBuf* data) { uint64_t len = data->computeChainDataLength(); if (len == 0) { return IOBuf::create(0); - } else if (len > maxUncompressedLength()) { + } + if (len > maxUncompressedLength()) { throw std::runtime_error("Codec: uncompressed length too large"); } return doCompress(data); } +std::string Codec::compress(const StringPiece data) { + const uint64_t len = data.size(); + if (len == 0) { + return ""; + } + if (len > maxUncompressedLength()) { + throw std::runtime_error("Codec: uncompressed length too large"); + } + + return doCompressString(data); +} + std::unique_ptr Codec::uncompress(const IOBuf* data, uint64_t uncompressedLength) { if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) { @@ -84,6 +103,28 @@ std::unique_ptr Codec::uncompress(const IOBuf* data, return doUncompress(data, uncompressedLength); } +std::string Codec::uncompress( + const StringPiece data, + uint64_t uncompressedLength) { + if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) { + if (needsUncompressedLength()) { + throw std::invalid_argument("Codec: uncompressed length required"); + } + } else if (uncompressedLength > maxUncompressedLength()) { + throw std::runtime_error("Codec: uncompressed length too large"); + } + + if (data.empty()) { + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + uncompressedLength != 0) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + return ""; + } + + return doUncompressString(data, uncompressedLength); +} + bool Codec::needsUncompressedLength() const { return doNeedsUncompressedLength(); } @@ -100,6 +141,38 @@ uint64_t Codec::doMaxUncompressedLength() const { return UNLIMITED_UNCOMPRESSED_LENGTH; } +std::vector Codec::validPrefixes() const { + return {}; +} + +bool Codec::canUncompress(const IOBuf*, uint64_t) const { + return false; +} + +std::string Codec::doCompressString(const StringPiece data) { + const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data}; + auto outputBuffer = doCompress(&inputBuffer); + std::string output; + output.reserve(outputBuffer->computeChainDataLength()); + for (auto range : *outputBuffer) { + output.append(reinterpret_cast(range.data()), range.size()); + } + return output; +} + +std::string Codec::doUncompressString( + const StringPiece data, + uint64_t uncompressedLength) { + const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data}; + auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength); + std::string output; + output.reserve(outputBuffer->computeChainDataLength()); + for (auto range : *outputBuffer) { + output.append(reinterpret_cast(range.data()), range.size()); + } + return output; +} + namespace { /** @@ -181,6 +254,39 @@ inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) { #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA +namespace { +/** + * Reads sizeof(T) bytes, and returns false if not enough bytes are available. + * Returns true if the first n bytes are equal to prefix when interpreted as + * a little endian T. + */ +template +typename std::enable_if::value, bool>::type +dataStartsWithLE(const IOBuf* data, T prefix, uint64_t n = sizeof(T)) { + DCHECK_GT(n, 0); + DCHECK_LE(n, sizeof(T)); + T value; + Cursor cursor{data}; + if (!cursor.tryReadLE(value)) { + return false; + } + const T mask = n == sizeof(T) ? T(-1) : (T(1) << (8 * n)) - 1; + return prefix == (value & mask); +} + +template +typename std::enable_if::value, std::string>::type +prefixToStringLE(T prefix, uint64_t n = sizeof(T)) { + DCHECK_GT(n, 0); + DCHECK_LE(n, sizeof(T)); + prefix = Endian::little(prefix); + std::string result; + result.resize(n); + memcpy(&result[0], &prefix, n); + return result; +} +} // namespace + #if FOLLY_HAVE_LIBLZ4 /** @@ -244,12 +350,11 @@ uint64_t LZ4Codec::doMaxUncompressedLength() const { } std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { - std::unique_ptr clone; + IOBuf clone; if (data->isChained()) { // LZ4 doesn't support streaming, so we have to coalesce - clone = data->clone(); - clone->coalesce(); - data = clone.get(); + clone = data->cloneCoalescedAsValue(); + data = &clone; } uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0; @@ -286,12 +391,11 @@ std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { std::unique_ptr LZ4Codec::doUncompress( const IOBuf* data, uint64_t uncompressedLength) { - std::unique_ptr clone; + IOBuf clone; if (data->isChained()) { // LZ4 doesn't support streaming, so we have to coalesce - clone = data->clone(); - clone->coalesce(); - data = clone.get(); + clone = data->cloneCoalescedAsValue(); + data = &clone; } folly::io::Cursor cursor(data); @@ -326,7 +430,178 @@ std::unique_ptr LZ4Codec::doUncompress( return out; } -#endif // FOLLY_HAVE_LIBLZ4 +#if LZ4_VERSION_NUMBER >= 10301 + +class LZ4FrameCodec final : public Codec { + public: + static std::unique_ptr create(int level, CodecType type); + explicit LZ4FrameCodec(int level, CodecType type); + ~LZ4FrameCodec(); + + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, uint64_t uncompressedLength) + const override; + + private: + std::unique_ptr doCompress(const IOBuf* data) override; + std::unique_ptr doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) override; + + // Reset the dctx_ if it is dirty or null. + void resetDCtx(); + + int level_; + LZ4F_decompressionContext_t dctx_{nullptr}; + bool dirty_{false}; +}; + +/* static */ std::unique_ptr LZ4FrameCodec::create( + int level, + CodecType type) { + return make_unique(level, type); +} + +static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204; + +std::vector LZ4FrameCodec::validPrefixes() const { + return {prefixToStringLE(kLZ4FrameMagicLE)}; +} + +bool LZ4FrameCodec::canUncompress(const IOBuf* data, uint64_t) const { + return dataStartsWithLE(data, kLZ4FrameMagicLE); +} + +static size_t lz4FrameThrowOnError(size_t code) { + if (LZ4F_isError(code)) { + throw std::runtime_error( + to("LZ4Frame error: ", LZ4F_getErrorName(code))); + } + return code; +} + +void LZ4FrameCodec::resetDCtx() { + if (dctx_ && !dirty_) { + return; + } + if (dctx_) { + LZ4F_freeDecompressionContext(dctx_); + } + lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100)); + dirty_ = false; +} + +LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::LZ4_FRAME); + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + case COMPRESSION_LEVEL_DEFAULT: + level_ = 0; + break; + case COMPRESSION_LEVEL_BEST: + level_ = 16; + break; + default: + level_ = level; + break; + } +} + +LZ4FrameCodec::~LZ4FrameCodec() { + if (dctx_) { + LZ4F_freeDecompressionContext(dctx_); + } +} + +std::unique_ptr LZ4FrameCodec::doCompress(const IOBuf* data) { + // LZ4 Frame compression doesn't support streaming so we have to coalesce + IOBuf clone; + if (data->isChained()) { + clone = data->cloneCoalescedAsValue(); + data = &clone; + } + // Set preferences + const auto uncompressedLength = data->length(); + LZ4F_preferences_t prefs{}; + prefs.compressionLevel = level_; + prefs.frameInfo.contentSize = uncompressedLength; + // Compress + auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs)); + const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame( + buf->writableTail(), + buf->tailroom(), + data->data(), + data->length(), + &prefs)); + buf->append(written); + return buf; +} + +std::unique_ptr LZ4FrameCodec::doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) { + // Reset the dctx if any errors have occurred + resetDCtx(); + // Coalesce the data + ByteRange in = *data->begin(); + IOBuf clone; + if (data->isChained()) { + clone = data->cloneCoalescedAsValue(); + in = clone.coalesce(); + } + data = nullptr; + // Select decompression options + LZ4F_decompressOptions_t options; + options.stableDst = 1; + // Select blockSize and growthSize for the IOBufQueue + IOBufQueue queue(IOBufQueue::cacheChainLength()); + auto blockSize = uint64_t{64} << 10; + auto growthSize = uint64_t{4} << 20; + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) { + // Allocate uncompressedLength in one chunk (up to 64 MB) + const auto allocateSize = std::min(uncompressedLength, uint64_t{64} << 20); + queue.preallocate(allocateSize, allocateSize); + blockSize = std::min(uncompressedLength, blockSize); + growthSize = std::min(uncompressedLength, growthSize); + } else { + // Reduce growthSize for small data + const auto guessUncompressedLen = + 4 * std::max(blockSize, in.size()); + growthSize = std::min(guessUncompressedLen, growthSize); + } + // Once LZ4_decompress() is called, the dctx_ cannot be reused until it + // returns 0 + dirty_ = true; + // Decompress until the frame is over + size_t code = 0; + do { + // Allocate enough space to decompress at least a block + void* out; + size_t outSize; + std::tie(out, outSize) = queue.preallocate(blockSize, growthSize); + // Decompress + size_t inSize = in.size(); + code = lz4FrameThrowOnError( + LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options)); + if (in.empty() && outSize == 0 && code != 0) { + // We passed no input, no output was produced, and the frame isn't over + // No more forward progress is possible + throw std::runtime_error("LZ4Frame error: Incomplete frame"); + } + in.uncheckedAdvance(inSize); + queue.postallocate(outSize); + } while (code != 0); + // At this point the decompression context can be reused + dirty_ = false; + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + queue.chainLength() != uncompressedLength) { + throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength"); + } + return queue.move(); +} + +#endif // LZ4_VERSION_NUMBER >= 10301 +#endif // FOLLY_HAVE_LIBLZ4 #if FOLLY_HAVE_LIBSNAPPY @@ -460,6 +735,10 @@ class ZlibCodec final : public Codec { static std::unique_ptr create(int level, CodecType type); explicit ZlibCodec(int level, CodecType type); + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, uint64_t uncompressedLength) + const override; + private: std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( @@ -472,6 +751,66 @@ class ZlibCodec final : public Codec { int level_; }; +static constexpr uint16_t kGZIPMagicLE = 0x8B1F; + +std::vector ZlibCodec::validPrefixes() const { + if (type() == CodecType::ZLIB) { + // Zlib streams start with a 2 byte header. + // + // 0 1 + // +---+---+ + // |CMF|FLG| + // +---+---+ + // + // We won't restrict the values of any sub-fields except as described below. + // + // The lowest 4 bits of CMF is the compression method (CM). + // CM == 0x8 is the deflate compression method, which is currently the only + // supported compression method, so any valid prefix must have CM == 0x8. + // + // The lowest 5 bits of FLG is FCHECK. + // FCHECK must be such that the two header bytes are a multiple of 31 when + // interpreted as a big endian 16-bit number. + std::vector result; + // 16 values for the first byte, 8 values for the second byte. + // There are also 4 combinations where both 0x00 and 0x1F work as FCHECK. + result.reserve(132); + // Select all values for the CMF byte that use the deflate algorithm 0x8. + for (uint32_t first = 0x0800; first <= 0xF800; first += 0x1000) { + // Select all values for the FLG, but leave FCHECK as 0 since it's fixed. + for (uint32_t second = 0x00; second <= 0xE0; second += 0x20) { + uint16_t prefix = first | second; + // Compute FCHECK. + prefix += 31 - (prefix % 31); + result.push_back(prefixToStringLE(Endian::big(prefix))); + // zlib won't produce this, but it is a valid prefix. + if ((prefix & 0x1F) == 31) { + prefix -= 31; + result.push_back(prefixToStringLE(Endian::big(prefix))); + } + } + } + return result; + } else { + // The gzip frame starts with 2 magic bytes. + return {prefixToStringLE(kGZIPMagicLE)}; + } +} + +bool ZlibCodec::canUncompress(const IOBuf* data, uint64_t) const { + if (type() == CodecType::ZLIB) { + uint16_t value; + Cursor cursor{data}; + if (!cursor.tryReadBE(value)) { + return false; + } + // zlib compressed if using deflate and is a multiple of 31. + return (value & 0x0F00) == 0x0800 && value % 31 == 0; + } else { + return dataStartsWithLE(data, kGZIPMagicLE); + } +} + std::unique_ptr ZlibCodec::create(int level, CodecType type) { return make_unique(level, type); } @@ -630,6 +969,13 @@ std::unique_ptr ZlibCodec::doCompress(const IOBuf* data) { return out; } +static uint64_t computeBufferLength(uint64_t const compressedLength) { + constexpr uint64_t kMaxBufferLength = uint64_t(4) << 20; // 4 MiB + constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB + const uint64_t goodBufferSize = 4 * std::max(kBlockSize, compressedLength); + return std::min(goodBufferSize, kMaxBufferLength); +} + std::unique_ptr ZlibCodec::doUncompress(const IOBuf* data, uint64_t uncompressedLength) { z_stream stream; @@ -662,8 +1008,9 @@ std::unique_ptr ZlibCodec::doUncompress(const IOBuf* data, }; // Max 64MiB in one go - constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB - constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB + constexpr uint64_t maxSingleStepLength = uint64_t(64) << 20; // 64MiB + const uint64_t defaultBufferLength = + computeBufferLength(data->computeChainDataLength()); auto out = addOutputBuffer( &stream, @@ -720,6 +1067,10 @@ class LZMA2Codec final : public Codec { static std::unique_ptr create(int level, CodecType type); explicit LZMA2Codec(int level, CodecType type); + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, uint64_t uncompressedLength) + const override; + private: bool doNeedsUncompressedLength() const override; uint64_t doMaxUncompressedLength() const override; @@ -737,6 +1088,25 @@ class LZMA2Codec final : public Codec { int level_; }; +static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD; +static constexpr unsigned kLZMA2MagicBytes = 6; + +std::vector LZMA2Codec::validPrefixes() const { + if (type() == CodecType::LZMA2_VARINT_SIZE) { + return {}; + } + return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)}; +} + +bool LZMA2Codec::canUncompress(const IOBuf* data, uint64_t) const { + if (type() == CodecType::LZMA2_VARINT_SIZE) { + return false; + } + // Returns false for all inputs less than 8 bytes. + // This is okay, because no valid LZMA2 streams are less than 8 bytes. + return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes); +} + std::unique_ptr LZMA2Codec::create(int level, CodecType type) { return make_unique(level, type); } @@ -762,7 +1132,7 @@ LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) { } bool LZMA2Codec::doNeedsUncompressedLength() const { - return !encodeSize(); + return false; } uint64_t LZMA2Codec::doMaxUncompressedLength() const { @@ -893,27 +1263,25 @@ std::unique_ptr LZMA2Codec::doUncompress(const IOBuf* data, SCOPE_EXIT { lzma_end(&stream); }; // Max 64MiB in one go - constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB - constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB + constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB + constexpr uint32_t defaultBufferLength = uint32_t(256) << 10; // 256 KiB folly::io::Cursor cursor(data); - uint64_t actualUncompressedLength; if (encodeSize()) { - actualUncompressedLength = decodeVarintFromCursor(cursor); + const uint64_t actualUncompressedLength = decodeVarintFromCursor(cursor); if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && uncompressedLength != actualUncompressedLength) { throw std::runtime_error("LZMA2Codec: invalid uncompressed length"); } - } else { - actualUncompressedLength = uncompressedLength; - DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH); + uncompressedLength = actualUncompressedLength; } auto out = addOutputBuffer( &stream, - (actualUncompressedLength <= maxSingleStepLength ? - actualUncompressedLength : - defaultBufferLength)); + ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + uncompressedLength <= maxSingleStepLength) + ? uncompressedLength + : defaultBufferLength)); bool streamEnd = false; auto buf = cursor.peekBytes(); @@ -940,9 +1308,10 @@ std::unique_ptr LZMA2Codec::doUncompress(const IOBuf* data, out->prev()->trimEnd(stream.avail_out); - if (actualUncompressedLength != stream.total_out) { - throw std::runtime_error(to( - "LZMA2Codec: invalid uncompressed length")); + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + uncompressedLength != stream.total_out) { + throw std::runtime_error( + to("LZMA2Codec: invalid uncompressed length")); } return out; @@ -960,6 +1329,10 @@ class ZSTDCodec final : public Codec { static std::unique_ptr create(int level, CodecType); explicit ZSTDCodec(int level, CodecType type); + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, uint64_t uncompressedLength) + const override; + private: bool doNeedsUncompressedLength() const override; std::unique_ptr doCompress(const IOBuf* data) override; @@ -970,6 +1343,16 @@ class ZSTDCodec final : public Codec { int level_; }; +static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528; + +std::vector ZSTDCodec::validPrefixes() const { + return {prefixToStringLE(kZSTDMagicLE)}; +} + +bool ZSTDCodec::canUncompress(const IOBuf* data, uint64_t) const { + return dataStartsWithLE(data, kZSTDMagicLE); +} + std::unique_ptr ZSTDCodec::create(int level, CodecType type) { return make_unique(level, type); } @@ -1059,7 +1442,28 @@ std::unique_ptr ZSTDCodec::doCompress(const IOBuf* data) { return result; } -std::unique_ptr ZSTDCodec::doUncompress( +static std::unique_ptr zstdUncompressBuffer( + const IOBuf* data, + uint64_t uncompressedLength) { + // Check preconditions + DCHECK(!data->isChained()); + DCHECK(uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH); + + auto uncompressed = IOBuf::create(uncompressedLength); + const auto decompressedSize = ZSTD_decompress( + uncompressed->writableTail(), + uncompressed->tailroom(), + data->data(), + data->length()); + zstdThrowIfError(decompressedSize); + if (decompressedSize != uncompressedLength) { + throw std::runtime_error("ZSTD: invalid uncompressed length"); + } + uncompressed->append(decompressedSize); + return uncompressed; +} + +static std::unique_ptr zstdUncompressStream( const IOBuf* data, uint64_t uncompressedLength) { auto zds = ZSTD_createDStream(); @@ -1074,14 +1478,8 @@ std::unique_ptr ZSTDCodec::doUncompress( ZSTD_inBuffer in{}; auto outputSize = ZSTD_DStreamOutSize(); - if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) { + if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH) { outputSize = uncompressedLength; - } else { - auto decompressedSize = - ZSTD_getDecompressedSize(data->data(), data->length()); - if (decompressedSize != 0 && decompressedSize < outputSize) { - outputSize = decompressedSize; - } } IOBufQueue queue(IOBufQueue::cacheChainLength()); @@ -1120,7 +1518,7 @@ std::unique_ptr ZSTDCodec::doUncompress( if (in.pos != in.size || !cursor.isAtEnd()) { throw std::runtime_error("ZSTD: junk after end of data"); } - if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH && queue.chainLength() != uncompressedLength) { throw std::runtime_error("ZSTD: invalid uncompressed length"); } @@ -1128,67 +1526,263 @@ std::unique_ptr ZSTDCodec::doUncompress( return queue.move(); } +std::unique_ptr ZSTDCodec::doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) { + { + // Read decompressed size from frame if available in first IOBuf. + const auto decompressedSize = + ZSTD_getDecompressedSize(data->data(), data->length()); + if (decompressedSize != 0) { + if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH && + uncompressedLength != decompressedSize) { + throw std::runtime_error("ZSTD: invalid uncompressed length"); + } + uncompressedLength = decompressedSize; + } + } + // Faster to decompress using ZSTD_decompress() if we can. + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && !data->isChained()) { + return zstdUncompressBuffer(data, uncompressedLength); + } + // Fall back to slower streaming decompression. + return zstdUncompressStream(data, uncompressedLength); +} + #endif // FOLLY_HAVE_LIBZSTD -} // namespace +/** + * Automatic decompression + */ +class AutomaticCodec final : public Codec { + public: + static std::unique_ptr create( + std::vector> customCodecs); + explicit AutomaticCodec(std::vector> customCodecs); -std::unique_ptr getCodec(CodecType type, int level) { - typedef std::unique_ptr (*CodecFactory)(int, CodecType); + std::vector validPrefixes() const override; + bool canUncompress(const IOBuf* data, uint64_t uncompressedLength) + const override; - static CodecFactory codecFactories[ - static_cast(CodecType::NUM_CODEC_TYPES)] = { - nullptr, // USER_DEFINED - NoCompressionCodec::create, + private: + bool doNeedsUncompressedLength() const override; + uint64_t doMaxUncompressedLength() const override; + + std::unique_ptr doCompress(const IOBuf*) override { + throw std::runtime_error("AutomaticCodec error: compress() not supported."); + } + std::unique_ptr doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) override; + + void addCodecIfSupported(CodecType type); + + // Throws iff the codecs aren't compatible (very slow) + void checkCompatibleCodecs() const; + + std::vector> codecs_; + bool needsUncompressedLength_; + uint64_t maxUncompressedLength_; +}; + +std::vector AutomaticCodec::validPrefixes() const { + std::unordered_set prefixes; + for (const auto& codec : codecs_) { + const auto codecPrefixes = codec->validPrefixes(); + prefixes.insert(codecPrefixes.begin(), codecPrefixes.end()); + } + return std::vector{prefixes.begin(), prefixes.end()}; +} + +bool AutomaticCodec::canUncompress( + const IOBuf* data, + uint64_t uncompressedLength) const { + return std::any_of( + codecs_.begin(), + codecs_.end(), + [data, uncompressedLength](std::unique_ptr const& codec) { + return codec->canUncompress(data, uncompressedLength); + }); +} + +void AutomaticCodec::addCodecIfSupported(CodecType type) { + const bool present = std::any_of( + codecs_.begin(), + codecs_.end(), + [&type](std::unique_ptr const& codec) { + return codec->type() == type; + }); + if (hasCodec(type) && !present) { + codecs_.push_back(getCodec(type)); + } +} + +/* static */ std::unique_ptr AutomaticCodec::create( + std::vector> customCodecs) { + return make_unique(std::move(customCodecs)); +} + +AutomaticCodec::AutomaticCodec(std::vector> customCodecs) + : Codec(CodecType::USER_DEFINED), codecs_(std::move(customCodecs)) { + // Fastest -> slowest + addCodecIfSupported(CodecType::LZ4_FRAME); + addCodecIfSupported(CodecType::ZSTD); + addCodecIfSupported(CodecType::ZLIB); + addCodecIfSupported(CodecType::GZIP); + addCodecIfSupported(CodecType::LZMA2); + if (kIsDebug) { + checkCompatibleCodecs(); + } + // Check that none of the codes are are null + DCHECK(std::none_of( + codecs_.begin(), codecs_.end(), [](std::unique_ptr const& codec) { + return codec == nullptr; + })); + + needsUncompressedLength_ = std::any_of( + codecs_.begin(), codecs_.end(), [](std::unique_ptr const& codec) { + return codec->needsUncompressedLength(); + }); + + const auto it = std::max_element( + codecs_.begin(), + codecs_.end(), + [](std::unique_ptr const& lhs, std::unique_ptr const& rhs) { + return lhs->maxUncompressedLength() < rhs->maxUncompressedLength(); + }); + DCHECK(it != codecs_.end()); + maxUncompressedLength_ = (*it)->maxUncompressedLength(); +} + +void AutomaticCodec::checkCompatibleCodecs() const { + // Keep track of all the possible headers. + std::unordered_set headers; + // The empty header is not allowed. + headers.insert(""); + // Step 1: + // Construct a set of headers and check that none of the headers occur twice. + // Eliminate edge cases. + for (auto&& codec : codecs_) { + const auto codecHeaders = codec->validPrefixes(); + // Codecs without any valid headers are not allowed. + if (codecHeaders.empty()) { + throw std::invalid_argument{ + "AutomaticCodec: validPrefixes() must not be empty."}; + } + // Insert all the headers for the current codec. + const size_t beforeSize = headers.size(); + headers.insert(codecHeaders.begin(), codecHeaders.end()); + // Codecs are not compatible if any header occurred twice. + if (beforeSize + codecHeaders.size() != headers.size()) { + throw std::invalid_argument{ + "AutomaticCodec: Two valid prefixes collide."}; + } + } + // Step 2: + // Check if any strict non-empty prefix of any header is a header. + for (const auto& header : headers) { + for (size_t i = 1; i < header.size(); ++i) { + if (headers.count(header.substr(0, i))) { + throw std::invalid_argument{ + "AutomaticCodec: One valid prefix is a prefix of another valid " + "prefix."}; + } + } + } +} + +bool AutomaticCodec::doNeedsUncompressedLength() const { + return needsUncompressedLength_; +} + +uint64_t AutomaticCodec::doMaxUncompressedLength() const { + return maxUncompressedLength_; +} + +std::unique_ptr AutomaticCodec::doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) { + for (auto&& codec : codecs_) { + if (codec->canUncompress(data, uncompressedLength)) { + return codec->uncompress(data, uncompressedLength); + } + } + throw std::runtime_error("AutomaticCodec error: Unknown compressed data"); +} + +} // namespace + +typedef std::unique_ptr (*CodecFactory)(int, CodecType); +static constexpr CodecFactory + codecFactories[static_cast(CodecType::NUM_CODEC_TYPES)] = { + nullptr, // USER_DEFINED + NoCompressionCodec::create, #if FOLLY_HAVE_LIBLZ4 - LZ4Codec::create, + LZ4Codec::create, #else - nullptr, + nullptr, #endif #if FOLLY_HAVE_LIBSNAPPY - SnappyCodec::create, + SnappyCodec::create, #else - nullptr, + nullptr, #endif #if FOLLY_HAVE_LIBZ - ZlibCodec::create, + ZlibCodec::create, #else - nullptr, + nullptr, #endif #if FOLLY_HAVE_LIBLZ4 - LZ4Codec::create, + LZ4Codec::create, #else - nullptr, + nullptr, #endif #if FOLLY_HAVE_LIBLZMA - LZMA2Codec::create, - LZMA2Codec::create, + LZMA2Codec::create, + LZMA2Codec::create, #else - nullptr, - nullptr, + nullptr, + nullptr, #endif #if FOLLY_HAVE_LIBZSTD - ZSTDCodec::create, + ZSTDCodec::create, #else - nullptr, + nullptr, #endif #if FOLLY_HAVE_LIBZ - ZlibCodec::create, + ZlibCodec::create, #else - nullptr, + nullptr, #endif - }; +#if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301) + LZ4FrameCodec::create, +#else + nullptr, +#endif +}; + +bool hasCodec(CodecType type) { size_t idx = static_cast(type); if (idx >= static_cast(CodecType::NUM_CODEC_TYPES)) { - throw std::invalid_argument(to( - "Compression type ", idx, " not supported")); + throw std::invalid_argument( + to("Compression type ", idx, " invalid")); + } + return codecFactories[idx] != nullptr; +} + +std::unique_ptr getCodec(CodecType type, int level) { + size_t idx = static_cast(type); + if (idx >= static_cast(CodecType::NUM_CODEC_TYPES)) { + throw std::invalid_argument( + to("Compression type ", idx, " invalid")); } auto factory = codecFactories[idx]; if (!factory) { @@ -1200,4 +1794,8 @@ std::unique_ptr getCodec(CodecType type, int level) { return codec; } +std::unique_ptr getAutoUncompressionCodec( + std::vector> customCodecs) { + return AutomaticCodec::create(std::move(customCodecs)); +} }} // namespaces