2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
23 #include <unordered_map>
26 #include <boost/noncopyable.hpp>
27 #include <glog/logging.h>
29 #include <folly/Benchmark.h>
30 #include <folly/Hash.h>
31 #include <folly/Memory.h>
32 #include <folly/Random.h>
33 #include <folly/Varint.h>
34 #include <folly/io/IOBufQueue.h>
35 #include <folly/portability/GTest.h>
37 namespace folly { namespace io { namespace test {
39 class DataHolder : private boost::noncopyable {
41 uint64_t hash(size_t size) const;
42 ByteRange data(size_t size) const;
45 explicit DataHolder(size_t sizeLog2);
47 std::unique_ptr<uint8_t[]> data_;
48 mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
51 DataHolder::DataHolder(size_t sizeLog2)
52 : size_(size_t(1) << sizeLog2),
53 data_(new uint8_t[size_]) {
56 uint64_t DataHolder::hash(size_t size) const {
57 CHECK_LE(size, size_);
58 auto p = hashCache_.find(size);
59 if (p != hashCache_.end()) {
63 uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
68 ByteRange DataHolder::data(size_t size) const {
69 CHECK_LE(size, size_);
70 return ByteRange(data_.get(), size);
73 uint64_t hashIOBuf(const IOBuf* buf) {
74 uint64_t h = folly::hash::FNV_64_HASH_START;
75 for (auto& range : *buf) {
76 h = folly::hash::fnv64_buf(range.data(), range.size(), h);
81 class RandomDataHolder : public DataHolder {
83 explicit RandomDataHolder(size_t sizeLog2);
86 RandomDataHolder::RandomDataHolder(size_t sizeLog2)
87 : DataHolder(sizeLog2) {
88 static constexpr size_t numThreadsLog2 = 3;
89 static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
91 uint32_t seed = randomNumberSeed();
93 std::vector<std::thread> threads;
94 threads.reserve(numThreads);
95 for (size_t t = 0; t < numThreads; ++t) {
96 threads.emplace_back([this, seed, t, sizeLog2] {
97 std::mt19937 rng(seed + t);
98 size_t countLog2 = sizeLog2 - numThreadsLog2;
99 size_t start = size_t(t) << countLog2;
100 for (size_t i = 0; i < countLog2; ++i) {
101 this->data_[start + i] = rng();
106 for (auto& t : threads) {
111 class ConstantDataHolder : public DataHolder {
113 explicit ConstantDataHolder(size_t sizeLog2);
116 ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
117 : DataHolder(sizeLog2) {
118 memset(data_.get(), 'a', size_);
121 constexpr size_t dataSizeLog2 = 27; // 128MiB
122 RandomDataHolder randomDataHolder(dataSizeLog2);
123 ConstantDataHolder constantDataHolder(dataSizeLog2);
125 // The intersection of the provided codecs & those that are compiled in.
126 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
127 std::vector<CodecType> supported;
132 std::back_inserter(supported),
138 // All compiled-in compression codecs.
139 static std::vector<CodecType> availableCodecs() {
140 std::vector<CodecType> codecs;
142 for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
143 auto type = static_cast<CodecType>(i);
144 if (hasCodec(type)) {
145 codecs.push_back(type);
152 static std::vector<CodecType> availableStreamCodecs() {
153 std::vector<CodecType> codecs;
155 for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
156 auto type = static_cast<CodecType>(i);
157 if (hasStreamCodec(type)) {
158 codecs.push_back(type);
165 TEST(CompressionTestNeedsUncompressedLength, Simple) {
166 static const struct { CodecType type; bool needsUncompressedLength; }
168 { CodecType::NO_COMPRESSION, false },
169 { CodecType::LZ4, true },
170 { CodecType::SNAPPY, false },
171 { CodecType::ZLIB, false },
172 { CodecType::LZ4_VARINT_SIZE, false },
173 { CodecType::LZMA2, false },
174 { CodecType::LZMA2_VARINT_SIZE, false },
175 { CodecType::ZSTD, false },
176 { CodecType::GZIP, false },
177 { CodecType::LZ4_FRAME, false },
178 { CodecType::BZIP2, false },
181 for (auto const& test : expectations) {
182 if (hasCodec(test.type)) {
183 EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
184 test.needsUncompressedLength);
189 class CompressionTest
190 : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
192 void SetUp() override {
193 auto tup = GetParam();
194 uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
195 chunks_ = std::tr1::get<1>(tup);
196 codec_ = getCodec(std::tr1::get<2>(tup));
199 void runSimpleIOBufTest(const DataHolder& dh);
201 void runSimpleStringTest(const DataHolder& dh);
204 std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
206 uint64_t uncompressedLength_;
208 std::unique_ptr<Codec> codec_;
211 void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
212 const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
213 const auto compressed = split(codec_->compress(original.get()));
214 if (!codec_->needsUncompressedLength()) {
215 auto uncompressed = codec_->uncompress(compressed.get());
216 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
217 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
220 auto uncompressed = codec_->uncompress(compressed.get(),
221 uncompressedLength_);
222 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
223 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
227 void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
228 const auto original = std::string(
229 reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
230 uncompressedLength_);
231 const auto compressed = codec_->compress(original);
232 if (!codec_->needsUncompressedLength()) {
233 auto uncompressed = codec_->uncompress(compressed);
234 EXPECT_EQ(uncompressedLength_, uncompressed.length());
235 EXPECT_EQ(uncompressed, original);
238 auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
239 EXPECT_EQ(uncompressedLength_, uncompressed.length());
240 EXPECT_EQ(uncompressed, original);
244 // Uniformly split data into (potentially empty) chunks.
245 std::unique_ptr<IOBuf> CompressionTest::split(
246 std::unique_ptr<IOBuf> data) const {
247 if (data->isChained()) {
251 const size_t size = data->computeChainDataLength();
253 std::multiset<size_t> splits;
254 for (size_t i = 1; i < chunks_; ++i) {
255 splits.insert(Random::rand64(size));
258 folly::IOBufQueue result;
261 for (size_t split : splits) {
262 result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
265 result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
267 return result.move();
270 TEST_P(CompressionTest, RandomData) {
271 runSimpleIOBufTest(randomDataHolder);
274 TEST_P(CompressionTest, ConstantData) {
275 runSimpleIOBufTest(constantDataHolder);
278 TEST_P(CompressionTest, RandomDataString) {
279 runSimpleStringTest(randomDataHolder);
282 TEST_P(CompressionTest, ConstantDataString) {
283 runSimpleStringTest(constantDataHolder);
286 INSTANTIATE_TEST_CASE_P(
290 testing::Values(0, 1, 12, 22, 25, 27),
291 testing::Values(1, 2, 3, 8, 65),
292 testing::ValuesIn(availableCodecs())));
294 class CompressionVarintTest
295 : public testing::TestWithParam<std::tr1::tuple<int, CodecType>> {
297 void SetUp() override {
298 auto tup = GetParam();
299 uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
300 codec_ = getCodec(std::tr1::get<1>(tup));
303 void runSimpleTest(const DataHolder& dh);
305 uint64_t uncompressedLength_;
306 std::unique_ptr<Codec> codec_;
309 inline uint64_t oneBasedMsbPos(uint64_t number) {
311 for (; number > 0; ++pos, number >>= 1) {
316 void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
317 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
318 auto compressed = codec_->compress(original.get());
322 std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
323 auto tinyBuf = IOBuf::copyBuffer(compressed->data(),
324 std::min(compressed->length(), breakPoint));
325 compressed->trimStart(breakPoint);
326 tinyBuf->prependChain(std::move(compressed));
327 compressed = std::move(tinyBuf);
329 auto uncompressed = codec_->uncompress(compressed.get());
331 EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
332 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
335 TEST_P(CompressionVarintTest, RandomData) {
336 runSimpleTest(randomDataHolder);
339 TEST_P(CompressionVarintTest, ConstantData) {
340 runSimpleTest(constantDataHolder);
343 INSTANTIATE_TEST_CASE_P(
344 CompressionVarintTest,
345 CompressionVarintTest,
347 testing::Values(0, 1, 12, 22, 25, 27),
348 testing::ValuesIn(supportedCodecs({
349 CodecType::LZ4_VARINT_SIZE,
350 CodecType::LZMA2_VARINT_SIZE,
353 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
355 void SetUp() override { codec_ = getCodec(GetParam()); }
357 void runSimpleTest(const DataHolder& dh);
359 std::unique_ptr<Codec> codec_;
362 void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
363 constexpr uint64_t uncompressedLength = 42;
364 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
365 auto compressed = codec_->compress(original.get());
367 if (!codec_->needsUncompressedLength()) {
368 auto uncompressed = codec_->uncompress(compressed.get());
369 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
370 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
373 auto uncompressed = codec_->uncompress(compressed.get(),
375 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
376 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
379 EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
382 // Corrupt the first character
383 ++(compressed->writableData()[0]);
385 if (!codec_->needsUncompressedLength()) {
386 EXPECT_THROW(codec_->uncompress(compressed.get()),
390 EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength),
394 TEST_P(CompressionCorruptionTest, RandomData) {
395 runSimpleTest(randomDataHolder);
398 TEST_P(CompressionCorruptionTest, ConstantData) {
399 runSimpleTest(constantDataHolder);
402 INSTANTIATE_TEST_CASE_P(
403 CompressionCorruptionTest,
404 CompressionCorruptionTest,
406 // NO_COMPRESSION can't detect corruption
407 // LZ4 can't detect corruption reliably (sigh)
413 CodecType::LZ4_FRAME,
417 class StreamingUnitTest : public testing::TestWithParam<CodecType> {
419 void SetUp() override {
420 codec_ = getStreamCodec(GetParam());
423 std::unique_ptr<StreamCodec> codec_;
426 TEST_P(StreamingUnitTest, maxCompressedLength) {
427 EXPECT_EQ(0, codec_->maxCompressedLength(0));
428 for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
429 EXPECT_GE(codec_->maxCompressedLength(length), length);
433 TEST_P(StreamingUnitTest, getUncompressedLength) {
434 auto const empty = IOBuf::create(0);
435 EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
436 EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
438 auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
439 auto const compressed = codec_->compress(data.get());
441 EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
442 if (auto const length = codec_->getUncompressedLength(data.get())) {
443 EXPECT_EQ(100, *length);
445 EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
446 // If the uncompressed length is stored in the frame, then make sure it throws
447 // when it is given the wrong length.
448 if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
449 EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
453 TEST_P(StreamingUnitTest, emptyData) {
455 auto buffer = IOBuf::create(1);
456 buffer->append(buffer->capacity());
457 MutableByteRange output{};
459 // Test compressing empty data in one pass
460 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
461 codec_->resetStream(0);
462 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
463 codec_->resetStream();
464 output = {buffer->writableData(), buffer->length()};
465 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
466 EXPECT_EQ(buffer->length(), output.size());
468 // Test compressing empty data with multiple calls to compressStream()
469 codec_->resetStream();
471 EXPECT_FALSE(codec_->compressStream(input, output));
473 codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
474 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
475 codec_->resetStream();
476 output = {buffer->writableData(), buffer->length()};
477 EXPECT_FALSE(codec_->compressStream(input, output));
479 codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
480 EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
481 EXPECT_EQ(buffer->length(), output.size());
483 // Test uncompressing empty data
485 codec_->resetStream();
486 EXPECT_TRUE(codec_->uncompressStream(input, output));
487 codec_->resetStream();
489 codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
490 codec_->resetStream();
492 codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
493 codec_->resetStream(0);
494 EXPECT_TRUE(codec_->uncompressStream(input, output));
495 codec_->resetStream(0);
497 codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
498 codec_->resetStream(0);
500 codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
503 TEST_P(StreamingUnitTest, noForwardProgressOkay) {
504 auto inBuffer = IOBuf::create(2);
505 inBuffer->writableData()[0] = 'a';
506 inBuffer->writableData()[0] = 'a';
508 auto input = inBuffer->coalesce();
509 auto compressed = codec_->compress(inBuffer.get());
511 auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
512 MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
514 ByteRange emptyInput;
515 MutableByteRange emptyOutput;
517 // Compress some data to avoid empty data special casing
518 codec_->resetStream();
519 while (!input.empty()) {
520 codec_->compressStream(input, output);
522 // empty input and output is okay for flush NONE and FLUSH.
523 codec_->compressStream(emptyInput, emptyOutput);
524 codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
526 codec_->resetStream();
527 input = inBuffer->coalesce();
528 output = {outBuffer->writableTail(), outBuffer->tailroom()};
529 while (!input.empty()) {
530 codec_->compressStream(input, output);
532 // empty input and output is okay for flush END.
533 codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
535 codec_->resetStream();
536 input = compressed->coalesce();
537 input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
538 output = {inBuffer->writableData(), inBuffer->length()};
539 // Uncompress some data to avoid empty data special casing
540 while (!input.empty()) {
541 EXPECT_FALSE(codec_->uncompressStream(input, output));
543 // empty input and output is okay for all flush values.
544 EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
545 EXPECT_FALSE(codec_->uncompressStream(
546 emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
547 EXPECT_FALSE(codec_->uncompressStream(
548 emptyInput, emptyOutput, StreamCodec::FlushOp::END));
551 TEST_P(StreamingUnitTest, stateTransitions) {
552 auto inBuffer = IOBuf::create(1);
553 inBuffer->writableData()[0] = 'a';
555 auto compressed = codec_->compress(inBuffer.get());
556 ByteRange const in = compressed->coalesce();
557 auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
558 MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
561 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
562 bool empty = false) {
564 auto output = empty ? MutableByteRange{} : out;
565 return codec_->compressStream(input, output, flushOp);
567 auto uncompress = [&](
568 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
569 bool empty = false) {
571 auto output = empty ? MutableByteRange{} : out;
572 return codec_->uncompressStream(input, output, flushOp);
576 codec_->resetStream();
577 EXPECT_FALSE(compress());
578 EXPECT_FALSE(compress());
579 EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
580 EXPECT_FALSE(compress());
581 EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
582 // uncompression flow
583 codec_->resetStream();
584 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
585 codec_->resetStream();
586 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
587 codec_->resetStream();
588 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
589 codec_->resetStream();
590 EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
591 codec_->resetStream();
592 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
593 // compress -> uncompress
594 codec_->resetStream();
595 EXPECT_FALSE(compress());
596 EXPECT_THROW(uncompress(), std::logic_error);
597 // uncompress -> compress
598 codec_->resetStream();
599 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
600 EXPECT_THROW(compress(), std::logic_error);
602 codec_->resetStream();
603 EXPECT_FALSE(compress());
604 EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
605 EXPECT_THROW(compress(), std::logic_error);
607 codec_->resetStream();
608 EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
609 EXPECT_THROW(uncompress(), std::logic_error);
611 codec_->resetStream();
612 EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
613 EXPECT_THROW(compress(), std::logic_error);
615 codec_->resetStream();
616 EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
617 EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
618 // undefined -> compress
619 codec_->compress(inBuffer.get());
620 EXPECT_THROW(compress(), std::logic_error);
621 codec_->uncompress(compressed.get());
622 EXPECT_THROW(compress(), std::logic_error);
623 // undefined -> undefined
624 codec_->uncompress(compressed.get());
625 codec_->compress(inBuffer.get());
628 INSTANTIATE_TEST_CASE_P(
631 testing::ValuesIn(availableStreamCodecs()));
633 class StreamingCompressionTest
634 : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
636 void SetUp() override {
637 auto const tup = GetParam();
638 uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
639 chunkSize_ = size_t(1) << std::get<1>(tup);
640 codec_ = getStreamCodec(std::get<2>(tup));
643 void runResetStreamTest(DataHolder const& dh);
644 void runCompressStreamTest(DataHolder const& dh);
645 void runUncompressStreamTest(DataHolder const& dh);
646 void runFlushTest(DataHolder const& dh);
649 std::vector<ByteRange> split(ByteRange data) const;
651 uint64_t uncompressedLength_;
653 std::unique_ptr<StreamCodec> codec_;
656 std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
657 size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
658 std::vector<ByteRange> result;
659 result.reserve(pieces + 1);
660 while (!data.empty()) {
661 size_t const pieceSize = std::min(data.size(), chunkSize_);
662 result.push_back(data.subpiece(0, pieceSize));
663 data.uncheckedAdvance(pieceSize);
668 static std::unique_ptr<IOBuf> compressSome(
672 StreamCodec::FlushOp flush) {
676 auto buffer = IOBuf::create(bufferSize);
677 buffer->append(buffer->capacity());
678 MutableByteRange output{buffer->writableData(), buffer->length()};
680 result = codec->compressStream(data, output, flush);
681 buffer->trimEnd(output.size());
682 queue.append(std::move(buffer));
684 } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
685 EXPECT_TRUE(data.empty());
689 static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
693 StreamCodec::FlushOp flush) {
697 auto buffer = IOBuf::create(bufferSize);
698 buffer->append(buffer->capacity());
699 MutableByteRange output{buffer->writableData(), buffer->length()};
701 result = codec->uncompressStream(data, output, flush);
702 buffer->trimEnd(output.size());
703 queue.append(std::move(buffer));
705 } while (queue.tailroom() == 0 && !result);
706 return std::make_pair(result, queue.move());
709 void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
710 auto const input = dh.data(uncompressedLength_);
711 // Compress some but leave state unclean
712 codec_->resetStream(uncompressedLength_);
713 compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
714 // Reset stream and compress all
715 codec_->resetStream();
717 compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
718 auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
719 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
722 TEST_P(StreamingCompressionTest, resetStream) {
723 runResetStreamTest(constantDataHolder);
724 runResetStreamTest(randomDataHolder);
727 void StreamingCompressionTest::runCompressStreamTest(
728 const folly::io::test::DataHolder& dh) {
729 auto const inputs = split(dh.data(uncompressedLength_));
732 codec_->resetStream(uncompressedLength_);
733 // Compress many inputs in a row
734 for (auto const input : inputs) {
735 queue.append(compressSome(
736 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
738 // Finish the operation with empty input.
741 compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
743 auto const uncompressed = codec_->uncompress(queue.front());
744 EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
747 TEST_P(StreamingCompressionTest, compressStream) {
748 runCompressStreamTest(constantDataHolder);
749 runCompressStreamTest(randomDataHolder);
752 void StreamingCompressionTest::runUncompressStreamTest(
753 const folly::io::test::DataHolder& dh) {
754 auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
755 // Concatenate 3 compressed frames in a row
756 auto compressed = codec_->compress(data.get());
757 compressed->prependChain(codec_->compress(data.get()));
758 compressed->prependChain(codec_->compress(data.get()));
759 // Pass all 3 compressed frames in one input buffer
760 auto input = compressed->coalesce();
761 // Uncompress the first frame
762 codec_->resetStream(data->computeChainDataLength());
764 auto const result = uncompressSome(
765 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
766 ASSERT_TRUE(result.first);
767 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
769 // Uncompress the second frame
770 codec_->resetStream();
772 auto const result = uncompressSome(
773 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
774 ASSERT_TRUE(result.first);
775 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
777 // Uncompress the third frame
778 codec_->resetStream();
780 auto const result = uncompressSome(
781 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
782 ASSERT_TRUE(result.first);
783 ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
785 EXPECT_TRUE(input.empty());
788 TEST_P(StreamingCompressionTest, uncompressStream) {
789 runUncompressStreamTest(constantDataHolder);
790 runUncompressStreamTest(randomDataHolder);
793 void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
794 auto const inputs = split(dh.data(uncompressedLength_));
795 auto uncodec = getStreamCodec(codec_->type());
797 codec_->resetStream();
798 for (auto input : inputs) {
799 // Compress some data and flush the stream
800 auto compressed = compressSome(
801 codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
802 auto compressedRange = compressed->coalesce();
803 // Uncompress the compressed data
804 auto result = uncompressSome(
808 StreamCodec::FlushOp::FLUSH);
809 // All compressed data should have been consumed
810 EXPECT_TRUE(compressedRange.empty());
811 // The frame isn't complete
812 EXPECT_FALSE(result.first);
813 // The uncompressed data should be exactly the input data
814 EXPECT_EQ(input.size(), result.second->computeChainDataLength());
815 auto const data = IOBuf::wrapBuffer(input);
816 EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
820 TEST_P(StreamingCompressionTest, testFlush) {
821 runFlushTest(constantDataHolder);
822 runFlushTest(randomDataHolder);
825 INSTANTIATE_TEST_CASE_P(
826 StreamingCompressionTest,
827 StreamingCompressionTest,
829 testing::Values(0, 1, 12, 22, 27),
830 testing::Values(12, 17, 20),
831 testing::ValuesIn(availableStreamCodecs())));
833 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
835 void SetUp() override {
836 codec_ = getCodec(GetParam());
837 auto_ = getAutoUncompressionCodec();
840 void runSimpleTest(const DataHolder& dh);
842 std::unique_ptr<Codec> codec_;
843 std::unique_ptr<Codec> auto_;
846 void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) {
847 constexpr uint64_t uncompressedLength = 1000;
848 auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
849 auto compressed = codec_->compress(original.get());
851 if (!codec_->needsUncompressedLength()) {
852 auto uncompressed = auto_->uncompress(compressed.get());
853 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
854 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
857 auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
858 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
859 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
861 ASSERT_GE(compressed->computeChainDataLength(), 8);
862 for (size_t i = 0; i < 8; ++i) {
863 auto split = compressed->clone();
864 auto rest = compressed->clone();
865 split->trimEnd(split->length() - i);
867 split->appendChain(std::move(rest));
868 auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
869 EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
870 EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
874 TEST_P(AutomaticCodecTest, RandomData) {
875 runSimpleTest(randomDataHolder);
878 TEST_P(AutomaticCodecTest, ConstantData) {
879 runSimpleTest(constantDataHolder);
882 TEST_P(AutomaticCodecTest, ValidPrefixes) {
883 const auto prefixes = codec_->validPrefixes();
884 for (const auto& prefix : prefixes) {
885 EXPECT_FALSE(prefix.empty());
886 // Ensure that all strings are at least 8 bytes for LZMA2.
887 // The bytes after the prefix should be ignored by `canUncompress()`.
888 IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8};
890 EXPECT_TRUE(codec_->canUncompress(&data));
891 EXPECT_TRUE(auto_->canUncompress(&data));
895 TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
896 if (codec_->needsUncompressedLength()) {
897 EXPECT_TRUE(auto_->needsUncompressedLength());
901 TEST_P(AutomaticCodecTest, maxUncompressedLength) {
902 EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength());
905 TEST_P(AutomaticCodecTest, DefaultCodec) {
906 const uint64_t length = 42;
907 std::vector<std::unique_ptr<Codec>> codecs;
908 codecs.push_back(getCodec(CodecType::ZSTD));
909 auto automatic = getAutoUncompressionCodec(std::move(codecs));
910 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
911 auto compressed = codec_->compress(original.get());
912 auto decompressed = automatic->uncompress(compressed.get());
914 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
918 class CustomCodec : public Codec {
920 static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
921 return std::make_unique<CustomCodec>(std::move(prefix), type);
923 explicit CustomCodec(std::string prefix, CodecType type)
924 : Codec(CodecType::USER_DEFINED),
925 prefix_(std::move(prefix)),
926 codec_(getCodec(type)) {}
929 std::vector<std::string> validPrefixes() const override {
933 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
934 return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
937 bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
938 auto clone = data->cloneCoalescedAsValue();
939 if (clone.length() < prefix_.size()) {
942 return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
945 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
946 auto result = IOBuf::copyBuffer(prefix_);
947 result->appendChain(codec_->compress(data));
948 EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
952 std::unique_ptr<IOBuf> doUncompress(
954 Optional<uint64_t> uncompressedLength) override {
955 EXPECT_TRUE(canUncompress(data, uncompressedLength));
956 auto clone = data->cloneCoalescedAsValue();
957 clone.trimStart(prefix_.size());
958 return codec_->uncompress(&clone, uncompressedLength);
962 std::unique_ptr<Codec> codec_;
966 TEST_P(AutomaticCodecTest, CustomCodec) {
967 const uint64_t length = 42;
968 auto ab = CustomCodec::create("ab", CodecType::ZSTD);
969 std::vector<std::unique_ptr<Codec>> codecs;
970 codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD));
971 auto automatic = getAutoUncompressionCodec(std::move(codecs));
972 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
974 auto abCompressed = ab->compress(original.get());
975 auto abDecompressed = automatic->uncompress(abCompressed.get());
976 EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
977 EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
978 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));
980 auto compressed = codec_->compress(original.get());
981 auto decompressed = automatic->uncompress(compressed.get());
982 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
985 TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
986 const uint64_t length = 42;
987 auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
988 std::vector<std::unique_ptr<Codec>> codecs;
989 codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
990 codecs.push_back(getCodec(CodecType::LZ4_FRAME));
991 auto automatic = getAutoUncompressionCodec(std::move(codecs));
992 auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
994 auto noneCompressed = none->compress(original.get());
995 auto noneDecompressed = automatic->uncompress(noneCompressed.get());
996 EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
997 EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
998 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));
1000 auto compressed = codec_->compress(original.get());
1001 auto decompressed = automatic->uncompress(compressed.get());
1002 EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1005 TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
1006 // No default codec can uncompress 1 bytes.
1007 IOBuf buf{IOBuf::CREATE, 1};
1009 EXPECT_FALSE(codec_->canUncompress(&buf, 1));
1010 EXPECT_FALSE(codec_->canUncompress(&buf, folly::none));
1011 EXPECT_FALSE(auto_->canUncompress(&buf, 1));
1012 EXPECT_FALSE(auto_->canUncompress(&buf, folly::none));
1015 INSTANTIATE_TEST_CASE_P(
1019 CodecType::LZ4_FRAME,
1026 TEST(ValidPrefixesTest, CustomCodec) {
1027 std::vector<std::unique_ptr<Codec>> codecs;
1028 codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1029 const auto none = getAutoUncompressionCodec(std::move(codecs));
1030 const auto prefixes = none->validPrefixes();
1031 const auto it = std::find(prefixes.begin(), prefixes.end(), "none");
1032 EXPECT_TRUE(it != prefixes.end());
1035 #define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
1038 EXPECT_THROW((statement), expected_exception); \
1040 EXPECT_NO_THROW((statement)); \
1044 TEST(CheckCompatibleTest, SimplePrefixSecond) {
1045 std::vector<std::unique_ptr<Codec>> codecs;
1046 codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1047 codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1048 EXPECT_THROW_IF_DEBUG(
1049 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1052 TEST(CheckCompatibleTest, SimplePrefixFirst) {
1053 std::vector<std::unique_ptr<Codec>> codecs;
1054 codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1055 codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1056 EXPECT_THROW_IF_DEBUG(
1057 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1060 TEST(CheckCompatibleTest, Empty) {
1061 std::vector<std::unique_ptr<Codec>> codecs;
1062 codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
1063 EXPECT_THROW_IF_DEBUG(
1064 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1067 TEST(CheckCompatibleTest, ZstdPrefix) {
1068 std::vector<std::unique_ptr<Codec>> codecs;
1069 codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
1070 EXPECT_THROW_IF_DEBUG(
1071 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1074 TEST(CheckCompatibleTest, ZstdDuplicate) {
1075 std::vector<std::unique_ptr<Codec>> codecs;
1076 codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
1077 EXPECT_THROW_IF_DEBUG(
1078 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1081 TEST(CheckCompatibleTest, ZlibIsPrefix) {
1082 std::vector<std::unique_ptr<Codec>> codecs;
1083 codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
1084 EXPECT_THROW_IF_DEBUG(
1085 getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1089 int main(int argc, char *argv[]) {
1090 testing::InitGoogleTest(&argc, argv);
1091 gflags::ParseCommandLineFlags(&argc, &argv, true);
1093 auto ret = RUN_ALL_TESTS();
1095 folly::runBenchmarksOnFlag();