From d71636e2f138a238cd7ffece20c7519632a56b71 Mon Sep 17 00:00:00 2001 From: Tudor Bosman Date: Wed, 14 Aug 2013 19:37:40 -0700 Subject: [PATCH] Add Varint-length-prefixed flavor of LZ4 Test Plan: test added Reviewed By: alandau@fb.com FB internal diff: D928836 --- folly/io/Compression.cpp | 139 ++++++++++++++++++------------ folly/io/Compression.h | 34 ++++++-- folly/io/test/CompressionTest.cpp | 11 ++- 3 files changed, 120 insertions(+), 64 deletions(-) diff --git a/folly/io/Compression.cpp b/folly/io/Compression.cpp index f46df401..951619a1 100644 --- a/folly/io/Compression.cpp +++ b/folly/io/Compression.cpp @@ -27,10 +27,13 @@ #include "folly/Memory.h" #include "folly/Portability.h" #include "folly/ScopeGuard.h" +#include "folly/Varint.h" #include "folly/io/Cursor.h" namespace folly { namespace io { +Codec::Codec(CodecType type) : type_(type) { } + // Ensure consistent behavior in the nullptr case std::unique_ptr Codec::compress(const IOBuf* data) { return !data->empty() ? doCompress(data) : IOBuf::create(0); @@ -65,10 +68,6 @@ uint64_t Codec::maxUncompressedLength() const { return doMaxUncompressedLength(); } -CodecType Codec::type() const { - return doType(); -} - bool Codec::doNeedsUncompressedLength() const { return false; } @@ -84,22 +83,23 @@ namespace { */ class NoCompressionCodec FOLLY_FINAL : public Codec { public: - static std::unique_ptr create(int level); - explicit NoCompressionCodec(int level); + static std::unique_ptr create(int level, CodecType type); + explicit NoCompressionCodec(int level, CodecType type); private: - CodecType doType() const FOLLY_OVERRIDE; std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; std::unique_ptr doUncompress( const IOBuf* data, uint64_t uncompressedLength) FOLLY_OVERRIDE; }; -std::unique_ptr NoCompressionCodec::create(int level) { - return make_unique(level); +std::unique_ptr NoCompressionCodec::create(int level, CodecType type) { + return make_unique(level, type); } 
-NoCompressionCodec::NoCompressionCodec(int level) { +NoCompressionCodec::NoCompressionCodec(int level, CodecType type) + : Codec(type) { + DCHECK(type == CodecType::NO_COMPRESSION); switch (level) { case COMPRESSION_LEVEL_DEFAULT: case COMPRESSION_LEVEL_FASTEST: @@ -112,10 +112,6 @@ NoCompressionCodec::NoCompressionCodec(int level) { } } -CodecType NoCompressionCodec::doType() const { - return CodecType::NO_COMPRESSION; -} - std::unique_ptr NoCompressionCodec::doCompress( const IOBuf* data) { return data->clone(); @@ -137,13 +133,15 @@ std::unique_ptr NoCompressionCodec::doUncompress( */ class LZ4Codec FOLLY_FINAL : public Codec { public: - static std::unique_ptr create(int level); - explicit LZ4Codec(int level); + static std::unique_ptr create(int level, CodecType type); + explicit LZ4Codec(int level, CodecType type); private: bool doNeedsUncompressedLength() const FOLLY_OVERRIDE; uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE; - CodecType doType() const FOLLY_OVERRIDE; + + bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; } + std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; std::unique_ptr doUncompress( const IOBuf* data, @@ -152,11 +150,13 @@ class LZ4Codec FOLLY_FINAL : public Codec { bool highCompression_; }; -std::unique_ptr LZ4Codec::create(int level) { - return make_unique(level); +std::unique_ptr LZ4Codec::create(int level, CodecType type) { + return make_unique(level, type); } -LZ4Codec::LZ4Codec(int level) { +LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE); + switch (level) { case COMPRESSION_LEVEL_FASTEST: case COMPRESSION_LEVEL_DEFAULT: @@ -174,7 +174,7 @@ LZ4Codec::LZ4Codec(int level) { } bool LZ4Codec::doNeedsUncompressedLength() const { - return true; + return !encodeSize(); } uint64_t LZ4Codec::doMaxUncompressedLength() const { @@ -183,10 +183,24 @@ uint64_t LZ4Codec::doMaxUncompressedLength() const { return 1.8 * 
(uint64_t(1) << 30); } -CodecType LZ4Codec::doType() const { - return CodecType::LZ4; +namespace { + +void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) { + DCHECK_GE(out->tailroom(), kMaxVarintLength64); + out->append(encodeVarint(val, out->writableTail())); +} + +uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) { + // Must have enough room in *this* buffer. + auto p = cursor.peek(); + folly::ByteRange range(p.first, p.second); + uint64_t val = decodeVarint(range); + cursor.skip(range.data() - p.first); + return val; } +} // namespace + std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { std::unique_ptr clone; if (data->isChained()) { @@ -196,16 +210,21 @@ std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { data = clone.get(); } - auto out = IOBuf::create(LZ4_compressBound(data->length())); + uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0; + auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length())); + if (encodeSize()) { + encodeVarintToIOBuf(data->length(), out.get()); + } + int n; if (highCompression_) { - n = LZ4_compress(reinterpret_cast(data->data()), - reinterpret_cast(out->writableTail()), - data->length()); - } else { n = LZ4_compressHC(reinterpret_cast(data->data()), reinterpret_cast(out->writableTail()), data->length()); + } else { + n = LZ4_compress(reinterpret_cast(data->data()), + reinterpret_cast(out->writableTail()), + data->length()); } CHECK_GE(n, 0); @@ -226,15 +245,29 @@ std::unique_ptr LZ4Codec::doUncompress( data = clone.get(); } - auto out = IOBuf::create(uncompressedLength); - int n = LZ4_uncompress(reinterpret_cast(data->data()), + folly::io::Cursor cursor(data); + uint64_t actualUncompressedLength; + if (encodeSize()) { + actualUncompressedLength = decodeVarintFromCursor(cursor); + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + uncompressedLength != actualUncompressedLength) { + throw std::runtime_error("LZ4Codec: invalid uncompressed length"); + } + } else { + 
actualUncompressedLength = uncompressedLength; + DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH); + } + + auto out = IOBuf::create(actualUncompressedLength); + auto p = cursor.peek(); + int n = LZ4_uncompress(reinterpret_cast(p.first), reinterpret_cast(out->writableTail()), - uncompressedLength); - if (n != data->length()) { + actualUncompressedLength); + if (n != p.second) { throw std::runtime_error(to( "LZ4 decompression returned invalid value ", n)); } - out->append(uncompressedLength); + out->append(actualUncompressedLength); return out; } @@ -279,23 +312,23 @@ void IOBufSnappySource::Skip(size_t n) { class SnappyCodec FOLLY_FINAL : public Codec { public: - static std::unique_ptr create(int level); - explicit SnappyCodec(int level); + static std::unique_ptr create(int level, CodecType type); + explicit SnappyCodec(int level, CodecType type); private: uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE; - CodecType doType() const FOLLY_OVERRIDE; std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; std::unique_ptr doUncompress( const IOBuf* data, uint64_t uncompressedLength) FOLLY_OVERRIDE; }; -std::unique_ptr SnappyCodec::create(int level) { - return make_unique(level); +std::unique_ptr SnappyCodec::create(int level, CodecType type) { + return make_unique(level, type); } -SnappyCodec::SnappyCodec(int level) { +SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::SNAPPY); switch (level) { case COMPRESSION_LEVEL_FASTEST: case COMPRESSION_LEVEL_DEFAULT: @@ -313,10 +346,6 @@ uint64_t SnappyCodec::doMaxUncompressedLength() const { return std::numeric_limits::max(); } -CodecType SnappyCodec::doType() const { - return CodecType::SNAPPY; -} - std::unique_ptr SnappyCodec::doCompress(const IOBuf* data) { IOBufSnappySource source(data); auto out = @@ -366,11 +395,10 @@ std::unique_ptr SnappyCodec::doUncompress(const IOBuf* data, */ class ZlibCodec FOLLY_FINAL : public Codec { public: - static 
std::unique_ptr create(int level); - explicit ZlibCodec(int level); + static std::unique_ptr create(int level, CodecType type); + explicit ZlibCodec(int level, CodecType type); private: - CodecType doType() const FOLLY_OVERRIDE; std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; std::unique_ptr doUncompress( const IOBuf* data, @@ -382,11 +410,12 @@ class ZlibCodec FOLLY_FINAL : public Codec { int level_; }; -std::unique_ptr ZlibCodec::create(int level) { - return make_unique(level); +std::unique_ptr ZlibCodec::create(int level, CodecType type) { + return make_unique(level, type); } -ZlibCodec::ZlibCodec(int level) { +ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) { + DCHECK(type == CodecType::ZLIB); switch (level) { case COMPRESSION_LEVEL_FASTEST: level = 1; @@ -405,10 +434,6 @@ ZlibCodec::ZlibCodec(int level) { level_ = level; } -CodecType ZlibCodec::doType() const { - return CodecType::ZLIB; -} - std::unique_ptr ZlibCodec::addOutputBuffer(z_stream* stream, uint32_t length) { CHECK_EQ(stream->avail_out, 0); @@ -599,14 +624,16 @@ std::unique_ptr ZlibCodec::doUncompress(const IOBuf* data, return out; } -typedef std::unique_ptr (*CodecFactory)(int); +typedef std::unique_ptr (*CodecFactory)(int, CodecType); CodecFactory gCodecFactories[ static_cast(CodecType::NUM_CODEC_TYPES)] = { + nullptr, // USER_DEFINED NoCompressionCodec::create, LZ4Codec::create, SnappyCodec::create, - ZlibCodec::create + ZlibCodec::create, + LZ4Codec::create }; } // namespace @@ -622,7 +649,7 @@ std::unique_ptr getCodec(CodecType type, int level) { throw std::invalid_argument(to( "Compression type ", idx, " not supported")); } - auto codec = (*factory)(level); + auto codec = (*factory)(level, type); DCHECK_EQ(static_cast(codec->type()), idx); return codec; } diff --git a/folly/io/Compression.h b/folly/io/Compression.h index 5a5f9d89..1edba5e6 100644 --- a/folly/io/Compression.h +++ b/folly/io/Compression.h @@ -30,31 +30,43 @@ namespace folly { namespace io { enum class 
CodecType { + /** + * This codec type is not defined; getCodec() will throw an exception + * if used. Useful if deriving your own classes from Codec without + * going through the getCodec() interface. + */ + USER_DEFINED = 0, + /** * Use no compression. * Levels supported: 0 */ - NO_COMPRESSION = 0, + NO_COMPRESSION = 1, /** * Use LZ4 compression. * Levels supported: 1 = fast, 2 = best; default = 1 */ - LZ4 = 1, + LZ4 = 2, /** * Use Snappy compression. * Levels supported: 1 */ - SNAPPY = 2, + SNAPPY = 3, /** * Use zlib compression. * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6 */ - ZLIB = 3, + ZLIB = 4, - NUM_CODEC_TYPES = 4, + /** + * Use LZ4 compression, prefixed with size (as Varint). + */ + LZ4_VARINT_SIZE = 5, + + NUM_CODEC_TYPES = 6, }; class Codec { @@ -71,7 +83,7 @@ class Codec { /** * Return the codec's type. */ - CodecType type() const; + CodecType type() const { return type_; } /** * Does this codec need the exact uncompressed length on decompression? @@ -106,15 +118,19 @@ class Codec { const IOBuf* data, uint64_t uncompressedLength = UNKNOWN_UNCOMPRESSED_LENGTH); + protected: + explicit Codec(CodecType type); + private: // default: no limits (save for special value UNKNOWN_UNCOMPRESSED_LENGTH) virtual uint64_t doMaxUncompressedLength() const; // default: doesn't need uncompressed length virtual bool doNeedsUncompressedLength() const; - virtual CodecType doType() const = 0; virtual std::unique_ptr doCompress(const folly::IOBuf* data) = 0; virtual std::unique_ptr doUncompress(const folly::IOBuf* data, uint64_t uncompressedLength) = 0; + + CodecType type_; }; constexpr int COMPRESSION_LEVEL_FASTEST = -1; @@ -132,6 +148,10 @@ constexpr int COMPRESSION_LEVEL_BEST = -3; * FASTEST and BEST) * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory, * best compression) + * + * When decompressing, the compression level is ignored. 
All codecs will + * decompress all data compressed with a codec of the same type, regardless + * of compression level. */ std::unique_ptr getCodec(CodecType type, int level = COMPRESSION_LEVEL_DEFAULT); diff --git a/folly/io/test/CompressionTest.cpp b/folly/io/test/CompressionTest.cpp index ae791cd3..57399bea 100644 --- a/folly/io/test/CompressionTest.cpp +++ b/folly/io/test/CompressionTest.cpp @@ -84,6 +84,14 @@ void generateRandomData() { } } +TEST(CompressionTestNeedsUncompressedLength, Simple) { + EXPECT_FALSE(getCodec(CodecType::NO_COMPRESSION)->needsUncompressedLength()); + EXPECT_TRUE(getCodec(CodecType::LZ4)->needsUncompressedLength()); + EXPECT_FALSE(getCodec(CodecType::SNAPPY)->needsUncompressedLength()); + EXPECT_FALSE(getCodec(CodecType::ZLIB)->needsUncompressedLength()); + EXPECT_FALSE(getCodec(CodecType::LZ4_VARINT_SIZE)->needsUncompressedLength()); +} + class CompressionTest : public testing::TestWithParam< std::tr1::tuple> { protected: @@ -123,7 +131,8 @@ INSTANTIATE_TEST_CASE_P( testing::Values(CodecType::NO_COMPRESSION, CodecType::LZ4, CodecType::SNAPPY, - CodecType::ZLIB))); + CodecType::ZLIB, + CodecType::LZ4_VARINT_SIZE))); class CompressionCorruptionTest : public testing::TestWithParam { protected: -- 2.34.1