X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Ftest%2FCompressionTest.cpp;h=e15a18da39f459c40086ef04ba25af06a626ac17;hp=314eba5ec745112e6e20ea82e263c35d709ea9c7;hb=9883bc3793fc71b8d4c8d1adf6c6fe796e9131e5;hpb=ef40ac8213eeb977db9e9be53988bb11866c830b diff --git a/folly/io/test/CompressionTest.cpp b/folly/io/test/CompressionTest.cpp index 314eba5e..e15a18da 100644 --- a/folly/io/test/CompressionTest.cpp +++ b/folly/io/test/CompressionTest.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,19 +16,27 @@ #include +#include #include +#include #include #include +#include #include #include -#include #include #include +#include #include #include #include +#include + +#if FOLLY_HAVE_LIBZSTD +#include +#endif namespace folly { namespace io { namespace test { @@ -81,23 +89,22 @@ class RandomDataHolder : public DataHolder { RandomDataHolder::RandomDataHolder(size_t sizeLog2) : DataHolder(sizeLog2) { - constexpr size_t numThreadsLog2 = 3; - constexpr size_t numThreads = size_t(1) << numThreadsLog2; + static constexpr size_t numThreadsLog2 = 3; + static constexpr size_t numThreads = size_t(1) << numThreadsLog2; uint32_t seed = randomNumberSeed(); std::vector threads; threads.reserve(numThreads); for (size_t t = 0; t < numThreads; ++t) { - threads.emplace_back( - [this, seed, t, numThreadsLog2, sizeLog2] () { - std::mt19937 rng(seed + t); - size_t countLog2 = size_t(1) << (sizeLog2 - numThreadsLog2); - size_t start = size_t(t) << countLog2; - for (size_t i = 0; i < countLog2; ++i) { - this->data_[start + i] = rng(); - } - }); + threads.emplace_back([this, seed, t, sizeLog2] { + std::mt19937 rng(seed + t); + size_t countLog2 = sizeLog2 - numThreadsLog2; + size_t start = size_t(t) << countLog2; + for (size_t i = 0; i < countLog2; ++i) { + this->data_[start + i] = rng(); + } + }); } for (auto& t : threads) { @@ -119,38 +126,97 @@ constexpr size_t dataSizeLog2 = 27; // 128MiB RandomDataHolder randomDataHolder(dataSizeLog2); ConstantDataHolder constantDataHolder(dataSizeLog2); +// The intersection of the provided codecs & those that are compiled in. +static std::vector supportedCodecs(std::vector const& v) { + std::vector supported; + + std::copy_if( + std::begin(v), + std::end(v), + std::back_inserter(supported), + hasCodec); + + return supported; +} + +// All compiled-in compression codecs. +static std::vector availableCodecs() { + std::vector codecs; + + for (size_t i = 0; i < static_cast(CodecType::NUM_CODEC_TYPES); ++i) { + auto type = static_cast(i); + if (hasCodec(type)) { + codecs.push_back(type); + } + } + + return codecs; +} + +static std::vector availableStreamCodecs() { + std::vector codecs; + + for (size_t i = 0; i < static_cast(CodecType::NUM_CODEC_TYPES); ++i) { + auto type = static_cast(i); + if (hasStreamCodec(type)) { + codecs.push_back(type); + } + } + + return codecs; +} + TEST(CompressionTestNeedsUncompressedLength, Simple) { - EXPECT_FALSE(getCodec(CodecType::NO_COMPRESSION)->needsUncompressedLength()); - EXPECT_TRUE(getCodec(CodecType::LZ4)->needsUncompressedLength()); - EXPECT_FALSE(getCodec(CodecType::SNAPPY)->needsUncompressedLength()); - EXPECT_FALSE(getCodec(CodecType::ZLIB)->needsUncompressedLength()); - EXPECT_FALSE(getCodec(CodecType::LZ4_VARINT_SIZE)->needsUncompressedLength()); - EXPECT_TRUE(getCodec(CodecType::LZMA2)->needsUncompressedLength()); - EXPECT_FALSE(getCodec(CodecType::LZMA2_VARINT_SIZE) - ->needsUncompressedLength()); + static const struct { CodecType type; bool needsUncompressedLength; } + expectations[] = { + { CodecType::NO_COMPRESSION, false }, + { CodecType::LZ4, true }, + { CodecType::SNAPPY, false }, + { CodecType::ZLIB, false }, + { CodecType::LZ4_VARINT_SIZE, false }, + { CodecType::LZMA2, false }, + { CodecType::LZMA2_VARINT_SIZE, false }, + { CodecType::ZSTD, false }, + { CodecType::GZIP, false }, + { CodecType::LZ4_FRAME, false }, + { CodecType::BZIP2, false }, + }; + + for (auto const& test : expectations) { + if (hasCodec(test.type)) { + EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(), + test.needsUncompressedLength); + } + } } class CompressionTest - : public testing::TestWithParam> { - protected: - void SetUp() override { - auto tup = GetParam(); - uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup); - codec_ = getCodec(std::tr1::get<1>(tup)); - } + : public testing::TestWithParam> { + protected: + void SetUp() override { + auto tup = GetParam(); + uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup); + chunks_ = std::tr1::get<1>(tup); + codec_ = getCodec(std::tr1::get<2>(tup)); + } + + void runSimpleIOBufTest(const DataHolder& dh); - void runSimpleTest(const DataHolder& dh); + void runSimpleStringTest(const DataHolder& dh); - uint64_t uncompressedLength_; - std::unique_ptr codec_; + private: + std::unique_ptr split(std::unique_ptr data) const; + + uint64_t uncompressedLength_; + size_t chunks_; + std::unique_ptr codec_; }; -void CompressionTest::runSimpleTest(const DataHolder& dh) { - auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_)); - auto compressed = codec_->compress(original.get()); +void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) { + const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_))); + const auto compressed = split(codec_->compress(original.get())); if (!codec_->needsUncompressedLength()) { auto uncompressed = codec_->uncompress(compressed.get()); - EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength()); EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); } @@ -162,25 +228,72 @@ void CompressionTest::runSimpleTest(const DataHolder& dh) { } } +void CompressionTest::runSimpleStringTest(const DataHolder& dh) { + const auto original = std::string( + reinterpret_cast(dh.data(uncompressedLength_).data()), + uncompressedLength_); + const auto compressed = codec_->compress(original); + if (!codec_->needsUncompressedLength()) { + auto uncompressed = codec_->uncompress(compressed); + EXPECT_EQ(uncompressedLength_, uncompressed.length()); + EXPECT_EQ(uncompressed, original); + } + { + auto uncompressed = codec_->uncompress(compressed, uncompressedLength_); + EXPECT_EQ(uncompressedLength_, uncompressed.length()); + EXPECT_EQ(uncompressed, original); + } +} + +// Uniformly split data into (potentially empty) chunks. +std::unique_ptr CompressionTest::split( + std::unique_ptr data) const { + if (data->isChained()) { + data->coalesce(); + } + + const size_t size = data->computeChainDataLength(); + + std::multiset splits; + for (size_t i = 1; i < chunks_; ++i) { + splits.insert(Random::rand64(size)); + } + + folly::IOBufQueue result; + + size_t offset = 0; + for (size_t split : splits) { + result.append(IOBuf::copyBuffer(data->data() + offset, split - offset)); + offset = split; + } + result.append(IOBuf::copyBuffer(data->data() + offset, size - offset)); + + return result.move(); +} + TEST_P(CompressionTest, RandomData) { - runSimpleTest(randomDataHolder); + runSimpleIOBufTest(randomDataHolder); } TEST_P(CompressionTest, ConstantData) { - runSimpleTest(constantDataHolder); + runSimpleIOBufTest(constantDataHolder); +} + +TEST_P(CompressionTest, RandomDataString) { + runSimpleStringTest(randomDataHolder); +} + +TEST_P(CompressionTest, ConstantDataString) { + runSimpleStringTest(constantDataHolder); } INSTANTIATE_TEST_CASE_P( CompressionTest, CompressionTest, - testing::Combine(testing::Values(0, 1, 12, 22, 25, 27), - testing::Values(CodecType::NO_COMPRESSION, - CodecType::LZ4, - CodecType::SNAPPY, - CodecType::ZLIB, - CodecType::LZ4_VARINT_SIZE, - CodecType::LZMA2, - CodecType::LZMA2_VARINT_SIZE))); + testing::Combine( + testing::Values(0, 1, 12, 22, 25, 27), + testing::Values(1, 2, 3, 8, 65), + testing::ValuesIn(availableCodecs()))); class CompressionVarintTest : public testing::TestWithParam> { @@ -209,7 +322,8 @@ void CompressionVarintTest::runSimpleTest(const DataHolder& dh) { auto compressed = codec_->compress(original.get()); auto breakPoint = 1UL + - Random::rand64(std::max(9UL, oneBasedMsbPos(uncompressedLength_)) / 9UL); + Random::rand64( + std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL); auto tinyBuf = IOBuf::copyBuffer(compressed->data(), std::min(compressed->length(), breakPoint)); compressed->trimStart(breakPoint); @@ -222,7 +336,9 @@ void CompressionVarintTest::runSimpleTest(const DataHolder& dh) { EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); } -TEST_P(CompressionVarintTest, RandomData) { runSimpleTest(randomDataHolder); } +TEST_P(CompressionVarintTest, RandomData) { + runSimpleTest(randomDataHolder); +} TEST_P(CompressionVarintTest, ConstantData) { runSimpleTest(constantDataHolder); @@ -231,9 +347,12 @@ TEST_P(CompressionVarintTest, ConstantData) { INSTANTIATE_TEST_CASE_P( CompressionVarintTest, CompressionVarintTest, - testing::Combine(testing::Values(0, 1, 12, 22, 25, 27), - testing::Values(CodecType::LZ4_VARINT_SIZE, - CodecType::LZMA2_VARINT_SIZE))); + testing::Combine( + testing::Values(0, 1, 12, 22, 25, 27), + testing::ValuesIn(supportedCodecs({ + CodecType::LZ4_VARINT_SIZE, + CodecType::LZMA2_VARINT_SIZE, + })))); class CompressionCorruptionTest : public testing::TestWithParam { protected: @@ -264,15 +383,29 @@ void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) { EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1), std::runtime_error); + auto corrupted = compressed->clone(); + corrupted->unshare(); + // Truncate the last character + corrupted->prev()->trimEnd(1); + if (!codec_->needsUncompressedLength()) { + EXPECT_THROW(codec_->uncompress(corrupted.get()), + std::runtime_error); + } + + EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength), + std::runtime_error); + + corrupted = compressed->clone(); + corrupted->unshare(); // Corrupt the first character - ++(compressed->writableData()[0]); + ++(corrupted->writableData()[0]); if (!codec_->needsUncompressedLength()) { - EXPECT_THROW(codec_->uncompress(compressed.get()), + EXPECT_THROW(codec_->uncompress(corrupted.get()), std::runtime_error); } - EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength), + EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength), std::runtime_error); } @@ -287,12 +420,713 @@ TEST_P(CompressionCorruptionTest, ConstantData) { INSTANTIATE_TEST_CASE_P( CompressionCorruptionTest, CompressionCorruptionTest, - testing::Values( + testing::ValuesIn( // NO_COMPRESSION can't detect corruption // LZ4 can't detect corruption reliably (sigh) - CodecType::SNAPPY, - CodecType::ZLIB)); + supportedCodecs({ + CodecType::SNAPPY, + CodecType::ZLIB, + CodecType::LZMA2, + CodecType::ZSTD, + CodecType::LZ4_FRAME, + CodecType::BZIP2, + }))); + +class StreamingUnitTest : public testing::TestWithParam { + protected: + void SetUp() override { + codec_ = getStreamCodec(GetParam()); + } + + std::unique_ptr codec_; +}; + +TEST_P(StreamingUnitTest, maxCompressedLength) { + EXPECT_EQ(0, codec_->maxCompressedLength(0)); + for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) { + EXPECT_GE(codec_->maxCompressedLength(length), length); + } +} + +TEST_P(StreamingUnitTest, getUncompressedLength) { + auto const empty = IOBuf::create(0); + EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get())); + EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0)); + + auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100)); + auto const compressed = codec_->compress(data.get()); + + EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0)); + if (auto const length = codec_->getUncompressedLength(data.get())) { + EXPECT_EQ(100, *length); + } + EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100)); + // If the uncompressed length is stored in the frame, then make sure it throws + // when it is given the wrong length. + if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) { + EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200)); + } +} + +TEST_P(StreamingUnitTest, emptyData) { + ByteRange input{}; + auto buffer = IOBuf::create(1); + buffer->append(buffer->capacity()); + MutableByteRange output{}; + + // Test compressing empty data in one pass + EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + codec_->resetStream(0); + EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + codec_->resetStream(); + output = {buffer->writableData(), buffer->length()}; + EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + EXPECT_EQ(buffer->length(), output.size()); + + // Test compressing empty data with multiple calls to compressStream() + codec_->resetStream(); + output = {}; + EXPECT_FALSE(codec_->compressStream(input, output)); + EXPECT_TRUE( + codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH)); + EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + codec_->resetStream(); + output = {buffer->writableData(), buffer->length()}; + EXPECT_FALSE(codec_->compressStream(input, output)); + EXPECT_TRUE( + codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH)); + EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + EXPECT_EQ(buffer->length(), output.size()); + + // Test uncompressing empty data + output = {}; + codec_->resetStream(); + EXPECT_TRUE(codec_->uncompressStream(input, output)); + codec_->resetStream(); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH)); + codec_->resetStream(); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::END)); + codec_->resetStream(0); + EXPECT_TRUE(codec_->uncompressStream(input, output)); + codec_->resetStream(0); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH)); + codec_->resetStream(0); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::END)); +} + +TEST_P(StreamingUnitTest, noForwardProgressOkay) { + auto inBuffer = IOBuf::create(2); + inBuffer->writableData()[0] = 'a'; + inBuffer->writableData()[0] = 'a'; + inBuffer->append(2); + auto input = inBuffer->coalesce(); + auto compressed = codec_->compress(inBuffer.get()); + + auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2)); + MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()}; + + ByteRange emptyInput; + MutableByteRange emptyOutput; + + // Compress some data to avoid empty data special casing + codec_->resetStream(); + while (!input.empty()) { + codec_->compressStream(input, output); + } + // empty input and output is okay for flush NONE and FLUSH. + codec_->compressStream(emptyInput, emptyOutput); + codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH); + + codec_->resetStream(); + input = inBuffer->coalesce(); + output = {outBuffer->writableTail(), outBuffer->tailroom()}; + while (!input.empty()) { + codec_->compressStream(input, output); + } + // empty input and output is okay for flush END. + codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END); + + codec_->resetStream(); + input = compressed->coalesce(); + input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete + output = {inBuffer->writableData(), inBuffer->length()}; + // Uncompress some data to avoid empty data special casing + while (!input.empty()) { + EXPECT_FALSE(codec_->uncompressStream(input, output)); + } + // empty input and output is okay for all flush values. + EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput)); + EXPECT_FALSE(codec_->uncompressStream( + emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH)); + EXPECT_FALSE(codec_->uncompressStream( + emptyInput, emptyOutput, StreamCodec::FlushOp::END)); +} + +TEST_P(StreamingUnitTest, stateTransitions) { + auto inBuffer = IOBuf::create(1); + inBuffer->writableData()[0] = 'a'; + inBuffer->append(1); + auto compressed = codec_->compress(inBuffer.get()); + ByteRange const in = compressed->coalesce(); + auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size())); + MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()}; + + auto compress = [&]( + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE, + bool empty = false) { + auto input = in; + auto output = empty ? MutableByteRange{} : out; + return codec_->compressStream(input, output, flushOp); + }; + auto uncompress = [&]( + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE, + bool empty = false) { + auto input = in; + auto output = empty ? MutableByteRange{} : out; + return codec_->uncompressStream(input, output, flushOp); + }; + + // compression flow + codec_->resetStream(); + EXPECT_FALSE(compress()); + EXPECT_FALSE(compress()); + EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH)); + EXPECT_FALSE(compress()); + EXPECT_TRUE(compress(StreamCodec::FlushOp::END)); + // uncompression flow + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true)); + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); + codec_->resetStream(); + EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); + // compress -> uncompress + codec_->resetStream(); + EXPECT_FALSE(compress()); + EXPECT_THROW(uncompress(), std::logic_error); + // uncompress -> compress + codec_->resetStream(); + EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); + EXPECT_THROW(compress(), std::logic_error); + // end -> compress + codec_->resetStream(); + EXPECT_FALSE(compress()); + EXPECT_TRUE(compress(StreamCodec::FlushOp::END)); + EXPECT_THROW(compress(), std::logic_error); + // end -> uncompress + codec_->resetStream(); + EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); + EXPECT_THROW(uncompress(), std::logic_error); + // flush -> compress + codec_->resetStream(); + EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true)); + EXPECT_THROW(compress(), std::logic_error); + // flush -> end + codec_->resetStream(); + EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true)); + EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error); + // undefined -> compress + codec_->compress(inBuffer.get()); + EXPECT_THROW(compress(), std::logic_error); + codec_->uncompress(compressed.get()); + EXPECT_THROW(compress(), std::logic_error); + // undefined -> undefined + codec_->uncompress(compressed.get()); + codec_->compress(inBuffer.get()); +} + +INSTANTIATE_TEST_CASE_P( + StreamingUnitTest, + StreamingUnitTest, + testing::ValuesIn(availableStreamCodecs())); + +class StreamingCompressionTest + : public testing::TestWithParam> { + protected: + void SetUp() override { + auto const tup = GetParam(); + uncompressedLength_ = uint64_t(1) << std::get<0>(tup); + chunkSize_ = size_t(1) << std::get<1>(tup); + codec_ = getStreamCodec(std::get<2>(tup)); + } + + void runResetStreamTest(DataHolder const& dh); + void runCompressStreamTest(DataHolder const& dh); + void runUncompressStreamTest(DataHolder const& dh); + void runFlushTest(DataHolder const& dh); + + private: + std::vector split(ByteRange data) const; + + uint64_t uncompressedLength_; + size_t chunkSize_; + std::unique_ptr codec_; +}; + +std::vector StreamingCompressionTest::split(ByteRange data) const { + size_t const pieces = std::max(1, data.size() / chunkSize_); + std::vector result; + result.reserve(pieces + 1); + while (!data.empty()) { + size_t const pieceSize = std::min(data.size(), chunkSize_); + result.push_back(data.subpiece(0, pieceSize)); + data.uncheckedAdvance(pieceSize); + } + return result; +} + +static std::unique_ptr compressSome( + StreamCodec* codec, + ByteRange data, + uint64_t bufferSize, + StreamCodec::FlushOp flush) { + bool result; + IOBufQueue queue; + do { + auto buffer = IOBuf::create(bufferSize); + buffer->append(buffer->capacity()); + MutableByteRange output{buffer->writableData(), buffer->length()}; + + result = codec->compressStream(data, output, flush); + buffer->trimEnd(output.size()); + queue.append(std::move(buffer)); + + } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result); + EXPECT_TRUE(data.empty()); + return queue.move(); +} + +static std::pair> uncompressSome( + StreamCodec* codec, + ByteRange& data, + uint64_t bufferSize, + StreamCodec::FlushOp flush) { + bool result; + IOBufQueue queue; + do { + auto buffer = IOBuf::create(bufferSize); + buffer->append(buffer->capacity()); + MutableByteRange output{buffer->writableData(), buffer->length()}; + + result = codec->uncompressStream(data, output, flush); + buffer->trimEnd(output.size()); + queue.append(std::move(buffer)); + + } while (queue.tailroom() == 0 && !result); + return std::make_pair(result, queue.move()); +} + +void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) { + auto const input = dh.data(uncompressedLength_); + // Compress some but leave state unclean + codec_->resetStream(uncompressedLength_); + compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE); + // Reset stream and compress all + codec_->resetStream(); + auto compressed = + compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END); + auto const uncompressed = codec_->uncompress(compressed.get(), input.size()); + EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); +} + +TEST_P(StreamingCompressionTest, resetStream) { + runResetStreamTest(constantDataHolder); + runResetStreamTest(randomDataHolder); +} + +void StreamingCompressionTest::runCompressStreamTest( + const folly::io::test::DataHolder& dh) { + auto const inputs = split(dh.data(uncompressedLength_)); + + IOBufQueue queue; + codec_->resetStream(uncompressedLength_); + // Compress many inputs in a row + for (auto const input : inputs) { + queue.append(compressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE)); + } + // Finish the operation with empty input. + ByteRange empty; + queue.append( + compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END)); + + auto const uncompressed = codec_->uncompress(queue.front()); + EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); +} + +TEST_P(StreamingCompressionTest, compressStream) { + runCompressStreamTest(constantDataHolder); + runCompressStreamTest(randomDataHolder); +} + +void StreamingCompressionTest::runUncompressStreamTest( + const folly::io::test::DataHolder& dh) { + auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_)); + // Concatenate 3 compressed frames in a row + auto compressed = codec_->compress(data.get()); + compressed->prependChain(codec_->compress(data.get())); + compressed->prependChain(codec_->compress(data.get())); + // Pass all 3 compressed frames in one input buffer + auto input = compressed->coalesce(); + // Uncompress the first frame + codec_->resetStream(data->computeChainDataLength()); + { + auto const result = uncompressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); + ASSERT_TRUE(result.first); + ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } + // Uncompress the second frame + codec_->resetStream(); + { + auto const result = uncompressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END); + ASSERT_TRUE(result.first); + ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } + // Uncompress the third frame + codec_->resetStream(); + { + auto const result = uncompressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); + ASSERT_TRUE(result.first); + ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } + EXPECT_TRUE(input.empty()); +} + +TEST_P(StreamingCompressionTest, uncompressStream) { + runUncompressStreamTest(constantDataHolder); + runUncompressStreamTest(randomDataHolder); +} + +void StreamingCompressionTest::runFlushTest(DataHolder const& dh) { + auto const inputs = split(dh.data(uncompressedLength_)); + auto uncodec = getStreamCodec(codec_->type()); + + codec_->resetStream(); + for (auto input : inputs) { + // Compress some data and flush the stream + auto compressed = compressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); + auto compressedRange = compressed->coalesce(); + // Uncompress the compressed data + auto result = uncompressSome( + uncodec.get(), + compressedRange, + chunkSize_, + StreamCodec::FlushOp::FLUSH); + // All compressed data should have been consumed + EXPECT_TRUE(compressedRange.empty()); + // The frame isn't complete + EXPECT_FALSE(result.first); + // The uncompressed data should be exactly the input data + EXPECT_EQ(input.size(), result.second->computeChainDataLength()); + auto const data = IOBuf::wrapBuffer(input); + EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } +} + +TEST_P(StreamingCompressionTest, testFlush) { + runFlushTest(constantDataHolder); + runFlushTest(randomDataHolder); +} + +INSTANTIATE_TEST_CASE_P( + StreamingCompressionTest, + StreamingCompressionTest, + testing::Combine( + testing::Values(0, 1, 12, 22, 27), + testing::Values(12, 17, 20), + testing::ValuesIn(availableStreamCodecs()))); + +class AutomaticCodecTest : public testing::TestWithParam { + protected: + void SetUp() override { + codec_ = getCodec(GetParam()); + auto_ = getAutoUncompressionCodec(); + } + + void runSimpleTest(const DataHolder& dh); + + std::unique_ptr codec_; + std::unique_ptr auto_; +}; + +void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) { + constexpr uint64_t uncompressedLength = 1000; + auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength)); + auto compressed = codec_->compress(original.get()); + + if (!codec_->needsUncompressedLength()) { + auto uncompressed = auto_->uncompress(compressed.get()); + EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); + EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); + } + { + auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength); + EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); + EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); + } + ASSERT_GE(compressed->computeChainDataLength(), 8); + for (size_t i = 0; i < 8; ++i) { + auto split = compressed->clone(); + auto rest = compressed->clone(); + split->trimEnd(split->length() - i); + rest->trimStart(i); + split->appendChain(std::move(rest)); + auto uncompressed = auto_->uncompress(split.get(), uncompressedLength); + EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); + EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get())); + } +} + +TEST_P(AutomaticCodecTest, RandomData) { + runSimpleTest(randomDataHolder); +} + +TEST_P(AutomaticCodecTest, ConstantData) { + runSimpleTest(constantDataHolder); +} + +TEST_P(AutomaticCodecTest, ValidPrefixes) { + const auto prefixes = codec_->validPrefixes(); + for (const auto& prefix : prefixes) { + EXPECT_FALSE(prefix.empty()); + // Ensure that all strings are at least 8 bytes for LZMA2. + // The bytes after the prefix should be ignored by `canUncompress()`. + IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8}; + data.append(8); + EXPECT_TRUE(codec_->canUncompress(&data)); + EXPECT_TRUE(auto_->canUncompress(&data)); + } +} + +TEST_P(AutomaticCodecTest, NeedsUncompressedLength) { + if (codec_->needsUncompressedLength()) { + EXPECT_TRUE(auto_->needsUncompressedLength()); + } +} + +TEST_P(AutomaticCodecTest, maxUncompressedLength) { + EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength()); +} + +TEST_P(AutomaticCodecTest, DefaultCodec) { + const uint64_t length = 42; + std::vector> codecs; + codecs.push_back(getCodec(CodecType::ZSTD)); + auto automatic = getAutoUncompressionCodec(std::move(codecs)); + auto original = IOBuf::wrapBuffer(constantDataHolder.data(length)); + auto compressed = codec_->compress(original.get()); + auto decompressed = automatic->uncompress(compressed.get()); + + EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get())); +} + +namespace { +class CustomCodec : public Codec { + public: + static std::unique_ptr create(std::string prefix, CodecType type) { + return std::make_unique(std::move(prefix), type); + } + explicit CustomCodec(std::string prefix, CodecType type) + : Codec(CodecType::USER_DEFINED), + prefix_(std::move(prefix)), + codec_(getCodec(type)) {} + + private: + std::vector validPrefixes() const override { + return {prefix_}; + } + + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override { + return codec_->maxCompressedLength(uncompressedLength) + prefix_.size(); + } + + bool canUncompress(const IOBuf* data, Optional) const override { + auto clone = data->cloneCoalescedAsValue(); + if (clone.length() < prefix_.size()) { + return false; + } + return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0; + } + + std::unique_ptr doCompress(const IOBuf* data) override { + auto result = IOBuf::copyBuffer(prefix_); + result->appendChain(codec_->compress(data)); + EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength())); + return result; + } + + std::unique_ptr doUncompress( + const IOBuf* data, + Optional uncompressedLength) override { + EXPECT_TRUE(canUncompress(data, uncompressedLength)); + auto clone = data->cloneCoalescedAsValue(); + clone.trimStart(prefix_.size()); + return codec_->uncompress(&clone, uncompressedLength); + } + + std::string prefix_; + std::unique_ptr codec_; +}; +} + +TEST_P(AutomaticCodecTest, CustomCodec) { + const uint64_t length = 42; + auto ab = CustomCodec::create("ab", CodecType::ZSTD); + std::vector> codecs; + codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD)); + auto automatic = getAutoUncompressionCodec(std::move(codecs)); + auto original = IOBuf::wrapBuffer(constantDataHolder.data(length)); + + auto abCompressed = ab->compress(original.get()); + auto abDecompressed = automatic->uncompress(abCompressed.get()); + EXPECT_TRUE(automatic->canUncompress(abCompressed.get())); + EXPECT_FALSE(auto_->canUncompress(abCompressed.get())); + EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get())); + + auto compressed = codec_->compress(original.get()); + auto decompressed = automatic->uncompress(compressed.get()); + EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get())); +} + +TEST_P(AutomaticCodecTest, CustomDefaultCodec) { + const uint64_t length = 42; + auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION); + std::vector> codecs; + codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION)); + codecs.push_back(getCodec(CodecType::LZ4_FRAME)); + auto automatic = getAutoUncompressionCodec(std::move(codecs)); + auto original = IOBuf::wrapBuffer(constantDataHolder.data(length)); + + auto noneCompressed = none->compress(original.get()); + auto noneDecompressed = automatic->uncompress(noneCompressed.get()); + EXPECT_TRUE(automatic->canUncompress(noneCompressed.get())); + EXPECT_FALSE(auto_->canUncompress(noneCompressed.get())); + EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get())); + + auto compressed = codec_->compress(original.get()); + auto decompressed = automatic->uncompress(compressed.get()); + EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get())); +} + +TEST_P(AutomaticCodecTest, canUncompressOneBytes) { + // No default codec can uncompress 1 bytes. + IOBuf buf{IOBuf::CREATE, 1}; + buf.append(1); + EXPECT_FALSE(codec_->canUncompress(&buf, 1)); + EXPECT_FALSE(codec_->canUncompress(&buf, folly::none)); + EXPECT_FALSE(auto_->canUncompress(&buf, 1)); + EXPECT_FALSE(auto_->canUncompress(&buf, folly::none)); +} + +INSTANTIATE_TEST_CASE_P( + AutomaticCodecTest, + AutomaticCodecTest, + testing::Values( + CodecType::LZ4_FRAME, + CodecType::ZSTD, + CodecType::ZLIB, + CodecType::GZIP, + CodecType::LZMA2, + CodecType::BZIP2)); + +TEST(ValidPrefixesTest, CustomCodec) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION)); + const auto none = getAutoUncompressionCodec(std::move(codecs)); + const auto prefixes = none->validPrefixes(); + const auto it = std::find(prefixes.begin(), prefixes.end(), "none"); + EXPECT_TRUE(it != prefixes.end()); +} + +#define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \ + do { \ + if (kIsDebug) { \ + EXPECT_THROW((statement), expected_exception); \ + } else { \ + EXPECT_NO_THROW((statement)); \ + } \ + } while (false) + +TEST(CheckCompatibleTest, SimplePrefixSecond) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION)); + codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION)); + EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +TEST(CheckCompatibleTest, SimplePrefixFirst) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION)); + codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION)); + EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +TEST(CheckCompatibleTest, Empty) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION)); + EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +TEST(CheckCompatibleTest, ZstdPrefix) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD)); + EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +TEST(CheckCompatibleTest, ZstdDuplicate) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD)); + EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +TEST(CheckCompatibleTest, ZlibIsPrefix) { + std::vector> codecs; + codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD)); + EXPECT_THROW_IF_DEBUG( + getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument); +} + +#if FOLLY_HAVE_LIBZSTD + +TEST(ZstdTest, BackwardCompatible) { + auto codec = getCodec(CodecType::ZSTD); + { + auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20)); + auto compressed = codec->compress(data.get()); + compressed->coalesce(); + EXPECT_EQ( + data->length(), + ZSTD_getDecompressedSize(compressed->data(), compressed->length())); + } + { + auto const data = + IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20)); + auto compressed = codec->compress(data.get()); + compressed->coalesce(); + EXPECT_EQ( + data->length(), + ZSTD_getDecompressedSize(compressed->data(), compressed->length())); + } +} +#endif }}} // namespaces int main(int argc, char *argv[]) {