a1725765d067ccce0ac7d674206a3ede7ba186f5
[folly.git] / folly / compression / test / CompressionTest.cpp
1 /*
2  * Copyright 2013-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/compression/Compression.h>
18
19 #include <algorithm>
20 #include <random>
21 #include <set>
22 #include <thread>
23 #include <unordered_map>
24 #include <utility>
25
26 #include <boost/noncopyable.hpp>
27 #include <glog/logging.h>
28
29 #include <folly/Memory.h>
30 #include <folly/Random.h>
31 #include <folly/Varint.h>
32 #include <folly/hash/Hash.h>
33 #include <folly/io/IOBufQueue.h>
34 #include <folly/portability/GTest.h>
35
36 #if FOLLY_HAVE_LIBZSTD
37 #include <zstd.h>
38 #endif
39
40 #if FOLLY_HAVE_LIBZ
41 #include <folly/compression/Zlib.h>
42
43 namespace zlib = folly::io::zlib;
44 #endif
45
46 namespace folly {
47 namespace io {
48 namespace test {
49
50 class DataHolder : private boost::noncopyable {
51  public:
52   uint64_t hash(size_t size) const;
53   ByteRange data(size_t size) const;
54
55  protected:
56   explicit DataHolder(size_t sizeLog2);
57   const size_t size_;
58   std::unique_ptr<uint8_t[]> data_;
59   mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
60 };
61
62 DataHolder::DataHolder(size_t sizeLog2)
63   : size_(size_t(1) << sizeLog2),
64     data_(new uint8_t[size_]) {
65 }
66
67 uint64_t DataHolder::hash(size_t size) const {
68   CHECK_LE(size, size_);
69   auto p = hashCache_.find(size);
70   if (p != hashCache_.end()) {
71     return p->second;
72   }
73
74   uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
75   hashCache_[size] = h;
76   return h;
77 }
78
79 ByteRange DataHolder::data(size_t size) const {
80   CHECK_LE(size, size_);
81   return ByteRange(data_.get(), size);
82 }
83
84 uint64_t hashIOBuf(const IOBuf* buf) {
85   uint64_t h = folly::hash::FNV_64_HASH_START;
86   for (auto& range : *buf) {
87     h = folly::hash::fnv64_buf(range.data(), range.size(), h);
88   }
89   return h;
90 }
91
92 class RandomDataHolder : public DataHolder {
93  public:
94   explicit RandomDataHolder(size_t sizeLog2);
95 };
96
97 RandomDataHolder::RandomDataHolder(size_t sizeLog2)
98   : DataHolder(sizeLog2) {
99   static constexpr size_t numThreadsLog2 = 3;
100   static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
101
102   uint32_t seed = randomNumberSeed();
103
104   std::vector<std::thread> threads;
105   threads.reserve(numThreads);
106   for (size_t t = 0; t < numThreads; ++t) {
107     threads.emplace_back([this, seed, t, sizeLog2] {
108       std::mt19937 rng(seed + t);
109       size_t countLog2 = sizeLog2 - numThreadsLog2;
110       size_t start = size_t(t) << countLog2;
111       for (size_t i = 0; i < countLog2; ++i) {
112         this->data_[start + i] = rng();
113       }
114     });
115   }
116
117   for (auto& t : threads) {
118     t.join();
119   }
120 }
121
122 class ConstantDataHolder : public DataHolder {
123  public:
124   explicit ConstantDataHolder(size_t sizeLog2);
125 };
126
127 ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
128   : DataHolder(sizeLog2) {
129   memset(data_.get(), 'a', size_);
130 }
131
132 constexpr size_t dataSizeLog2 = 27;  // 128MiB
133 RandomDataHolder randomDataHolder(dataSizeLog2);
134 ConstantDataHolder constantDataHolder(dataSizeLog2);
135
136 // The intersection of the provided codecs & those that are compiled in.
137 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
138   std::vector<CodecType> supported;
139
140   std::copy_if(
141       std::begin(v),
142       std::end(v),
143       std::back_inserter(supported),
144       hasCodec);
145
146   return supported;
147 }
148
149 // All compiled-in compression codecs.
150 static std::vector<CodecType> availableCodecs() {
151   std::vector<CodecType> codecs;
152
153   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
154     auto type = static_cast<CodecType>(i);
155     if (hasCodec(type)) {
156       codecs.push_back(type);
157     }
158   }
159
160   return codecs;
161 }
162
163 static std::vector<CodecType> availableStreamCodecs() {
164   std::vector<CodecType> codecs;
165
166   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
167     auto type = static_cast<CodecType>(i);
168     if (hasStreamCodec(type)) {
169       codecs.push_back(type);
170     }
171   }
172
173   return codecs;
174 }
175
176 TEST(CompressionTestNeedsUncompressedLength, Simple) {
177   static const struct {
178     CodecType type;
179     bool needsUncompressedLength;
180   } expectations[] = {
181       {CodecType::NO_COMPRESSION, false},
182       {CodecType::LZ4, true},
183       {CodecType::SNAPPY, false},
184       {CodecType::ZLIB, false},
185       {CodecType::LZ4_VARINT_SIZE, false},
186       {CodecType::LZMA2, false},
187       {CodecType::LZMA2_VARINT_SIZE, false},
188       {CodecType::ZSTD, false},
189       {CodecType::GZIP, false},
190       {CodecType::LZ4_FRAME, false},
191       {CodecType::BZIP2, false},
192   };
193
194   for (auto const& test : expectations) {
195     if (hasCodec(test.type)) {
196       EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
197                 test.needsUncompressedLength);
198     }
199   }
200 }
201
202 class CompressionTest
203     : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
204  protected:
205   void SetUp() override {
206     auto tup = GetParam();
207     int lengthLog = std::tr1::get<0>(tup);
208     // Small hack to test empty data
209     uncompressedLength_ =
210         (lengthLog < 0) ? 0 : uint64_t(1) << std::tr1::get<0>(tup);
211     chunks_ = std::tr1::get<1>(tup);
212     codec_ = getCodec(std::tr1::get<2>(tup));
213   }
214
215   void runSimpleIOBufTest(const DataHolder& dh);
216
217   void runSimpleStringTest(const DataHolder& dh);
218
219  private:
220   std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
221
222   uint64_t uncompressedLength_;
223   size_t chunks_;
224   std::unique_ptr<Codec> codec_;
225 };
226
227 void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
228   const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
229   const auto compressed = split(codec_->compress(original.get()));
230   EXPECT_LE(
231       compressed->computeChainDataLength(),
232       codec_->maxCompressedLength(uncompressedLength_));
233   if (!codec_->needsUncompressedLength()) {
234     auto uncompressed = codec_->uncompress(compressed.get());
235     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
236     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
237   }
238   {
239     auto uncompressed = codec_->uncompress(compressed.get(),
240                                            uncompressedLength_);
241     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
242     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
243   }
244 }
245
246 void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
247   const auto original = std::string(
248       reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
249       uncompressedLength_);
250   const auto compressed = codec_->compress(original);
251   EXPECT_LE(
252       compressed.length(), codec_->maxCompressedLength(uncompressedLength_));
253
254   if (!codec_->needsUncompressedLength()) {
255     auto uncompressed = codec_->uncompress(compressed);
256     EXPECT_EQ(uncompressedLength_, uncompressed.length());
257     EXPECT_EQ(uncompressed, original);
258   }
259   {
260     auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
261     EXPECT_EQ(uncompressedLength_, uncompressed.length());
262     EXPECT_EQ(uncompressed, original);
263   }
264 }
265
266 // Uniformly split data into (potentially empty) chunks.
267 std::unique_ptr<IOBuf> CompressionTest::split(
268     std::unique_ptr<IOBuf> data) const {
269   if (data->isChained()) {
270     data->coalesce();
271   }
272
273   const size_t size = data->computeChainDataLength();
274
275   std::multiset<size_t> splits;
276   for (size_t i = 1; i < chunks_; ++i) {
277     splits.insert(Random::rand64(size));
278   }
279
280   folly::IOBufQueue result;
281
282   size_t offset = 0;
283   for (size_t split : splits) {
284     result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
285     offset = split;
286   }
287   result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
288
289   return result.move();
290 }
291
292 TEST_P(CompressionTest, RandomData) {
293   runSimpleIOBufTest(randomDataHolder);
294 }
295
296 TEST_P(CompressionTest, ConstantData) {
297   runSimpleIOBufTest(constantDataHolder);
298 }
299
300 TEST_P(CompressionTest, RandomDataString) {
301   runSimpleStringTest(randomDataHolder);
302 }
303
304 TEST_P(CompressionTest, ConstantDataString) {
305   runSimpleStringTest(constantDataHolder);
306 }
307
308 INSTANTIATE_TEST_CASE_P(
309     CompressionTest,
310     CompressionTest,
311     testing::Combine(
312         testing::Values(-1, 0, 1, 12, 22, 25, 27),
313         testing::Values(1, 2, 3, 8, 65),
314         testing::ValuesIn(availableCodecs())));
315
316 class CompressionVarintTest
317     : public testing::TestWithParam<std::tr1::tuple<int, CodecType>> {
318  protected:
319   void SetUp() override {
320     auto tup = GetParam();
321     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
322     codec_ = getCodec(std::tr1::get<1>(tup));
323   }
324
325   void runSimpleTest(const DataHolder& dh);
326
327   uint64_t uncompressedLength_;
328   std::unique_ptr<Codec> codec_;
329 };
330
331 inline uint64_t oneBasedMsbPos(uint64_t number) {
332   uint64_t pos = 0;
333   for (; number > 0; ++pos, number >>= 1) {
334   }
335   return pos;
336 }
337
338 void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
339   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
340   auto compressed = codec_->compress(original.get());
341   auto breakPoint =
342       1UL +
343       Random::rand64(
344           std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
345   auto tinyBuf = IOBuf::copyBuffer(compressed->data(),
346                                    std::min(compressed->length(), breakPoint));
347   compressed->trimStart(breakPoint);
348   tinyBuf->prependChain(std::move(compressed));
349   compressed = std::move(tinyBuf);
350
351   auto uncompressed = codec_->uncompress(compressed.get());
352
353   EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
354   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
355 }
356
357 TEST_P(CompressionVarintTest, RandomData) {
358   runSimpleTest(randomDataHolder);
359 }
360
361 TEST_P(CompressionVarintTest, ConstantData) {
362   runSimpleTest(constantDataHolder);
363 }
364
365 INSTANTIATE_TEST_CASE_P(
366     CompressionVarintTest,
367     CompressionVarintTest,
368     testing::Combine(
369         testing::Values(0, 1, 12, 22, 25, 27),
370         testing::ValuesIn(supportedCodecs({
371             CodecType::LZ4_VARINT_SIZE,
372             CodecType::LZMA2_VARINT_SIZE,
373         }))));
374
375 TEST(LZMATest, UncompressBadVarint) {
376   if (hasStreamCodec(CodecType::LZMA2_VARINT_SIZE)) {
377     std::string const str(kMaxVarintLength64 * 2, '\xff');
378     ByteRange input((folly::StringPiece(str)));
379     auto codec = getStreamCodec(CodecType::LZMA2_VARINT_SIZE);
380     auto buffer = IOBuf::create(16);
381     buffer->append(buffer->capacity());
382     MutableByteRange output{buffer->writableData(), buffer->length()};
383     EXPECT_THROW(codec->uncompressStream(input, output), std::runtime_error);
384   }
385 }
386
387 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
388  protected:
389   void SetUp() override { codec_ = getCodec(GetParam()); }
390
391   void runSimpleTest(const DataHolder& dh);
392
393   std::unique_ptr<Codec> codec_;
394 };
395
396 void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
397   constexpr uint64_t uncompressedLength = 42;
398   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
399   auto compressed = codec_->compress(original.get());
400
401   if (!codec_->needsUncompressedLength()) {
402     auto uncompressed = codec_->uncompress(compressed.get());
403     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
404     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
405   }
406   {
407     auto uncompressed = codec_->uncompress(compressed.get(),
408                                            uncompressedLength);
409     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
410     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
411   }
412
413   EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
414                std::runtime_error);
415
416   auto corrupted = compressed->clone();
417   corrupted->unshare();
418   // Truncate the last character
419   corrupted->prev()->trimEnd(1);
420   if (!codec_->needsUncompressedLength()) {
421     EXPECT_THROW(codec_->uncompress(corrupted.get()),
422                  std::runtime_error);
423   }
424
425   EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
426                std::runtime_error);
427
428   corrupted = compressed->clone();
429   corrupted->unshare();
430   // Corrupt the first character
431   ++(corrupted->writableData()[0]);
432
433   if (!codec_->needsUncompressedLength()) {
434     EXPECT_THROW(codec_->uncompress(corrupted.get()),
435                  std::runtime_error);
436   }
437
438   EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
439                std::runtime_error);
440 }
441
442 TEST_P(CompressionCorruptionTest, RandomData) {
443   runSimpleTest(randomDataHolder);
444 }
445
446 TEST_P(CompressionCorruptionTest, ConstantData) {
447   runSimpleTest(constantDataHolder);
448 }
449
450 INSTANTIATE_TEST_CASE_P(
451     CompressionCorruptionTest,
452     CompressionCorruptionTest,
453     testing::ValuesIn(
454         // NO_COMPRESSION can't detect corruption
455         // LZ4 can't detect corruption reliably (sigh)
456         supportedCodecs({
457             CodecType::SNAPPY,
458             CodecType::ZLIB,
459             CodecType::LZMA2,
460             CodecType::ZSTD,
461             CodecType::LZ4_FRAME,
462             CodecType::BZIP2,
463         })));
464
465 class StreamingUnitTest : public testing::TestWithParam<CodecType> {
466  protected:
467   void SetUp() override {
468     codec_ = getStreamCodec(GetParam());
469   }
470
471   std::unique_ptr<StreamCodec> codec_;
472 };
473
474 TEST(StreamingUnitTest, needsDataLength) {
475   static const struct {
476     CodecType type;
477     bool needsDataLength;
478   } expectations[] = {
479       {CodecType::ZLIB, false},
480       {CodecType::GZIP, false},
481       {CodecType::LZMA2, false},
482       {CodecType::LZMA2_VARINT_SIZE, true},
483       {CodecType::ZSTD, false},
484   };
485
486   for (auto const& test : expectations) {
487     if (hasStreamCodec(test.type)) {
488       EXPECT_EQ(
489           getStreamCodec(test.type)->needsDataLength(), test.needsDataLength);
490     }
491   }
492 }
493
494 TEST_P(StreamingUnitTest, maxCompressedLength) {
495   for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
496     EXPECT_GE(codec_->maxCompressedLength(length), length);
497   }
498 }
499
500 TEST_P(StreamingUnitTest, getUncompressedLength) {
501   auto const empty = IOBuf::create(0);
502   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
503   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
504   EXPECT_ANY_THROW(codec_->getUncompressedLength(empty.get(), 1));
505
506   auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
507   auto const compressed = codec_->compress(data.get());
508
509   if (auto const length = codec_->getUncompressedLength(data.get())) {
510     EXPECT_EQ(100, *length);
511   }
512   EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
513   // If the uncompressed length is stored in the frame, then make sure it throws
514   // when it is given the wrong length.
515   if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
516     EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
517   }
518 }
519
520 TEST_P(StreamingUnitTest, emptyData) {
521   ByteRange input{};
522   auto buffer = IOBuf::create(codec_->maxCompressedLength(0));
523   buffer->append(buffer->capacity());
524   MutableByteRange output;
525
526   // Test compressing empty data in one pass
527   if (!codec_->needsDataLength()) {
528     output = {buffer->writableData(), buffer->length()};
529     EXPECT_TRUE(
530         codec_->compressStream(input, output, StreamCodec::FlushOp::END));
531   }
532   codec_->resetStream(0);
533   output = {buffer->writableData(), buffer->length()};
534   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
535
536   // Test uncompressing the compressed empty data is equivalent to the empty
537   // string
538   {
539     size_t compressedSize = buffer->length() - output.size();
540     auto const compressed =
541         IOBuf::copyBuffer(buffer->writableData(), compressedSize);
542     auto inputRange = compressed->coalesce();
543     codec_->resetStream(0);
544     output = {buffer->writableData(), buffer->length()};
545     EXPECT_TRUE(codec_->uncompressStream(
546         inputRange, output, StreamCodec::FlushOp::END));
547     EXPECT_EQ(output.size(), buffer->length());
548   }
549
550   // Test compressing empty data with multiple calls to compressStream()
551   {
552     auto largeBuffer = IOBuf::create(codec_->maxCompressedLength(0) * 2);
553     largeBuffer->append(largeBuffer->capacity());
554     codec_->resetStream(0);
555     output = {largeBuffer->writableData(), largeBuffer->length()};
556     EXPECT_FALSE(codec_->compressStream(input, output));
557     EXPECT_TRUE(
558         codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
559     EXPECT_TRUE(
560         codec_->compressStream(input, output, StreamCodec::FlushOp::END));
561   }
562
563   // Test uncompressing empty data
564   output = {};
565   codec_->resetStream();
566   EXPECT_TRUE(codec_->uncompressStream(input, output));
567   codec_->resetStream();
568   EXPECT_TRUE(
569       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
570   codec_->resetStream();
571   EXPECT_TRUE(
572       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
573   codec_->resetStream(0);
574   EXPECT_TRUE(codec_->uncompressStream(input, output));
575   codec_->resetStream(0);
576   EXPECT_TRUE(
577       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
578   codec_->resetStream(0);
579   EXPECT_TRUE(
580       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
581 }
582
583 TEST_P(StreamingUnitTest, noForwardProgress) {
584   auto inBuffer = IOBuf::create(2);
585   inBuffer->writableData()[0] = 'a';
586   inBuffer->writableData()[1] = 'a';
587   inBuffer->append(2);
588   const auto compressed = codec_->compress(inBuffer.get());
589   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
590
591   ByteRange emptyInput;
592   MutableByteRange emptyOutput;
593
594   const std::array<StreamCodec::FlushOp, 3> flushOps = {{
595       StreamCodec::FlushOp::NONE,
596       StreamCodec::FlushOp::FLUSH,
597       StreamCodec::FlushOp::END,
598   }};
599
600   // No progress is not okay twice in a row for all flush operations when
601   // compressing
602   for (const auto flushOp : flushOps) {
603     if (codec_->needsDataLength()) {
604       codec_->resetStream(inBuffer->computeChainDataLength());
605     } else {
606       codec_->resetStream();
607     }
608     auto input = inBuffer->coalesce();
609     MutableByteRange output = {outBuffer->writableTail(),
610                                outBuffer->tailroom()};
611     // Compress some data to avoid empty data special casing
612     while (!input.empty()) {
613       codec_->compressStream(input, output);
614     }
615     EXPECT_FALSE(codec_->compressStream(emptyInput, emptyOutput, flushOp));
616     EXPECT_THROW(
617         codec_->compressStream(emptyInput, emptyOutput, flushOp),
618         std::runtime_error);
619   }
620
621   // No progress is not okay twice in a row for all flush operations when
622   // uncompressing
623   for (const auto flushOp : flushOps) {
624     codec_->resetStream();
625     auto input = compressed->coalesce();
626     // Remove the last byte so the operation is incomplete
627     input.uncheckedSubtract(1);
628     MutableByteRange output = {inBuffer->writableData(), inBuffer->length()};
629     // Uncompress some data to avoid empty data special casing
630     while (!input.empty()) {
631       EXPECT_FALSE(codec_->uncompressStream(input, output));
632     }
633     EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput, flushOp));
634     EXPECT_THROW(
635         codec_->uncompressStream(emptyInput, emptyOutput, flushOp),
636         std::runtime_error);
637   }
638 }
639
640 TEST_P(StreamingUnitTest, stateTransitions) {
641   auto inBuffer = IOBuf::create(2);
642   inBuffer->writableData()[0] = 'a';
643   inBuffer->writableData()[1] = 'a';
644   inBuffer->append(2);
645   auto compressed = codec_->compress(inBuffer.get());
646   ByteRange const in = compressed->coalesce();
647   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
648   MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
649
650   auto compress = [&](
651       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
652       bool empty = false) {
653     auto input = in;
654     auto output = empty ? MutableByteRange{} : out;
655     return codec_->compressStream(input, output, flushOp);
656   };
657   auto compress_all = [&](bool expect,
658                           StreamCodec::FlushOp flushOp =
659                               StreamCodec::FlushOp::NONE,
660                           bool empty = false) {
661     auto input = in;
662     auto output = empty ? MutableByteRange{} : out;
663     while (!input.empty()) {
664       if (expect) {
665         EXPECT_TRUE(codec_->compressStream(input, output, flushOp));
666       } else {
667         EXPECT_FALSE(codec_->compressStream(input, output, flushOp));
668       }
669     }
670   };
671   auto uncompress = [&](
672       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
673       bool empty = false) {
674     auto input = in;
675     auto output = empty ? MutableByteRange{} : out;
676     return codec_->uncompressStream(input, output, flushOp);
677   };
678
679   // compression flow
680   if (!codec_->needsDataLength()) {
681     codec_->resetStream();
682     EXPECT_FALSE(compress());
683     EXPECT_FALSE(compress());
684     EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
685     EXPECT_FALSE(compress());
686     EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
687   }
688   codec_->resetStream(in.size() * 5);
689   compress_all(false);
690   compress_all(false);
691   compress_all(true, StreamCodec::FlushOp::FLUSH);
692   compress_all(false);
693   compress_all(true, StreamCodec::FlushOp::END);
694
695   // uncompression flow
696   codec_->resetStream();
697   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
698   codec_->resetStream();
699   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
700   codec_->resetStream();
701   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
702   codec_->resetStream();
703   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
704   codec_->resetStream();
705   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
706   // compress -> uncompress
707   codec_->resetStream(in.size());
708   EXPECT_FALSE(compress());
709   EXPECT_THROW(uncompress(), std::logic_error);
710   // uncompress -> compress
711   codec_->resetStream(inBuffer->computeChainDataLength());
712   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
713   EXPECT_THROW(compress(), std::logic_error);
714   // end -> compress
715   if (!codec_->needsDataLength()) {
716     codec_->resetStream();
717     EXPECT_FALSE(compress());
718     EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
719     EXPECT_THROW(compress(), std::logic_error);
720   }
721   codec_->resetStream(in.size() * 2);
722   compress_all(false);
723   compress_all(true, StreamCodec::FlushOp::END);
724   EXPECT_THROW(compress(), std::logic_error);
725   // end -> uncompress
726   codec_->resetStream();
727   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
728   EXPECT_THROW(uncompress(), std::logic_error);
729   // flush -> compress
730   codec_->resetStream(in.size());
731   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
732   EXPECT_THROW(compress(), std::logic_error);
733   // flush -> end
734   codec_->resetStream(in.size());
735   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
736   EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
737   // undefined -> compress
738   codec_->compress(inBuffer.get());
739   EXPECT_THROW(compress(), std::logic_error);
740   codec_->uncompress(compressed.get(), inBuffer->computeChainDataLength());
741   EXPECT_THROW(compress(), std::logic_error);
742   // undefined -> undefined
743   codec_->uncompress(compressed.get());
744   codec_->compress(inBuffer.get());
745 }
746
747 INSTANTIATE_TEST_CASE_P(
748     StreamingUnitTest,
749     StreamingUnitTest,
750     testing::ValuesIn(availableStreamCodecs()));
751
752 class StreamingCompressionTest
753     : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
754  protected:
755   void SetUp() override {
756     auto const tup = GetParam();
757     uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
758     chunkSize_ = size_t(1) << std::get<1>(tup);
759     codec_ = getStreamCodec(std::get<2>(tup));
760   }
761
762   void runResetStreamTest(DataHolder const& dh);
763   void runCompressStreamTest(DataHolder const& dh);
764   void runUncompressStreamTest(DataHolder const& dh);
765   void runFlushTest(DataHolder const& dh);
766
767  private:
768   std::vector<ByteRange> split(ByteRange data) const;
769
770   uint64_t uncompressedLength_;
771   size_t chunkSize_;
772   std::unique_ptr<StreamCodec> codec_;
773 };
774
775 std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
776   size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
777   std::vector<ByteRange> result;
778   result.reserve(pieces + 1);
779   while (!data.empty()) {
780     size_t const pieceSize = std::min(data.size(), chunkSize_);
781     result.push_back(data.subpiece(0, pieceSize));
782     data.uncheckedAdvance(pieceSize);
783   }
784   return result;
785 }
786
787 static std::unique_ptr<IOBuf> compressSome(
788     StreamCodec* codec,
789     ByteRange data,
790     uint64_t bufferSize,
791     StreamCodec::FlushOp flush) {
792   bool result;
793   IOBufQueue queue;
794   do {
795     auto buffer = IOBuf::create(bufferSize);
796     buffer->append(buffer->capacity());
797     MutableByteRange output{buffer->writableData(), buffer->length()};
798
799     result = codec->compressStream(data, output, flush);
800     buffer->trimEnd(output.size());
801     queue.append(std::move(buffer));
802
803   } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
804   EXPECT_TRUE(data.empty());
805   return queue.move();
806 }
807
808 static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
809     StreamCodec* codec,
810     ByteRange& data,
811     uint64_t bufferSize,
812     StreamCodec::FlushOp flush) {
813   bool result;
814   IOBufQueue queue;
815   do {
816     auto buffer = IOBuf::create(bufferSize);
817     buffer->append(buffer->capacity());
818     MutableByteRange output{buffer->writableData(), buffer->length()};
819
820     result = codec->uncompressStream(data, output, flush);
821     buffer->trimEnd(output.size());
822     queue.append(std::move(buffer));
823
824   } while (queue.tailroom() == 0 && !result);
825   return std::make_pair(result, queue.move());
826 }
827
828 void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
829   auto const input = dh.data(uncompressedLength_);
830   // Compress some but leave state unclean
831   codec_->resetStream(uncompressedLength_);
832   compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
833   // Reset stream and compress all
834   if (codec_->needsDataLength()) {
835     codec_->resetStream(uncompressedLength_);
836   } else {
837     codec_->resetStream();
838   }
839   auto compressed =
840       compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
841   auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
842   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
843 }
844
845 TEST_P(StreamingCompressionTest, resetStream) {
846   runResetStreamTest(constantDataHolder);
847   runResetStreamTest(randomDataHolder);
848 }
849
850 void StreamingCompressionTest::runCompressStreamTest(
851     const folly::io::test::DataHolder& dh) {
852   auto const inputs = split(dh.data(uncompressedLength_));
853
854   IOBufQueue queue;
855   codec_->resetStream(uncompressedLength_);
856   // Compress many inputs in a row
857   for (auto const input : inputs) {
858     queue.append(compressSome(
859         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
860   }
861   // Finish the operation with empty input.
862   ByteRange empty;
863   queue.append(
864       compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
865
866   auto const uncompressed = codec_->uncompress(queue.front());
867   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
868 }
869
870 TEST_P(StreamingCompressionTest, compressStream) {
871   runCompressStreamTest(constantDataHolder);
872   runCompressStreamTest(randomDataHolder);
873 }
874
875 void StreamingCompressionTest::runUncompressStreamTest(
876     const folly::io::test::DataHolder& dh) {
877   auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
878   // Concatenate 3 compressed frames in a row
879   auto compressed = codec_->compress(data.get());
880   compressed->prependChain(codec_->compress(data.get()));
881   compressed->prependChain(codec_->compress(data.get()));
882   // Pass all 3 compressed frames in one input buffer
883   auto input = compressed->coalesce();
884   // Uncompress the first frame
885   codec_->resetStream(data->computeChainDataLength());
886   {
887     auto const result = uncompressSome(
888         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
889     ASSERT_TRUE(result.first);
890     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
891   }
892   // Uncompress the second frame
893   codec_->resetStream();
894   {
895     auto const result = uncompressSome(
896         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
897     ASSERT_TRUE(result.first);
898     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
899   }
900   // Uncompress the third frame
901   codec_->resetStream();
902   {
903     auto const result = uncompressSome(
904         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
905     ASSERT_TRUE(result.first);
906     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
907   }
908   EXPECT_TRUE(input.empty());
909 }
910
911 TEST_P(StreamingCompressionTest, uncompressStream) {
912   runUncompressStreamTest(constantDataHolder);
913   runUncompressStreamTest(randomDataHolder);
914 }
915
916 void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
917   auto const inputs = split(dh.data(uncompressedLength_));
918   auto uncodec = getStreamCodec(codec_->type());
919
920   if (codec_->needsDataLength()) {
921     codec_->resetStream(uncompressedLength_);
922   } else {
923     codec_->resetStream();
924   }
925   for (auto input : inputs) {
926     // Compress some data and flush the stream
927     auto compressed = compressSome(
928         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
929     auto compressedRange = compressed->coalesce();
930     // Uncompress the compressed data
931     auto result = uncompressSome(
932         uncodec.get(),
933         compressedRange,
934         chunkSize_,
935         StreamCodec::FlushOp::FLUSH);
936     // All compressed data should have been consumed
937     EXPECT_TRUE(compressedRange.empty());
938     // The frame isn't complete
939     EXPECT_FALSE(result.first);
940     // The uncompressed data should be exactly the input data
941     EXPECT_EQ(input.size(), result.second->computeChainDataLength());
942     auto const data = IOBuf::wrapBuffer(input);
943     EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
944   }
945 }
946
947 TEST_P(StreamingCompressionTest, testFlush) {
948   runFlushTest(constantDataHolder);
949   runFlushTest(randomDataHolder);
950 }
951
952 INSTANTIATE_TEST_CASE_P(
953     StreamingCompressionTest,
954     StreamingCompressionTest,
955     testing::Combine(
956         testing::Values(0, 1, 12, 22, 27),
957         testing::Values(12, 17, 20),
958         testing::ValuesIn(availableStreamCodecs())));
959
960 namespace {
961
962 // Codec types included in the codec returned by getAutoUncompressionCodec() by
963 // default.
964 std::vector<CodecType> autoUncompressionCodecTypes = {{
965     CodecType::LZ4_FRAME,
966     CodecType::ZSTD,
967     CodecType::ZLIB,
968     CodecType::GZIP,
969     CodecType::LZMA2,
970     CodecType::BZIP2,
971 }};
972
973 } // namespace
974
975 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
976  protected:
977   void SetUp() override {
978     codecType_ = GetParam();
979     codec_ = getCodec(codecType_);
980     autoType_ = std::any_of(
981         autoUncompressionCodecTypes.begin(),
982         autoUncompressionCodecTypes.end(),
983         [&](CodecType o) { return codecType_ == o; });
984     // Add the codec with type codecType_ as the terminal codec if it is not in
985     // autoUncompressionCodecTypes.
986     auto_ = getAutoUncompressionCodec({}, getTerminalCodec());
987   }
988
989   void runSimpleTest(const DataHolder& dh);
990
991   std::unique_ptr<Codec> getTerminalCodec() {
992     return (autoType_ ? nullptr : getCodec(codecType_));
993   }
994
995   std::unique_ptr<Codec> codec_;
996   std::unique_ptr<Codec> auto_;
997   CodecType codecType_;
998   // true if codecType_ is in autoUncompressionCodecTypes
999   bool autoType_;
1000 };
1001
1002 void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) {
1003   constexpr uint64_t uncompressedLength = 1000;
1004   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
1005   auto compressed = codec_->compress(original.get());
1006
1007   if (!codec_->needsUncompressedLength()) {
1008     auto uncompressed = auto_->uncompress(compressed.get());
1009     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
1010     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
1011   }
1012   {
1013     auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
1014     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
1015     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
1016   }
1017   ASSERT_GE(compressed->computeChainDataLength(), 8);
1018   for (size_t i = 0; i < 8; ++i) {
1019     auto split = compressed->clone();
1020     auto rest = compressed->clone();
1021     split->trimEnd(split->length() - i);
1022     rest->trimStart(i);
1023     split->appendChain(std::move(rest));
1024     auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
1025     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
1026     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
1027   }
1028 }
1029
1030 TEST_P(AutomaticCodecTest, RandomData) {
1031   runSimpleTest(randomDataHolder);
1032 }
1033
1034 TEST_P(AutomaticCodecTest, ConstantData) {
1035   runSimpleTest(constantDataHolder);
1036 }
1037
1038 TEST_P(AutomaticCodecTest, ValidPrefixes) {
1039   const auto prefixes = codec_->validPrefixes();
1040   for (const auto& prefix : prefixes) {
1041     EXPECT_FALSE(prefix.empty());
1042     // Ensure that all strings are at least 8 bytes for LZMA2.
1043     // The bytes after the prefix should be ignored by `canUncompress()`.
1044     IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8};
1045     data.append(8);
1046     EXPECT_TRUE(codec_->canUncompress(&data));
1047     EXPECT_TRUE(auto_->canUncompress(&data));
1048   }
1049 }
1050
1051 TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
1052   if (codec_->needsUncompressedLength()) {
1053     EXPECT_TRUE(auto_->needsUncompressedLength());
1054   }
1055 }
1056
1057 TEST_P(AutomaticCodecTest, maxUncompressedLength) {
1058   EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength());
1059 }
1060
1061 TEST_P(AutomaticCodecTest, DefaultCodec) {
1062   const uint64_t length = 42;
1063   std::vector<std::unique_ptr<Codec>> codecs;
1064   codecs.push_back(getCodec(CodecType::ZSTD));
1065   auto automatic =
1066       getAutoUncompressionCodec(std::move(codecs), getTerminalCodec());
1067   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1068   auto compressed = codec_->compress(original.get());
1069   std::unique_ptr<IOBuf> decompressed;
1070
1071   if (automatic->needsUncompressedLength()) {
1072     decompressed = automatic->uncompress(compressed.get(), length);
1073   } else {
1074     decompressed = automatic->uncompress(compressed.get());
1075   }
1076
1077   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1078 }
1079
1080 namespace {
1081 class CustomCodec : public Codec {
1082  public:
1083   static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
1084     return std::make_unique<CustomCodec>(std::move(prefix), type);
1085   }
1086   explicit CustomCodec(std::string prefix, CodecType type)
1087       : Codec(CodecType::USER_DEFINED),
1088         prefix_(std::move(prefix)),
1089         codec_(getCodec(type)) {}
1090
1091  private:
1092   std::vector<std::string> validPrefixes() const override {
1093     return {prefix_};
1094   }
1095
1096   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
1097     return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
1098   }
1099
1100   bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
1101     auto clone = data->cloneCoalescedAsValue();
1102     if (clone.length() < prefix_.size()) {
1103       return false;
1104     }
1105     return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
1106   }
1107
1108   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
1109     auto result = IOBuf::copyBuffer(prefix_);
1110     result->appendChain(codec_->compress(data));
1111     EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
1112     return result;
1113   }
1114
1115   std::unique_ptr<IOBuf> doUncompress(
1116       const IOBuf* data,
1117       Optional<uint64_t> uncompressedLength) override {
1118     EXPECT_TRUE(canUncompress(data, uncompressedLength));
1119     auto clone = data->cloneCoalescedAsValue();
1120     clone.trimStart(prefix_.size());
1121     return codec_->uncompress(&clone, uncompressedLength);
1122   }
1123
1124   std::string prefix_;
1125   std::unique_ptr<Codec> codec_;
1126 };
1127 } // namespace
1128
1129 TEST_P(AutomaticCodecTest, CustomCodec) {
1130   const uint64_t length = 42;
1131   auto ab = CustomCodec::create("ab", CodecType::ZSTD);
1132   std::vector<std::unique_ptr<Codec>> codecs;
1133   codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD));
1134   auto automatic =
1135       getAutoUncompressionCodec(std::move(codecs), getTerminalCodec());
1136   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1137
1138   auto abCompressed = ab->compress(original.get());
1139   std::unique_ptr<IOBuf> abDecompressed;
1140   if (automatic->needsUncompressedLength()) {
1141     abDecompressed = automatic->uncompress(abCompressed.get(), length);
1142   } else {
1143     abDecompressed = automatic->uncompress(abCompressed.get());
1144   }
1145   EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
1146   EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
1147   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));
1148
1149   auto compressed = codec_->compress(original.get());
1150   std::unique_ptr<IOBuf> decompressed;
1151   if (automatic->needsUncompressedLength()) {
1152     decompressed = automatic->uncompress(compressed.get(), length);
1153   } else {
1154     decompressed = automatic->uncompress(compressed.get());
1155   }
1156   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1157 }
1158
1159 TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
1160   const uint64_t length = 42;
1161   auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
1162   std::vector<std::unique_ptr<Codec>> codecs;
1163   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1164   codecs.push_back(getCodec(CodecType::LZ4_FRAME));
1165   auto automatic =
1166       getAutoUncompressionCodec(std::move(codecs), getTerminalCodec());
1167   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1168
1169   auto noneCompressed = none->compress(original.get());
1170   std::unique_ptr<IOBuf> noneDecompressed;
1171   if (automatic->needsUncompressedLength()) {
1172     noneDecompressed = automatic->uncompress(noneCompressed.get(), length);
1173   } else {
1174     noneDecompressed = automatic->uncompress(noneCompressed.get());
1175   }
1176   EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
1177   EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
1178   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));
1179
1180   auto compressed = codec_->compress(original.get());
1181   std::unique_ptr<IOBuf> decompressed;
1182   if (automatic->needsUncompressedLength()) {
1183     decompressed = automatic->uncompress(compressed.get(), length);
1184   } else {
1185     decompressed = automatic->uncompress(compressed.get());
1186   }
1187   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1188 }
1189
1190 TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
1191   // No default codec can uncompress 1 bytes.
1192   IOBuf buf{IOBuf::CREATE, 1};
1193   buf.append(1);
1194   EXPECT_FALSE(codec_->canUncompress(&buf, 1));
1195   EXPECT_FALSE(codec_->canUncompress(&buf, folly::none));
1196   EXPECT_FALSE(auto_->canUncompress(&buf, 1));
1197   EXPECT_FALSE(auto_->canUncompress(&buf, folly::none));
1198 }
1199
1200 INSTANTIATE_TEST_CASE_P(
1201     AutomaticCodecTest,
1202     AutomaticCodecTest,
1203     testing::ValuesIn(availableCodecs()));
1204
1205 namespace {
1206
1207 // Codec that always "uncompresses" to the same string.
1208 class ConstantCodec : public Codec {
1209  public:
1210   static std::unique_ptr<Codec> create(
1211       std::string uncompressed,
1212       CodecType type) {
1213     return std::make_unique<ConstantCodec>(std::move(uncompressed), type);
1214   }
1215   explicit ConstantCodec(std::string uncompressed, CodecType type)
1216       : Codec(type), uncompressed_(std::move(uncompressed)) {}
1217
1218  private:
1219   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
1220     return uncompressedLength;
1221   }
1222
1223   std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1224     throw std::runtime_error("ConstantCodec error: compress() not supported.");
1225   }
1226
1227   std::unique_ptr<IOBuf> doUncompress(const IOBuf*, Optional<uint64_t>)
1228       override {
1229     return IOBuf::copyBuffer(uncompressed_);
1230   }
1231
1232   std::string uncompressed_;
1233   std::unique_ptr<Codec> codec_;
1234 };
1235
1236 } // namespace
1237
1238 class TerminalCodecTest : public testing::TestWithParam<CodecType> {
1239  protected:
1240   void SetUp() override {
1241     codecType_ = GetParam();
1242     codec_ = getCodec(codecType_);
1243     auto_ = getAutoUncompressionCodec();
1244   }
1245
1246   CodecType codecType_;
1247   std::unique_ptr<Codec> codec_;
1248   std::unique_ptr<Codec> auto_;
1249 };
1250
1251 // Test that the terminal codec's uncompress() function is called when the
1252 // default chosen automatic codec throws.
1253 TEST_P(TerminalCodecTest, uncompressIfDefaultThrows) {
1254   std::string const original = "abc";
1255   auto const compressed = codec_->compress(original);
1256
1257   // Sanity check: the automatic codec can uncompress the original string.
1258   auto const uncompressed = auto_->uncompress(compressed);
1259   EXPECT_EQ(uncompressed, original);
1260
1261   // Truncate the compressed string.
1262   auto const truncated = compressed.substr(0, compressed.size() - 1);
1263   auto const truncatedBuf =
1264       IOBuf::wrapBuffer(truncated.data(), truncated.size());
1265   EXPECT_TRUE(auto_->canUncompress(truncatedBuf.get()));
1266   EXPECT_ANY_THROW(auto_->uncompress(truncated));
1267
1268   // Expect the terminal codec to successfully uncompress the string.
1269   std::unique_ptr<Codec> terminal = getAutoUncompressionCodec(
1270       {}, ConstantCodec::create("dummyString", CodecType::USER_DEFINED));
1271   EXPECT_TRUE(terminal->canUncompress(truncatedBuf.get()));
1272   EXPECT_EQ(terminal->uncompress(truncated), "dummyString");
1273 }
1274
1275 // If the terminal codec has one of the "default types" automatically added in
1276 // the AutomaticCodec, check that the default codec is no longer added.
1277 TEST_P(TerminalCodecTest, terminalOverridesDefaults) {
1278   std::unique_ptr<Codec> terminal = getAutoUncompressionCodec(
1279       {}, ConstantCodec::create("dummyString", codecType_));
1280   std::string const original = "abc";
1281   auto const compressed = codec_->compress(original);
1282   EXPECT_EQ(terminal->uncompress(compressed), "dummyString");
1283 }
1284
1285 INSTANTIATE_TEST_CASE_P(
1286     TerminalCodecTest,
1287     TerminalCodecTest,
1288     testing::ValuesIn(autoUncompressionCodecTypes));
1289
1290 TEST(ValidPrefixesTest, CustomCodec) {
1291   std::vector<std::unique_ptr<Codec>> codecs;
1292   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1293   const auto none = getAutoUncompressionCodec(std::move(codecs));
1294   const auto prefixes = none->validPrefixes();
1295   const auto it = std::find(prefixes.begin(), prefixes.end(), "none");
1296   EXPECT_TRUE(it != prefixes.end());
1297 }
1298
1299 #define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
1300   do {                                                       \
1301     if (kIsDebug) {                                          \
1302       EXPECT_THROW((statement), expected_exception);         \
1303     } else {                                                 \
1304       EXPECT_NO_THROW((statement));                          \
1305     }                                                        \
1306   } while (false)
1307
1308 TEST(CheckCompatibleTest, SimplePrefixSecond) {
1309   std::vector<std::unique_ptr<Codec>> codecs;
1310   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1311   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1312   EXPECT_THROW_IF_DEBUG(
1313       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1314 }
1315
1316 TEST(CheckCompatibleTest, SimplePrefixFirst) {
1317   std::vector<std::unique_ptr<Codec>> codecs;
1318   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1319   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1320   EXPECT_THROW_IF_DEBUG(
1321       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1322 }
1323
1324 TEST(CheckCompatibleTest, Empty) {
1325   std::vector<std::unique_ptr<Codec>> codecs;
1326   codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
1327   EXPECT_THROW_IF_DEBUG(
1328       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1329 }
1330
1331 TEST(CheckCompatibleTest, ZstdPrefix) {
1332   std::vector<std::unique_ptr<Codec>> codecs;
1333   codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
1334   EXPECT_THROW_IF_DEBUG(
1335       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1336 }
1337
1338 TEST(CheckCompatibleTest, ZstdDuplicate) {
1339   std::vector<std::unique_ptr<Codec>> codecs;
1340   codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
1341   EXPECT_THROW_IF_DEBUG(
1342       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1343 }
1344
1345 TEST(CheckCompatibleTest, ZlibIsPrefix) {
1346   std::vector<std::unique_ptr<Codec>> codecs;
1347   codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
1348   EXPECT_THROW_IF_DEBUG(
1349       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1350 }
1351
1352 #if FOLLY_HAVE_LIBZSTD
1353
1354 TEST(ZstdTest, BackwardCompatible) {
1355   auto codec = getCodec(CodecType::ZSTD);
1356   {
1357     auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20));
1358     auto compressed = codec->compress(data.get());
1359     compressed->coalesce();
1360     EXPECT_EQ(
1361         data->length(),
1362         ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1363   }
1364   {
1365     auto const data =
1366         IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20));
1367     auto compressed = codec->compress(data.get());
1368     compressed->coalesce();
1369     EXPECT_EQ(
1370         data->length(),
1371         ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1372   }
1373 }
1374
1375 #endif
1376
1377 #if FOLLY_HAVE_LIBZ
1378
1379 using ZlibFormat = zlib::Options::Format;
1380
1381 TEST(ZlibTest, Auto) {
1382   size_t const uncompressedLength_ = (size_t)1 << 15;
1383   auto const original = std::string(
1384       reinterpret_cast<const char*>(
1385           randomDataHolder.data(uncompressedLength_).data()),
1386       uncompressedLength_);
1387   auto optionCodec = zlib::getCodec(zlib::Options(ZlibFormat::AUTO));
1388
1389   // Test the codec can uncompress zlib data.
1390   {
1391     auto codec = getCodec(CodecType::ZLIB);
1392     auto const compressed = codec->compress(original);
1393     auto const uncompressed = optionCodec->uncompress(compressed);
1394     EXPECT_EQ(original, uncompressed);
1395   }
1396
1397   // Test the codec can uncompress gzip data.
1398   {
1399     auto codec = getCodec(CodecType::GZIP);
1400     auto const compressed = codec->compress(original);
1401     auto const uncompressed = optionCodec->uncompress(compressed);
1402     EXPECT_EQ(original, uncompressed);
1403   }
1404 }
1405
1406 TEST(ZlibTest, DefaultOptions) {
1407   size_t const uncompressedLength_ = (size_t)1 << 20;
1408   auto const original = std::string(
1409       reinterpret_cast<const char*>(
1410           randomDataHolder.data(uncompressedLength_).data()),
1411       uncompressedLength_);
1412   {
1413     auto codec = getCodec(CodecType::ZLIB);
1414     auto optionCodec = zlib::getCodec(zlib::defaultZlibOptions());
1415     auto const compressed = optionCodec->compress(original);
1416     auto uncompressed = codec->uncompress(compressed);
1417     EXPECT_EQ(original, uncompressed);
1418     uncompressed = optionCodec->uncompress(compressed);
1419     EXPECT_EQ(original, uncompressed);
1420   }
1421
1422   {
1423     auto codec = getCodec(CodecType::GZIP);
1424     auto optionCodec = zlib::getCodec(zlib::defaultGzipOptions());
1425     auto const compressed = optionCodec->compress(original);
1426     auto uncompressed = codec->uncompress(compressed);
1427     EXPECT_EQ(original, uncompressed);
1428     uncompressed = optionCodec->uncompress(compressed);
1429     EXPECT_EQ(original, uncompressed);
1430   }
1431 }
1432
1433 class ZlibOptionsTest : public testing::TestWithParam<
1434                             std::tr1::tuple<ZlibFormat, int, int, int>> {
1435  protected:
1436   void SetUp() override {
1437     auto tup = GetParam();
1438     options_.format = std::tr1::get<0>(tup);
1439     options_.windowSize = std::tr1::get<1>(tup);
1440     options_.memLevel = std::tr1::get<2>(tup);
1441     options_.strategy = std::tr1::get<3>(tup);
1442     codec_ = zlib::getStreamCodec(options_);
1443   }
1444
1445   void runSimpleRoundTripTest(const DataHolder& dh);
1446
1447  private:
1448   zlib::Options options_;
1449   std::unique_ptr<StreamCodec> codec_;
1450 };
1451
1452 void ZlibOptionsTest::runSimpleRoundTripTest(const DataHolder& dh) {
1453   size_t const uncompressedLength = (size_t)1 << 16;
1454   auto const original = std::string(
1455       reinterpret_cast<const char*>(dh.data(uncompressedLength).data()),
1456       uncompressedLength);
1457
1458   auto const compressed = codec_->compress(original);
1459   auto const uncompressed = codec_->uncompress(compressed);
1460   EXPECT_EQ(uncompressed, original);
1461 }
1462
1463 TEST_P(ZlibOptionsTest, simpleRoundTripTest) {
1464   runSimpleRoundTripTest(constantDataHolder);
1465   runSimpleRoundTripTest(randomDataHolder);
1466 }
1467
1468 INSTANTIATE_TEST_CASE_P(
1469     ZlibOptionsTest,
1470     ZlibOptionsTest,
1471     testing::Combine(
1472         testing::Values(
1473             ZlibFormat::ZLIB,
1474             ZlibFormat::GZIP,
1475             ZlibFormat::RAW,
1476             ZlibFormat::AUTO),
1477         testing::Values(9, 12, 15),
1478         testing::Values(1, 8, 9),
1479         testing::Values(
1480             Z_DEFAULT_STRATEGY,
1481             Z_FILTERED,
1482             Z_HUFFMAN_ONLY,
1483             Z_RLE,
1484             Z_FIXED)));
1485
1486 #endif // FOLLY_HAVE_LIBZ
1487
1488 } // namespace test
1489 } // namespace io
1490 } // namespace folly