Fix decompression of truncated data
[folly.git] / folly / io / test / CompressionTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/Compression.h>
18
19 #include <algorithm>
20 #include <random>
21 #include <set>
22 #include <thread>
23 #include <unordered_map>
24 #include <utility>
25
26 #include <boost/noncopyable.hpp>
27 #include <glog/logging.h>
28
29 #include <folly/Benchmark.h>
30 #include <folly/Hash.h>
31 #include <folly/Memory.h>
32 #include <folly/Random.h>
33 #include <folly/Varint.h>
34 #include <folly/io/IOBufQueue.h>
35 #include <folly/portability/GTest.h>
36
37 #if FOLLY_HAVE_LIBZSTD
38 #include <zstd.h>
39 #endif
40
41 namespace folly { namespace io { namespace test {
42
43 class DataHolder : private boost::noncopyable {
44  public:
45   uint64_t hash(size_t size) const;
46   ByteRange data(size_t size) const;
47
48  protected:
49   explicit DataHolder(size_t sizeLog2);
50   const size_t size_;
51   std::unique_ptr<uint8_t[]> data_;
52   mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
53 };
54
55 DataHolder::DataHolder(size_t sizeLog2)
56   : size_(size_t(1) << sizeLog2),
57     data_(new uint8_t[size_]) {
58 }
59
60 uint64_t DataHolder::hash(size_t size) const {
61   CHECK_LE(size, size_);
62   auto p = hashCache_.find(size);
63   if (p != hashCache_.end()) {
64     return p->second;
65   }
66
67   uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
68   hashCache_[size] = h;
69   return h;
70 }
71
72 ByteRange DataHolder::data(size_t size) const {
73   CHECK_LE(size, size_);
74   return ByteRange(data_.get(), size);
75 }
76
77 uint64_t hashIOBuf(const IOBuf* buf) {
78   uint64_t h = folly::hash::FNV_64_HASH_START;
79   for (auto& range : *buf) {
80     h = folly::hash::fnv64_buf(range.data(), range.size(), h);
81   }
82   return h;
83 }
84
85 class RandomDataHolder : public DataHolder {
86  public:
87   explicit RandomDataHolder(size_t sizeLog2);
88 };
89
90 RandomDataHolder::RandomDataHolder(size_t sizeLog2)
91   : DataHolder(sizeLog2) {
92   static constexpr size_t numThreadsLog2 = 3;
93   static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
94
95   uint32_t seed = randomNumberSeed();
96
97   std::vector<std::thread> threads;
98   threads.reserve(numThreads);
99   for (size_t t = 0; t < numThreads; ++t) {
100     threads.emplace_back([this, seed, t, sizeLog2] {
101       std::mt19937 rng(seed + t);
102       size_t countLog2 = sizeLog2 - numThreadsLog2;
103       size_t start = size_t(t) << countLog2;
104       for (size_t i = 0; i < countLog2; ++i) {
105         this->data_[start + i] = rng();
106       }
107     });
108   }
109
110   for (auto& t : threads) {
111     t.join();
112   }
113 }
114
115 class ConstantDataHolder : public DataHolder {
116  public:
117   explicit ConstantDataHolder(size_t sizeLog2);
118 };
119
120 ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
121   : DataHolder(sizeLog2) {
122   memset(data_.get(), 'a', size_);
123 }
124
125 constexpr size_t dataSizeLog2 = 27;  // 128MiB
126 RandomDataHolder randomDataHolder(dataSizeLog2);
127 ConstantDataHolder constantDataHolder(dataSizeLog2);
128
129 // The intersection of the provided codecs & those that are compiled in.
130 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
131   std::vector<CodecType> supported;
132
133   std::copy_if(
134       std::begin(v),
135       std::end(v),
136       std::back_inserter(supported),
137       hasCodec);
138
139   return supported;
140 }
141
142 // All compiled-in compression codecs.
143 static std::vector<CodecType> availableCodecs() {
144   std::vector<CodecType> codecs;
145
146   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
147     auto type = static_cast<CodecType>(i);
148     if (hasCodec(type)) {
149       codecs.push_back(type);
150     }
151   }
152
153   return codecs;
154 }
155
156 static std::vector<CodecType> availableStreamCodecs() {
157   std::vector<CodecType> codecs;
158
159   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
160     auto type = static_cast<CodecType>(i);
161     if (hasStreamCodec(type)) {
162       codecs.push_back(type);
163     }
164   }
165
166   return codecs;
167 }
168
169 TEST(CompressionTestNeedsUncompressedLength, Simple) {
170   static const struct { CodecType type; bool needsUncompressedLength; }
171     expectations[] = {
172       { CodecType::NO_COMPRESSION, false },
173       { CodecType::LZ4, true },
174       { CodecType::SNAPPY, false },
175       { CodecType::ZLIB, false },
176       { CodecType::LZ4_VARINT_SIZE, false },
177       { CodecType::LZMA2, false },
178       { CodecType::LZMA2_VARINT_SIZE, false },
179       { CodecType::ZSTD, false },
180       { CodecType::GZIP, false },
181       { CodecType::LZ4_FRAME, false },
182       { CodecType::BZIP2, false },
183     };
184
185   for (auto const& test : expectations) {
186     if (hasCodec(test.type)) {
187       EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
188                 test.needsUncompressedLength);
189     }
190   }
191 }
192
193 class CompressionTest
194     : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
195  protected:
196   void SetUp() override {
197     auto tup = GetParam();
198     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
199     chunks_ = std::tr1::get<1>(tup);
200     codec_ = getCodec(std::tr1::get<2>(tup));
201   }
202
203   void runSimpleIOBufTest(const DataHolder& dh);
204
205   void runSimpleStringTest(const DataHolder& dh);
206
207  private:
208   std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
209
210   uint64_t uncompressedLength_;
211   size_t chunks_;
212   std::unique_ptr<Codec> codec_;
213 };
214
215 void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
216   const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
217   const auto compressed = split(codec_->compress(original.get()));
218   if (!codec_->needsUncompressedLength()) {
219     auto uncompressed = codec_->uncompress(compressed.get());
220     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
221     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
222   }
223   {
224     auto uncompressed = codec_->uncompress(compressed.get(),
225                                            uncompressedLength_);
226     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
227     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
228   }
229 }
230
231 void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
232   const auto original = std::string(
233       reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
234       uncompressedLength_);
235   const auto compressed = codec_->compress(original);
236   if (!codec_->needsUncompressedLength()) {
237     auto uncompressed = codec_->uncompress(compressed);
238     EXPECT_EQ(uncompressedLength_, uncompressed.length());
239     EXPECT_EQ(uncompressed, original);
240   }
241   {
242     auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
243     EXPECT_EQ(uncompressedLength_, uncompressed.length());
244     EXPECT_EQ(uncompressed, original);
245   }
246 }
247
248 // Uniformly split data into (potentially empty) chunks.
249 std::unique_ptr<IOBuf> CompressionTest::split(
250     std::unique_ptr<IOBuf> data) const {
251   if (data->isChained()) {
252     data->coalesce();
253   }
254
255   const size_t size = data->computeChainDataLength();
256
257   std::multiset<size_t> splits;
258   for (size_t i = 1; i < chunks_; ++i) {
259     splits.insert(Random::rand64(size));
260   }
261
262   folly::IOBufQueue result;
263
264   size_t offset = 0;
265   for (size_t split : splits) {
266     result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
267     offset = split;
268   }
269   result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
270
271   return result.move();
272 }
273
274 TEST_P(CompressionTest, RandomData) {
275   runSimpleIOBufTest(randomDataHolder);
276 }
277
278 TEST_P(CompressionTest, ConstantData) {
279   runSimpleIOBufTest(constantDataHolder);
280 }
281
282 TEST_P(CompressionTest, RandomDataString) {
283   runSimpleStringTest(randomDataHolder);
284 }
285
286 TEST_P(CompressionTest, ConstantDataString) {
287   runSimpleStringTest(constantDataHolder);
288 }
289
290 INSTANTIATE_TEST_CASE_P(
291     CompressionTest,
292     CompressionTest,
293     testing::Combine(
294         testing::Values(0, 1, 12, 22, 25, 27),
295         testing::Values(1, 2, 3, 8, 65),
296         testing::ValuesIn(availableCodecs())));
297
298 class CompressionVarintTest
299     : public testing::TestWithParam<std::tr1::tuple<int, CodecType>> {
300  protected:
301   void SetUp() override {
302     auto tup = GetParam();
303     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
304     codec_ = getCodec(std::tr1::get<1>(tup));
305   }
306
307   void runSimpleTest(const DataHolder& dh);
308
309   uint64_t uncompressedLength_;
310   std::unique_ptr<Codec> codec_;
311 };
312
313 inline uint64_t oneBasedMsbPos(uint64_t number) {
314   uint64_t pos = 0;
315   for (; number > 0; ++pos, number >>= 1) {
316   }
317   return pos;
318 }
319
320 void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
321   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
322   auto compressed = codec_->compress(original.get());
323   auto breakPoint =
324       1UL +
325       Random::rand64(
326           std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
327   auto tinyBuf = IOBuf::copyBuffer(compressed->data(),
328                                    std::min(compressed->length(), breakPoint));
329   compressed->trimStart(breakPoint);
330   tinyBuf->prependChain(std::move(compressed));
331   compressed = std::move(tinyBuf);
332
333   auto uncompressed = codec_->uncompress(compressed.get());
334
335   EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
336   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
337 }
338
339 TEST_P(CompressionVarintTest, RandomData) {
340   runSimpleTest(randomDataHolder);
341 }
342
343 TEST_P(CompressionVarintTest, ConstantData) {
344   runSimpleTest(constantDataHolder);
345 }
346
347 INSTANTIATE_TEST_CASE_P(
348     CompressionVarintTest,
349     CompressionVarintTest,
350     testing::Combine(
351         testing::Values(0, 1, 12, 22, 25, 27),
352         testing::ValuesIn(supportedCodecs({
353             CodecType::LZ4_VARINT_SIZE,
354             CodecType::LZMA2_VARINT_SIZE,
355             }))));
356
357 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
358  protected:
359   void SetUp() override { codec_ = getCodec(GetParam()); }
360
361   void runSimpleTest(const DataHolder& dh);
362
363   std::unique_ptr<Codec> codec_;
364 };
365
366 void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
367   constexpr uint64_t uncompressedLength = 42;
368   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
369   auto compressed = codec_->compress(original.get());
370
371   if (!codec_->needsUncompressedLength()) {
372     auto uncompressed = codec_->uncompress(compressed.get());
373     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
374     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
375   }
376   {
377     auto uncompressed = codec_->uncompress(compressed.get(),
378                                            uncompressedLength);
379     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
380     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
381   }
382
383   EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
384                std::runtime_error);
385
386   auto corrupted = compressed->clone();
387   corrupted->unshare();
388   // Truncate the last character
389   corrupted->prev()->trimEnd(1);
390   if (!codec_->needsUncompressedLength()) {
391     EXPECT_THROW(codec_->uncompress(corrupted.get()),
392                  std::runtime_error);
393   }
394
395   EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
396                std::runtime_error);
397
398   corrupted = compressed->clone();
399   corrupted->unshare();
400   // Corrupt the first character
401   ++(corrupted->writableData()[0]);
402
403   if (!codec_->needsUncompressedLength()) {
404     EXPECT_THROW(codec_->uncompress(corrupted.get()),
405                  std::runtime_error);
406   }
407
408   EXPECT_THROW(codec_->uncompress(corrupted.get(), uncompressedLength),
409                std::runtime_error);
410 }
411
412 TEST_P(CompressionCorruptionTest, RandomData) {
413   runSimpleTest(randomDataHolder);
414 }
415
416 TEST_P(CompressionCorruptionTest, ConstantData) {
417   runSimpleTest(constantDataHolder);
418 }
419
420 INSTANTIATE_TEST_CASE_P(
421     CompressionCorruptionTest,
422     CompressionCorruptionTest,
423     testing::ValuesIn(
424         // NO_COMPRESSION can't detect corruption
425         // LZ4 can't detect corruption reliably (sigh)
426         supportedCodecs({
427             CodecType::SNAPPY,
428             CodecType::ZLIB,
429             CodecType::LZMA2,
430             CodecType::ZSTD,
431             CodecType::LZ4_FRAME,
432             CodecType::BZIP2,
433         })));
434
435 class StreamingUnitTest : public testing::TestWithParam<CodecType> {
436  protected:
437   void SetUp() override {
438     codec_ = getStreamCodec(GetParam());
439   }
440
441   std::unique_ptr<StreamCodec> codec_;
442 };
443
444 TEST_P(StreamingUnitTest, maxCompressedLength) {
445   EXPECT_EQ(0, codec_->maxCompressedLength(0));
446   for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
447     EXPECT_GE(codec_->maxCompressedLength(length), length);
448   }
449 }
450
451 TEST_P(StreamingUnitTest, getUncompressedLength) {
452   auto const empty = IOBuf::create(0);
453   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
454   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
455
456   auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
457   auto const compressed = codec_->compress(data.get());
458
459   EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
460   if (auto const length = codec_->getUncompressedLength(data.get())) {
461     EXPECT_EQ(100, *length);
462   }
463   EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
464   // If the uncompressed length is stored in the frame, then make sure it throws
465   // when it is given the wrong length.
466   if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
467     EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
468   }
469 }
470
471 TEST_P(StreamingUnitTest, emptyData) {
472   ByteRange input{};
473   auto buffer = IOBuf::create(1);
474   buffer->append(buffer->capacity());
475   MutableByteRange output{};
476
477   // Test compressing empty data in one pass
478   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
479   codec_->resetStream(0);
480   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
481   codec_->resetStream();
482   output = {buffer->writableData(), buffer->length()};
483   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
484   EXPECT_EQ(buffer->length(), output.size());
485
486   // Test compressing empty data with multiple calls to compressStream()
487   codec_->resetStream();
488   output = {};
489   EXPECT_FALSE(codec_->compressStream(input, output));
490   EXPECT_TRUE(
491       codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
492   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
493   codec_->resetStream();
494   output = {buffer->writableData(), buffer->length()};
495   EXPECT_FALSE(codec_->compressStream(input, output));
496   EXPECT_TRUE(
497       codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
498   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
499   EXPECT_EQ(buffer->length(), output.size());
500
501   // Test uncompressing empty data
502   output = {};
503   codec_->resetStream();
504   EXPECT_TRUE(codec_->uncompressStream(input, output));
505   codec_->resetStream();
506   EXPECT_TRUE(
507       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
508   codec_->resetStream();
509   EXPECT_TRUE(
510       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
511   codec_->resetStream(0);
512   EXPECT_TRUE(codec_->uncompressStream(input, output));
513   codec_->resetStream(0);
514   EXPECT_TRUE(
515       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
516   codec_->resetStream(0);
517   EXPECT_TRUE(
518       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
519 }
520
521 TEST_P(StreamingUnitTest, noForwardProgressOkay) {
522   auto inBuffer = IOBuf::create(2);
523   inBuffer->writableData()[0] = 'a';
524   inBuffer->writableData()[0] = 'a';
525   inBuffer->append(2);
526   auto input = inBuffer->coalesce();
527   auto compressed = codec_->compress(inBuffer.get());
528
529   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
530   MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
531
532   ByteRange emptyInput;
533   MutableByteRange emptyOutput;
534
535   // Compress some data to avoid empty data special casing
536   codec_->resetStream();
537   while (!input.empty()) {
538     codec_->compressStream(input, output);
539   }
540   // empty input and output is okay for flush NONE and FLUSH.
541   codec_->compressStream(emptyInput, emptyOutput);
542   codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
543
544   codec_->resetStream();
545   input = inBuffer->coalesce();
546   output = {outBuffer->writableTail(), outBuffer->tailroom()};
547   while (!input.empty()) {
548     codec_->compressStream(input, output);
549   }
550   // empty input and output is okay for flush END.
551   codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
552
553   codec_->resetStream();
554   input = compressed->coalesce();
555   input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
556   output = {inBuffer->writableData(), inBuffer->length()};
557   // Uncompress some data to avoid empty data special casing
558   while (!input.empty()) {
559     EXPECT_FALSE(codec_->uncompressStream(input, output));
560   }
561   // empty input and output is okay for all flush values.
562   EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
563   EXPECT_FALSE(codec_->uncompressStream(
564       emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
565   EXPECT_FALSE(codec_->uncompressStream(
566       emptyInput, emptyOutput, StreamCodec::FlushOp::END));
567 }
568
569 TEST_P(StreamingUnitTest, stateTransitions) {
570   auto inBuffer = IOBuf::create(1);
571   inBuffer->writableData()[0] = 'a';
572   inBuffer->append(1);
573   auto compressed = codec_->compress(inBuffer.get());
574   ByteRange const in = compressed->coalesce();
575   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
576   MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
577
578   auto compress = [&](
579       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
580       bool empty = false) {
581     auto input = in;
582     auto output = empty ? MutableByteRange{} : out;
583     return codec_->compressStream(input, output, flushOp);
584   };
585   auto uncompress = [&](
586       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
587       bool empty = false) {
588     auto input = in;
589     auto output = empty ? MutableByteRange{} : out;
590     return codec_->uncompressStream(input, output, flushOp);
591   };
592
593   // compression flow
594   codec_->resetStream();
595   EXPECT_FALSE(compress());
596   EXPECT_FALSE(compress());
597   EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
598   EXPECT_FALSE(compress());
599   EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
600   // uncompression flow
601   codec_->resetStream();
602   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
603   codec_->resetStream();
604   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
605   codec_->resetStream();
606   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
607   codec_->resetStream();
608   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
609   codec_->resetStream();
610   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
611   // compress -> uncompress
612   codec_->resetStream();
613   EXPECT_FALSE(compress());
614   EXPECT_THROW(uncompress(), std::logic_error);
615   // uncompress -> compress
616   codec_->resetStream();
617   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
618   EXPECT_THROW(compress(), std::logic_error);
619   // end -> compress
620   codec_->resetStream();
621   EXPECT_FALSE(compress());
622   EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
623   EXPECT_THROW(compress(), std::logic_error);
624   // end -> uncompress
625   codec_->resetStream();
626   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
627   EXPECT_THROW(uncompress(), std::logic_error);
628   // flush -> compress
629   codec_->resetStream();
630   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
631   EXPECT_THROW(compress(), std::logic_error);
632   // flush -> end
633   codec_->resetStream();
634   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
635   EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
636   // undefined -> compress
637   codec_->compress(inBuffer.get());
638   EXPECT_THROW(compress(), std::logic_error);
639   codec_->uncompress(compressed.get());
640   EXPECT_THROW(compress(), std::logic_error);
641   // undefined -> undefined
642   codec_->uncompress(compressed.get());
643   codec_->compress(inBuffer.get());
644 }
645
646 INSTANTIATE_TEST_CASE_P(
647     StreamingUnitTest,
648     StreamingUnitTest,
649     testing::ValuesIn(availableStreamCodecs()));
650
651 class StreamingCompressionTest
652     : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
653  protected:
654   void SetUp() override {
655     auto const tup = GetParam();
656     uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
657     chunkSize_ = size_t(1) << std::get<1>(tup);
658     codec_ = getStreamCodec(std::get<2>(tup));
659   }
660
661   void runResetStreamTest(DataHolder const& dh);
662   void runCompressStreamTest(DataHolder const& dh);
663   void runUncompressStreamTest(DataHolder const& dh);
664   void runFlushTest(DataHolder const& dh);
665
666  private:
667   std::vector<ByteRange> split(ByteRange data) const;
668
669   uint64_t uncompressedLength_;
670   size_t chunkSize_;
671   std::unique_ptr<StreamCodec> codec_;
672 };
673
674 std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
675   size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
676   std::vector<ByteRange> result;
677   result.reserve(pieces + 1);
678   while (!data.empty()) {
679     size_t const pieceSize = std::min(data.size(), chunkSize_);
680     result.push_back(data.subpiece(0, pieceSize));
681     data.uncheckedAdvance(pieceSize);
682   }
683   return result;
684 }
685
686 static std::unique_ptr<IOBuf> compressSome(
687     StreamCodec* codec,
688     ByteRange data,
689     uint64_t bufferSize,
690     StreamCodec::FlushOp flush) {
691   bool result;
692   IOBufQueue queue;
693   do {
694     auto buffer = IOBuf::create(bufferSize);
695     buffer->append(buffer->capacity());
696     MutableByteRange output{buffer->writableData(), buffer->length()};
697
698     result = codec->compressStream(data, output, flush);
699     buffer->trimEnd(output.size());
700     queue.append(std::move(buffer));
701
702   } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
703   EXPECT_TRUE(data.empty());
704   return queue.move();
705 }
706
707 static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
708     StreamCodec* codec,
709     ByteRange& data,
710     uint64_t bufferSize,
711     StreamCodec::FlushOp flush) {
712   bool result;
713   IOBufQueue queue;
714   do {
715     auto buffer = IOBuf::create(bufferSize);
716     buffer->append(buffer->capacity());
717     MutableByteRange output{buffer->writableData(), buffer->length()};
718
719     result = codec->uncompressStream(data, output, flush);
720     buffer->trimEnd(output.size());
721     queue.append(std::move(buffer));
722
723   } while (queue.tailroom() == 0 && !result);
724   return std::make_pair(result, queue.move());
725 }
726
727 void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
728   auto const input = dh.data(uncompressedLength_);
729   // Compress some but leave state unclean
730   codec_->resetStream(uncompressedLength_);
731   compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
732   // Reset stream and compress all
733   codec_->resetStream();
734   auto compressed =
735       compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
736   auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
737   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
738 }
739
740 TEST_P(StreamingCompressionTest, resetStream) {
741   runResetStreamTest(constantDataHolder);
742   runResetStreamTest(randomDataHolder);
743 }
744
745 void StreamingCompressionTest::runCompressStreamTest(
746     const folly::io::test::DataHolder& dh) {
747   auto const inputs = split(dh.data(uncompressedLength_));
748
749   IOBufQueue queue;
750   codec_->resetStream(uncompressedLength_);
751   // Compress many inputs in a row
752   for (auto const input : inputs) {
753     queue.append(compressSome(
754         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
755   }
756   // Finish the operation with empty input.
757   ByteRange empty;
758   queue.append(
759       compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
760
761   auto const uncompressed = codec_->uncompress(queue.front());
762   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
763 }
764
765 TEST_P(StreamingCompressionTest, compressStream) {
766   runCompressStreamTest(constantDataHolder);
767   runCompressStreamTest(randomDataHolder);
768 }
769
770 void StreamingCompressionTest::runUncompressStreamTest(
771     const folly::io::test::DataHolder& dh) {
772   auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
773   // Concatenate 3 compressed frames in a row
774   auto compressed = codec_->compress(data.get());
775   compressed->prependChain(codec_->compress(data.get()));
776   compressed->prependChain(codec_->compress(data.get()));
777   // Pass all 3 compressed frames in one input buffer
778   auto input = compressed->coalesce();
779   // Uncompress the first frame
780   codec_->resetStream(data->computeChainDataLength());
781   {
782     auto const result = uncompressSome(
783         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
784     ASSERT_TRUE(result.first);
785     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
786   }
787   // Uncompress the second frame
788   codec_->resetStream();
789   {
790     auto const result = uncompressSome(
791         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
792     ASSERT_TRUE(result.first);
793     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
794   }
795   // Uncompress the third frame
796   codec_->resetStream();
797   {
798     auto const result = uncompressSome(
799         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
800     ASSERT_TRUE(result.first);
801     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
802   }
803   EXPECT_TRUE(input.empty());
804 }
805
806 TEST_P(StreamingCompressionTest, uncompressStream) {
807   runUncompressStreamTest(constantDataHolder);
808   runUncompressStreamTest(randomDataHolder);
809 }
810
811 void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
812   auto const inputs = split(dh.data(uncompressedLength_));
813   auto uncodec = getStreamCodec(codec_->type());
814
815   codec_->resetStream();
816   for (auto input : inputs) {
817     // Compress some data and flush the stream
818     auto compressed = compressSome(
819         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
820     auto compressedRange = compressed->coalesce();
821     // Uncompress the compressed data
822     auto result = uncompressSome(
823         uncodec.get(),
824         compressedRange,
825         chunkSize_,
826         StreamCodec::FlushOp::FLUSH);
827     // All compressed data should have been consumed
828     EXPECT_TRUE(compressedRange.empty());
829     // The frame isn't complete
830     EXPECT_FALSE(result.first);
831     // The uncompressed data should be exactly the input data
832     EXPECT_EQ(input.size(), result.second->computeChainDataLength());
833     auto const data = IOBuf::wrapBuffer(input);
834     EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
835   }
836 }
837
838 TEST_P(StreamingCompressionTest, testFlush) {
839   runFlushTest(constantDataHolder);
840   runFlushTest(randomDataHolder);
841 }
842
843 INSTANTIATE_TEST_CASE_P(
844     StreamingCompressionTest,
845     StreamingCompressionTest,
846     testing::Combine(
847         testing::Values(0, 1, 12, 22, 27),
848         testing::Values(12, 17, 20),
849         testing::ValuesIn(availableStreamCodecs())));
850
851 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
852  protected:
853   void SetUp() override {
854     codec_ = getCodec(GetParam());
855     auto_ = getAutoUncompressionCodec();
856   }
857
858   void runSimpleTest(const DataHolder& dh);
859
860   std::unique_ptr<Codec> codec_;
861   std::unique_ptr<Codec> auto_;
862 };
863
864 void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) {
865   constexpr uint64_t uncompressedLength = 1000;
866   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
867   auto compressed = codec_->compress(original.get());
868
869   if (!codec_->needsUncompressedLength()) {
870     auto uncompressed = auto_->uncompress(compressed.get());
871     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
872     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
873   }
874   {
875     auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
876     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
877     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
878   }
879   ASSERT_GE(compressed->computeChainDataLength(), 8);
880   for (size_t i = 0; i < 8; ++i) {
881     auto split = compressed->clone();
882     auto rest = compressed->clone();
883     split->trimEnd(split->length() - i);
884     rest->trimStart(i);
885     split->appendChain(std::move(rest));
886     auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
887     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
888     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
889   }
890 }
891
892 TEST_P(AutomaticCodecTest, RandomData) {
893   runSimpleTest(randomDataHolder);
894 }
895
896 TEST_P(AutomaticCodecTest, ConstantData) {
897   runSimpleTest(constantDataHolder);
898 }
899
900 TEST_P(AutomaticCodecTest, ValidPrefixes) {
901   const auto prefixes = codec_->validPrefixes();
902   for (const auto& prefix : prefixes) {
903     EXPECT_FALSE(prefix.empty());
904     // Ensure that all strings are at least 8 bytes for LZMA2.
905     // The bytes after the prefix should be ignored by `canUncompress()`.
906     IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8};
907     data.append(8);
908     EXPECT_TRUE(codec_->canUncompress(&data));
909     EXPECT_TRUE(auto_->canUncompress(&data));
910   }
911 }
912
913 TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
914   if (codec_->needsUncompressedLength()) {
915     EXPECT_TRUE(auto_->needsUncompressedLength());
916   }
917 }
918
919 TEST_P(AutomaticCodecTest, maxUncompressedLength) {
920   EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength());
921 }
922
923 TEST_P(AutomaticCodecTest, DefaultCodec) {
924   const uint64_t length = 42;
925   std::vector<std::unique_ptr<Codec>> codecs;
926   codecs.push_back(getCodec(CodecType::ZSTD));
927   auto automatic = getAutoUncompressionCodec(std::move(codecs));
928   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
929   auto compressed = codec_->compress(original.get());
930   auto decompressed = automatic->uncompress(compressed.get());
931
932   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
933 }
934
935 namespace {
936 class CustomCodec : public Codec {
937  public:
938   static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
939     return std::make_unique<CustomCodec>(std::move(prefix), type);
940   }
941   explicit CustomCodec(std::string prefix, CodecType type)
942       : Codec(CodecType::USER_DEFINED),
943         prefix_(std::move(prefix)),
944         codec_(getCodec(type)) {}
945
946  private:
947   std::vector<std::string> validPrefixes() const override {
948     return {prefix_};
949   }
950
951   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
952     return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
953   }
954
955   bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
956     auto clone = data->cloneCoalescedAsValue();
957     if (clone.length() < prefix_.size()) {
958       return false;
959     }
960     return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
961   }
962
963   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
964     auto result = IOBuf::copyBuffer(prefix_);
965     result->appendChain(codec_->compress(data));
966     EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
967     return result;
968   }
969
970   std::unique_ptr<IOBuf> doUncompress(
971       const IOBuf* data,
972       Optional<uint64_t> uncompressedLength) override {
973     EXPECT_TRUE(canUncompress(data, uncompressedLength));
974     auto clone = data->cloneCoalescedAsValue();
975     clone.trimStart(prefix_.size());
976     return codec_->uncompress(&clone, uncompressedLength);
977   }
978
979   std::string prefix_;
980   std::unique_ptr<Codec> codec_;
981 };
982 }
983
984 TEST_P(AutomaticCodecTest, CustomCodec) {
985   const uint64_t length = 42;
986   auto ab = CustomCodec::create("ab", CodecType::ZSTD);
987   std::vector<std::unique_ptr<Codec>> codecs;
988   codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD));
989   auto automatic = getAutoUncompressionCodec(std::move(codecs));
990   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
991
992   auto abCompressed = ab->compress(original.get());
993   auto abDecompressed = automatic->uncompress(abCompressed.get());
994   EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
995   EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
996   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));
997
998   auto compressed = codec_->compress(original.get());
999   auto decompressed = automatic->uncompress(compressed.get());
1000   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1001 }
1002
1003 TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
1004   const uint64_t length = 42;
1005   auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
1006   std::vector<std::unique_ptr<Codec>> codecs;
1007   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1008   codecs.push_back(getCodec(CodecType::LZ4_FRAME));
1009   auto automatic = getAutoUncompressionCodec(std::move(codecs));
1010   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
1011
1012   auto noneCompressed = none->compress(original.get());
1013   auto noneDecompressed = automatic->uncompress(noneCompressed.get());
1014   EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
1015   EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
1016   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));
1017
1018   auto compressed = codec_->compress(original.get());
1019   auto decompressed = automatic->uncompress(compressed.get());
1020   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1021 }
1022
1023 TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
1024   // No default codec can uncompress 1 bytes.
1025   IOBuf buf{IOBuf::CREATE, 1};
1026   buf.append(1);
1027   EXPECT_FALSE(codec_->canUncompress(&buf, 1));
1028   EXPECT_FALSE(codec_->canUncompress(&buf, folly::none));
1029   EXPECT_FALSE(auto_->canUncompress(&buf, 1));
1030   EXPECT_FALSE(auto_->canUncompress(&buf, folly::none));
1031 }
1032
1033 INSTANTIATE_TEST_CASE_P(
1034     AutomaticCodecTest,
1035     AutomaticCodecTest,
1036     testing::Values(
1037         CodecType::LZ4_FRAME,
1038         CodecType::ZSTD,
1039         CodecType::ZLIB,
1040         CodecType::GZIP,
1041         CodecType::LZMA2,
1042         CodecType::BZIP2));
1043
1044 TEST(ValidPrefixesTest, CustomCodec) {
1045   std::vector<std::unique_ptr<Codec>> codecs;
1046   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1047   const auto none = getAutoUncompressionCodec(std::move(codecs));
1048   const auto prefixes = none->validPrefixes();
1049   const auto it = std::find(prefixes.begin(), prefixes.end(), "none");
1050   EXPECT_TRUE(it != prefixes.end());
1051 }
1052
1053 #define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
1054   do {                                                       \
1055     if (kIsDebug) {                                          \
1056       EXPECT_THROW((statement), expected_exception);         \
1057     } else {                                                 \
1058       EXPECT_NO_THROW((statement));                          \
1059     }                                                        \
1060   } while (false)
1061
1062 TEST(CheckCompatibleTest, SimplePrefixSecond) {
1063   std::vector<std::unique_ptr<Codec>> codecs;
1064   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1065   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1066   EXPECT_THROW_IF_DEBUG(
1067       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1068 }
1069
1070 TEST(CheckCompatibleTest, SimplePrefixFirst) {
1071   std::vector<std::unique_ptr<Codec>> codecs;
1072   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1073   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1074   EXPECT_THROW_IF_DEBUG(
1075       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1076 }
1077
1078 TEST(CheckCompatibleTest, Empty) {
1079   std::vector<std::unique_ptr<Codec>> codecs;
1080   codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
1081   EXPECT_THROW_IF_DEBUG(
1082       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1083 }
1084
1085 TEST(CheckCompatibleTest, ZstdPrefix) {
1086   std::vector<std::unique_ptr<Codec>> codecs;
1087   codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
1088   EXPECT_THROW_IF_DEBUG(
1089       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1090 }
1091
1092 TEST(CheckCompatibleTest, ZstdDuplicate) {
1093   std::vector<std::unique_ptr<Codec>> codecs;
1094   codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
1095   EXPECT_THROW_IF_DEBUG(
1096       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1097 }
1098
1099 TEST(CheckCompatibleTest, ZlibIsPrefix) {
1100   std::vector<std::unique_ptr<Codec>> codecs;
1101   codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
1102   EXPECT_THROW_IF_DEBUG(
1103       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1104 }
1105
1106 #if FOLLY_HAVE_LIBZSTD
1107
1108 TEST(ZstdTest, BackwardCompatible) {
1109   auto codec = getCodec(CodecType::ZSTD);
1110   {
1111     auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20));
1112     auto compressed = codec->compress(data.get());
1113     compressed->coalesce();
1114     EXPECT_EQ(
1115         data->length(),
1116         ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1117   }
1118   {
1119     auto const data =
1120         IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20));
1121     auto compressed = codec->compress(data.get());
1122     compressed->coalesce();
1123     EXPECT_EQ(
1124         data->length(),
1125         ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1126   }
1127 }
1128
1129 #endif
1130 }}}  // namespaces
1131
1132 int main(int argc, char *argv[]) {
1133   testing::InitGoogleTest(&argc, argv);
1134   gflags::ParseCommandLineFlags(&argc, &argv, true);
1135
1136   auto ret = RUN_ALL_TESTS();
1137   if (!ret) {
1138     folly::runBenchmarksOnFlag();
1139   }
1140   return ret;
1141 }