Add zstd streaming interface
[folly.git] / folly / io / test / CompressionTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/Compression.h>
18
19 #include <algorithm>
20 #include <random>
21 #include <set>
22 #include <thread>
23 #include <unordered_map>
24 #include <utility>
25
26 #include <boost/noncopyable.hpp>
27 #include <glog/logging.h>
28
29 #include <folly/Benchmark.h>
30 #include <folly/Hash.h>
31 #include <folly/Memory.h>
32 #include <folly/Random.h>
33 #include <folly/Varint.h>
34 #include <folly/io/IOBufQueue.h>
35 #include <folly/portability/GTest.h>
36
37 #if FOLLY_HAVE_LIBZSTD
38 #include <zstd.h>
39 #endif
40
41 namespace folly { namespace io { namespace test {
42
43 class DataHolder : private boost::noncopyable {
44  public:
45   uint64_t hash(size_t size) const;
46   ByteRange data(size_t size) const;
47
48  protected:
49   explicit DataHolder(size_t sizeLog2);
50   const size_t size_;
51   std::unique_ptr<uint8_t[]> data_;
52   mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
53 };
54
55 DataHolder::DataHolder(size_t sizeLog2)
56   : size_(size_t(1) << sizeLog2),
57     data_(new uint8_t[size_]) {
58 }
59
60 uint64_t DataHolder::hash(size_t size) const {
61   CHECK_LE(size, size_);
62   auto p = hashCache_.find(size);
63   if (p != hashCache_.end()) {
64     return p->second;
65   }
66
67   uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
68   hashCache_[size] = h;
69   return h;
70 }
71
72 ByteRange DataHolder::data(size_t size) const {
73   CHECK_LE(size, size_);
74   return ByteRange(data_.get(), size);
75 }
76
77 uint64_t hashIOBuf(const IOBuf* buf) {
78   uint64_t h = folly::hash::FNV_64_HASH_START;
79   for (auto& range : *buf) {
80     h = folly::hash::fnv64_buf(range.data(), range.size(), h);
81   }
82   return h;
83 }
84
85 class RandomDataHolder : public DataHolder {
86  public:
87   explicit RandomDataHolder(size_t sizeLog2);
88 };
89
90 RandomDataHolder::RandomDataHolder(size_t sizeLog2)
91   : DataHolder(sizeLog2) {
92   static constexpr size_t numThreadsLog2 = 3;
93   static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
94
95   uint32_t seed = randomNumberSeed();
96
97   std::vector<std::thread> threads;
98   threads.reserve(numThreads);
99   for (size_t t = 0; t < numThreads; ++t) {
100     threads.emplace_back([this, seed, t, sizeLog2] {
101       std::mt19937 rng(seed + t);
102       size_t countLog2 = sizeLog2 - numThreadsLog2;
103       size_t start = size_t(t) << countLog2;
104       for (size_t i = 0; i < countLog2; ++i) {
105         this->data_[start + i] = rng();
106       }
107     });
108   }
109
110   for (auto& t : threads) {
111     t.join();
112   }
113 }
114
115 class ConstantDataHolder : public DataHolder {
116  public:
117   explicit ConstantDataHolder(size_t sizeLog2);
118 };
119
120 ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
121   : DataHolder(sizeLog2) {
122   memset(data_.get(), 'a', size_);
123 }
124
125 constexpr size_t dataSizeLog2 = 27;  // 128MiB
126 RandomDataHolder randomDataHolder(dataSizeLog2);
127 ConstantDataHolder constantDataHolder(dataSizeLog2);
128
129 // The intersection of the provided codecs & those that are compiled in.
130 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
131   std::vector<CodecType> supported;
132
133   std::copy_if(
134       std::begin(v),
135       std::end(v),
136       std::back_inserter(supported),
137       hasCodec);
138
139   return supported;
140 }
141
142 // All compiled-in compression codecs.
143 static std::vector<CodecType> availableCodecs() {
144   std::vector<CodecType> codecs;
145
146   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
147     auto type = static_cast<CodecType>(i);
148     if (hasCodec(type)) {
149       codecs.push_back(type);
150     }
151   }
152
153   return codecs;
154 }
155
156 static std::vector<CodecType> availableStreamCodecs() {
157   std::vector<CodecType> codecs;
158
159   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
160     auto type = static_cast<CodecType>(i);
161     if (hasStreamCodec(type)) {
162       codecs.push_back(type);
163     }
164   }
165
166   return codecs;
167 }
168
169 TEST(CompressionTestNeedsUncompressedLength, Simple) {
170   static const struct { CodecType type; bool needsUncompressedLength; }
171     expectations[] = {
172       { CodecType::NO_COMPRESSION, false },
173       { CodecType::LZ4, true },
174       { CodecType::SNAPPY, false },
175       { CodecType::ZLIB, false },
176       { CodecType::LZ4_VARINT_SIZE, false },
177       { CodecType::LZMA2, false },
178       { CodecType::LZMA2_VARINT_SIZE, false },
179       { CodecType::ZSTD, false },
180       { CodecType::GZIP, false },
181       { CodecType::LZ4_FRAME, false },
182       { CodecType::BZIP2, false },
183     };
184
185   for (auto const& test : expectations) {
186     if (hasCodec(test.type)) {
187       EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
188                 test.needsUncompressedLength);
189     }
190   }
191 }
192
193 class CompressionTest
194     : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
195  protected:
196   void SetUp() override {
197     auto tup = GetParam();
198     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
199     chunks_ = std::tr1::get<1>(tup);
200     codec_ = getCodec(std::tr1::get<2>(tup));
201   }
202
203   void runSimpleIOBufTest(const DataHolder& dh);
204
205   void runSimpleStringTest(const DataHolder& dh);
206
207  private:
208   std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
209
210   uint64_t uncompressedLength_;
211   size_t chunks_;
212   std::unique_ptr<Codec> codec_;
213 };
214
215 void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
216   const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
217   const auto compressed = split(codec_->compress(original.get()));
218   if (!codec_->needsUncompressedLength()) {
219     auto uncompressed = codec_->uncompress(compressed.get());
220     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
221     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
222   }
223   {
224     auto uncompressed = codec_->uncompress(compressed.get(),
225                                            uncompressedLength_);
226     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
227     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
228   }
229 }
230
231 void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
232   const auto original = std::string(
233       reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
234       uncompressedLength_);
235   const auto compressed = codec_->compress(original);
236   if (!codec_->needsUncompressedLength()) {
237     auto uncompressed = codec_->uncompress(compressed);
238     EXPECT_EQ(uncompressedLength_, uncompressed.length());
239     EXPECT_EQ(uncompressed, original);
240   }
241   {
242     auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
243     EXPECT_EQ(uncompressedLength_, uncompressed.length());
244     EXPECT_EQ(uncompressed, original);
245   }
246 }
247
248 // Uniformly split data into (potentially empty) chunks.
249 std::unique_ptr<IOBuf> CompressionTest::split(
250     std::unique_ptr<IOBuf> data) const {
251   if (data->isChained()) {
252     data->coalesce();
253   }
254
255   const size_t size = data->computeChainDataLength();
256
257   std::multiset<size_t> splits;
258   for (size_t i = 1; i < chunks_; ++i) {
259     splits.insert(Random::rand64(size));
260   }
261
262   folly::IOBufQueue result;
263
264   size_t offset = 0;
265   for (size_t split : splits) {
266     result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
267     offset = split;
268   }
269   result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
270
271   return result.move();
272 }
273
274 TEST_P(CompressionTest, RandomData) {
275   runSimpleIOBufTest(randomDataHolder);
276 }
277
278 TEST_P(CompressionTest, ConstantData) {
279   runSimpleIOBufTest(constantDataHolder);
280 }
281
282 TEST_P(CompressionTest, RandomDataString) {
283   runSimpleStringTest(randomDataHolder);
284 }
285
286 TEST_P(CompressionTest, ConstantDataString) {
287   runSimpleStringTest(constantDataHolder);
288 }
289
290 INSTANTIATE_TEST_CASE_P(
291     CompressionTest,
292     CompressionTest,
293     testing::Combine(
294         testing::Values(0, 1, 12, 22, 25, 27),
295         testing::Values(1, 2, 3, 8, 65),
296         testing::ValuesIn(availableCodecs())));
297
298 class CompressionVarintTest
299     : public testing::TestWithParam<std::tr1::tuple<int, CodecType>> {
300  protected:
301   void SetUp() override {
302     auto tup = GetParam();
303     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
304     codec_ = getCodec(std::tr1::get<1>(tup));
305   }
306
307   void runSimpleTest(const DataHolder& dh);
308
309   uint64_t uncompressedLength_;
310   std::unique_ptr<Codec> codec_;
311 };
312
313 inline uint64_t oneBasedMsbPos(uint64_t number) {
314   uint64_t pos = 0;
315   for (; number > 0; ++pos, number >>= 1) {
316   }
317   return pos;
318 }
319
320 void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
321   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
322   auto compressed = codec_->compress(original.get());
323   auto breakPoint =
324       1UL +
325       Random::rand64(
326           std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
327   auto tinyBuf = IOBuf::copyBuffer(compressed->data(),
328                                    std::min(compressed->length(), breakPoint));
329   compressed->trimStart(breakPoint);
330   tinyBuf->prependChain(std::move(compressed));
331   compressed = std::move(tinyBuf);
332
333   auto uncompressed = codec_->uncompress(compressed.get());
334
335   EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
336   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
337 }
338
339 TEST_P(CompressionVarintTest, RandomData) {
340   runSimpleTest(randomDataHolder);
341 }
342
343 TEST_P(CompressionVarintTest, ConstantData) {
344   runSimpleTest(constantDataHolder);
345 }
346
347 INSTANTIATE_TEST_CASE_P(
348     CompressionVarintTest,
349     CompressionVarintTest,
350     testing::Combine(
351         testing::Values(0, 1, 12, 22, 25, 27),
352         testing::ValuesIn(supportedCodecs({
353             CodecType::LZ4_VARINT_SIZE,
354             CodecType::LZMA2_VARINT_SIZE,
355             }))));
356
357 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
358  protected:
359   void SetUp() override { codec_ = getCodec(GetParam()); }
360
361   void runSimpleTest(const DataHolder& dh);
362
363   std::unique_ptr<Codec> codec_;
364 };
365
366 void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
367   constexpr uint64_t uncompressedLength = 42;
368   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
369   auto compressed = codec_->compress(original.get());
370
371   if (!codec_->needsUncompressedLength()) {
372     auto uncompressed = codec_->uncompress(compressed.get());
373     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
374     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
375   }
376   {
377     auto uncompressed = codec_->uncompress(compressed.get(),
378                                            uncompressedLength);
379     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
380     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
381   }
382
383   EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
384                std::runtime_error);
385
386   // Corrupt the first character
387   ++(compressed->writableData()[0]);
388
389   if (!codec_->needsUncompressedLength()) {
390     EXPECT_THROW(codec_->uncompress(compressed.get()),
391                  std::runtime_error);
392   }
393
394   EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength),
395                std::runtime_error);
396 }
397
398 TEST_P(CompressionCorruptionTest, RandomData) {
399   runSimpleTest(randomDataHolder);
400 }
401
402 TEST_P(CompressionCorruptionTest, ConstantData) {
403   runSimpleTest(constantDataHolder);
404 }
405
406 INSTANTIATE_TEST_CASE_P(
407     CompressionCorruptionTest,
408     CompressionCorruptionTest,
409     testing::ValuesIn(
410         // NO_COMPRESSION can't detect corruption
411         // LZ4 can't detect corruption reliably (sigh)
412         supportedCodecs({
413             CodecType::SNAPPY,
414             CodecType::ZLIB,
415             CodecType::LZMA2,
416             CodecType::ZSTD,
417             CodecType::LZ4_FRAME,
418             CodecType::BZIP2,
419         })));
420
421 class StreamingUnitTest : public testing::TestWithParam<CodecType> {
422  protected:
423   void SetUp() override {
424     codec_ = getStreamCodec(GetParam());
425   }
426
427   std::unique_ptr<StreamCodec> codec_;
428 };
429
430 TEST_P(StreamingUnitTest, maxCompressedLength) {
431   EXPECT_EQ(0, codec_->maxCompressedLength(0));
432   for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
433     EXPECT_GE(codec_->maxCompressedLength(length), length);
434   }
435 }
436
437 TEST_P(StreamingUnitTest, getUncompressedLength) {
438   auto const empty = IOBuf::create(0);
439   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
440   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
441
442   auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
443   auto const compressed = codec_->compress(data.get());
444
445   EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
446   if (auto const length = codec_->getUncompressedLength(data.get())) {
447     EXPECT_EQ(100, *length);
448   }
449   EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
450   // If the uncompressed length is stored in the frame, then make sure it throws
451   // when it is given the wrong length.
452   if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
453     EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
454   }
455 }
456
457 TEST_P(StreamingUnitTest, emptyData) {
458   ByteRange input{};
459   auto buffer = IOBuf::create(1);
460   buffer->append(buffer->capacity());
461   MutableByteRange output{};
462
463   // Test compressing empty data in one pass
464   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
465   codec_->resetStream(0);
466   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
467   codec_->resetStream();
468   output = {buffer->writableData(), buffer->length()};
469   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
470   EXPECT_EQ(buffer->length(), output.size());
471
472   // Test compressing empty data with multiple calls to compressStream()
473   codec_->resetStream();
474   output = {};
475   EXPECT_FALSE(codec_->compressStream(input, output));
476   EXPECT_TRUE(
477       codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
478   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
479   codec_->resetStream();
480   output = {buffer->writableData(), buffer->length()};
481   EXPECT_FALSE(codec_->compressStream(input, output));
482   EXPECT_TRUE(
483       codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
484   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
485   EXPECT_EQ(buffer->length(), output.size());
486
487   // Test uncompressing empty data
488   output = {};
489   codec_->resetStream();
490   EXPECT_TRUE(codec_->uncompressStream(input, output));
491   codec_->resetStream();
492   EXPECT_TRUE(
493       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
494   codec_->resetStream();
495   EXPECT_TRUE(
496       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
497   codec_->resetStream(0);
498   EXPECT_TRUE(codec_->uncompressStream(input, output));
499   codec_->resetStream(0);
500   EXPECT_TRUE(
501       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
502   codec_->resetStream(0);
503   EXPECT_TRUE(
504       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
505 }
506
507 TEST_P(StreamingUnitTest, noForwardProgressOkay) {
508   auto inBuffer = IOBuf::create(2);
509   inBuffer->writableData()[0] = 'a';
510   inBuffer->writableData()[0] = 'a';
511   inBuffer->append(2);
512   auto input = inBuffer->coalesce();
513   auto compressed = codec_->compress(inBuffer.get());
514
515   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
516   MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
517
518   ByteRange emptyInput;
519   MutableByteRange emptyOutput;
520
521   // Compress some data to avoid empty data special casing
522   codec_->resetStream();
523   while (!input.empty()) {
524     codec_->compressStream(input, output);
525   }
526   // empty input and output is okay for flush NONE and FLUSH.
527   codec_->compressStream(emptyInput, emptyOutput);
528   codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
529
530   codec_->resetStream();
531   input = inBuffer->coalesce();
532   output = {outBuffer->writableTail(), outBuffer->tailroom()};
533   while (!input.empty()) {
534     codec_->compressStream(input, output);
535   }
536   // empty input and output is okay for flush END.
537   codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
538
539   codec_->resetStream();
540   input = compressed->coalesce();
541   input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
542   output = {inBuffer->writableData(), inBuffer->length()};
543   // Uncompress some data to avoid empty data special casing
544   while (!input.empty()) {
545     EXPECT_FALSE(codec_->uncompressStream(input, output));
546   }
547   // empty input and output is okay for all flush values.
548   EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
549   EXPECT_FALSE(codec_->uncompressStream(
550       emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
551   EXPECT_FALSE(codec_->uncompressStream(
552       emptyInput, emptyOutput, StreamCodec::FlushOp::END));
553 }
554
555 TEST_P(StreamingUnitTest, stateTransitions) {
556   auto inBuffer = IOBuf::create(1);
557   inBuffer->writableData()[0] = 'a';
558   inBuffer->append(1);
559   auto compressed = codec_->compress(inBuffer.get());
560   ByteRange const in = compressed->coalesce();
561   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
562   MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
563
564   auto compress = [&](
565       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
566       bool empty = false) {
567     auto input = in;
568     auto output = empty ? MutableByteRange{} : out;
569     return codec_->compressStream(input, output, flushOp);
570   };
571   auto uncompress = [&](
572       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
573       bool empty = false) {
574     auto input = in;
575     auto output = empty ? MutableByteRange{} : out;
576     return codec_->uncompressStream(input, output, flushOp);
577   };
578
579   // compression flow
580   codec_->resetStream();
581   EXPECT_FALSE(compress());
582   EXPECT_FALSE(compress());
583   EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
584   EXPECT_FALSE(compress());
585   EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
586   // uncompression flow
587   codec_->resetStream();
588   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
589   codec_->resetStream();
590   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
591   codec_->resetStream();
592   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
593   codec_->resetStream();
594   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
595   codec_->resetStream();
596   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
597   // compress -> uncompress
598   codec_->resetStream();
599   EXPECT_FALSE(compress());
600   EXPECT_THROW(uncompress(), std::logic_error);
601   // uncompress -> compress
602   codec_->resetStream();
603   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
604   EXPECT_THROW(compress(), std::logic_error);
605   // end -> compress
606   codec_->resetStream();
607   EXPECT_FALSE(compress());
608   EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
609   EXPECT_THROW(compress(), std::logic_error);
610   // end -> uncompress
611   codec_->resetStream();
612   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
613   EXPECT_THROW(uncompress(), std::logic_error);
614   // flush -> compress
615   codec_->resetStream();
616   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
617   EXPECT_THROW(compress(), std::logic_error);
618   // flush -> end
619   codec_->resetStream();
620   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
621   EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
622   // undefined -> compress
623   codec_->compress(inBuffer.get());
624   EXPECT_THROW(compress(), std::logic_error);
625   codec_->uncompress(compressed.get());
626   EXPECT_THROW(compress(), std::logic_error);
627   // undefined -> undefined
628   codec_->uncompress(compressed.get());
629   codec_->compress(inBuffer.get());
630 }
631
632 INSTANTIATE_TEST_CASE_P(
633     StreamingUnitTest,
634     StreamingUnitTest,
635     testing::ValuesIn(availableStreamCodecs()));
636
637 class StreamingCompressionTest
638     : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
639  protected:
640   void SetUp() override {
641     auto const tup = GetParam();
642     uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
643     chunkSize_ = size_t(1) << std::get<1>(tup);
644     codec_ = getStreamCodec(std::get<2>(tup));
645   }
646
647   void runResetStreamTest(DataHolder const& dh);
648   void runCompressStreamTest(DataHolder const& dh);
649   void runUncompressStreamTest(DataHolder const& dh);
650   void runFlushTest(DataHolder const& dh);
651
652  private:
653   std::vector<ByteRange> split(ByteRange data) const;
654
655   uint64_t uncompressedLength_;
656   size_t chunkSize_;
657   std::unique_ptr<StreamCodec> codec_;
658 };
659
660 std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
661   size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
662   std::vector<ByteRange> result;
663   result.reserve(pieces + 1);
664   while (!data.empty()) {
665     size_t const pieceSize = std::min(data.size(), chunkSize_);
666     result.push_back(data.subpiece(0, pieceSize));
667     data.uncheckedAdvance(pieceSize);
668   }
669   return result;
670 }
671
672 static std::unique_ptr<IOBuf> compressSome(
673     StreamCodec* codec,
674     ByteRange data,
675     uint64_t bufferSize,
676     StreamCodec::FlushOp flush) {
677   bool result;
678   IOBufQueue queue;
679   do {
680     auto buffer = IOBuf::create(bufferSize);
681     buffer->append(buffer->capacity());
682     MutableByteRange output{buffer->writableData(), buffer->length()};
683
684     result = codec->compressStream(data, output, flush);
685     buffer->trimEnd(output.size());
686     queue.append(std::move(buffer));
687
688   } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
689   EXPECT_TRUE(data.empty());
690   return queue.move();
691 }
692
693 static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
694     StreamCodec* codec,
695     ByteRange& data,
696     uint64_t bufferSize,
697     StreamCodec::FlushOp flush) {
698   bool result;
699   IOBufQueue queue;
700   do {
701     auto buffer = IOBuf::create(bufferSize);
702     buffer->append(buffer->capacity());
703     MutableByteRange output{buffer->writableData(), buffer->length()};
704
705     result = codec->uncompressStream(data, output, flush);
706     buffer->trimEnd(output.size());
707     queue.append(std::move(buffer));
708
709   } while (queue.tailroom() == 0 && !result);
710   return std::make_pair(result, queue.move());
711 }
712
713 void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
714   auto const input = dh.data(uncompressedLength_);
715   // Compress some but leave state unclean
716   codec_->resetStream(uncompressedLength_);
717   compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
718   // Reset stream and compress all
719   codec_->resetStream();
720   auto compressed =
721       compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
722   auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
723   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
724 }
725
726 TEST_P(StreamingCompressionTest, resetStream) {
727   runResetStreamTest(constantDataHolder);
728   runResetStreamTest(randomDataHolder);
729 }
730
731 void StreamingCompressionTest::runCompressStreamTest(
732     const folly::io::test::DataHolder& dh) {
733   auto const inputs = split(dh.data(uncompressedLength_));
734
735   IOBufQueue queue;
736   codec_->resetStream(uncompressedLength_);
737   // Compress many inputs in a row
738   for (auto const input : inputs) {
739     queue.append(compressSome(
740         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
741   }
742   // Finish the operation with empty input.
743   ByteRange empty;
744   queue.append(
745       compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
746
747   auto const uncompressed = codec_->uncompress(queue.front());
748   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
749 }
750
751 TEST_P(StreamingCompressionTest, compressStream) {
752   runCompressStreamTest(constantDataHolder);
753   runCompressStreamTest(randomDataHolder);
754 }
755
756 void StreamingCompressionTest::runUncompressStreamTest(
757     const folly::io::test::DataHolder& dh) {
758   auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
759   // Concatenate 3 compressed frames in a row
760   auto compressed = codec_->compress(data.get());
761   compressed->prependChain(codec_->compress(data.get()));
762   compressed->prependChain(codec_->compress(data.get()));
763   // Pass all 3 compressed frames in one input buffer
764   auto input = compressed->coalesce();
765   // Uncompress the first frame
766   codec_->resetStream(data->computeChainDataLength());
767   {
768     auto const result = uncompressSome(
769         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
770     ASSERT_TRUE(result.first);
771     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
772   }
773   // Uncompress the second frame
774   codec_->resetStream();
775   {
776     auto const result = uncompressSome(
777         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
778     ASSERT_TRUE(result.first);
779     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
780   }
781   // Uncompress the third frame
782   codec_->resetStream();
783   {
784     auto const result = uncompressSome(
785         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
786     ASSERT_TRUE(result.first);
787     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
788   }
789   EXPECT_TRUE(input.empty());
790 }
791
792 TEST_P(StreamingCompressionTest, uncompressStream) {
793   runUncompressStreamTest(constantDataHolder);
794   runUncompressStreamTest(randomDataHolder);
795 }
796
797 void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
798   auto const inputs = split(dh.data(uncompressedLength_));
799   auto uncodec = getStreamCodec(codec_->type());
800
801   codec_->resetStream();
802   for (auto input : inputs) {
803     // Compress some data and flush the stream
804     auto compressed = compressSome(
805         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
806     auto compressedRange = compressed->coalesce();
807     // Uncompress the compressed data
808     auto result = uncompressSome(
809         uncodec.get(),
810         compressedRange,
811         chunkSize_,
812         StreamCodec::FlushOp::FLUSH);
813     // All compressed data should have been consumed
814     EXPECT_TRUE(compressedRange.empty());
815     // The frame isn't complete
816     EXPECT_FALSE(result.first);
817     // The uncompressed data should be exactly the input data
818     EXPECT_EQ(input.size(), result.second->computeChainDataLength());
819     auto const data = IOBuf::wrapBuffer(input);
820     EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
821   }
822 }
823
824 TEST_P(StreamingCompressionTest, testFlush) {
825   runFlushTest(constantDataHolder);
826   runFlushTest(randomDataHolder);
827 }
828
829 INSTANTIATE_TEST_CASE_P(
830     StreamingCompressionTest,
831     StreamingCompressionTest,
832     testing::Combine(
833         testing::Values(0, 1, 12, 22, 27),
834         testing::Values(12, 17, 20),
835         testing::ValuesIn(availableStreamCodecs())));
836
837 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
838  protected:
839   void SetUp() override {
840     codec_ = getCodec(GetParam());
841     auto_ = getAutoUncompressionCodec();
842   }
843
844   void runSimpleTest(const DataHolder& dh);
845
846   std::unique_ptr<Codec> codec_;
847   std::unique_ptr<Codec> auto_;
848 };
849
850 void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) {
851   constexpr uint64_t uncompressedLength = 1000;
852   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
853   auto compressed = codec_->compress(original.get());
854
855   if (!codec_->needsUncompressedLength()) {
856     auto uncompressed = auto_->uncompress(compressed.get());
857     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
858     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
859   }
860   {
861     auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
862     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
863     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
864   }
865   ASSERT_GE(compressed->computeChainDataLength(), 8);
866   for (size_t i = 0; i < 8; ++i) {
867     auto split = compressed->clone();
868     auto rest = compressed->clone();
869     split->trimEnd(split->length() - i);
870     rest->trimStart(i);
871     split->appendChain(std::move(rest));
872     auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
873     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
874     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
875   }
876 }
877
878 TEST_P(AutomaticCodecTest, RandomData) {
879   runSimpleTest(randomDataHolder);
880 }
881
882 TEST_P(AutomaticCodecTest, ConstantData) {
883   runSimpleTest(constantDataHolder);
884 }
885
886 TEST_P(AutomaticCodecTest, ValidPrefixes) {
887   const auto prefixes = codec_->validPrefixes();
888   for (const auto& prefix : prefixes) {
889     EXPECT_FALSE(prefix.empty());
890     // Ensure that all strings are at least 8 bytes for LZMA2.
891     // The bytes after the prefix should be ignored by `canUncompress()`.
892     IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8};
893     data.append(8);
894     EXPECT_TRUE(codec_->canUncompress(&data));
895     EXPECT_TRUE(auto_->canUncompress(&data));
896   }
897 }
898
899 TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
900   if (codec_->needsUncompressedLength()) {
901     EXPECT_TRUE(auto_->needsUncompressedLength());
902   }
903 }
904
905 TEST_P(AutomaticCodecTest, maxUncompressedLength) {
906   EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength());
907 }
908
909 TEST_P(AutomaticCodecTest, DefaultCodec) {
910   const uint64_t length = 42;
911   std::vector<std::unique_ptr<Codec>> codecs;
912   codecs.push_back(getCodec(CodecType::ZSTD));
913   auto automatic = getAutoUncompressionCodec(std::move(codecs));
914   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
915   auto compressed = codec_->compress(original.get());
916   auto decompressed = automatic->uncompress(compressed.get());
917
918   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
919 }
920
921 namespace {
922 class CustomCodec : public Codec {
923  public:
924   static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
925     return std::make_unique<CustomCodec>(std::move(prefix), type);
926   }
927   explicit CustomCodec(std::string prefix, CodecType type)
928       : Codec(CodecType::USER_DEFINED),
929         prefix_(std::move(prefix)),
930         codec_(getCodec(type)) {}
931
932  private:
933   std::vector<std::string> validPrefixes() const override {
934     return {prefix_};
935   }
936
937   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
938     return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
939   }
940
941   bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
942     auto clone = data->cloneCoalescedAsValue();
943     if (clone.length() < prefix_.size()) {
944       return false;
945     }
946     return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
947   }
948
949   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
950     auto result = IOBuf::copyBuffer(prefix_);
951     result->appendChain(codec_->compress(data));
952     EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
953     return result;
954   }
955
956   std::unique_ptr<IOBuf> doUncompress(
957       const IOBuf* data,
958       Optional<uint64_t> uncompressedLength) override {
959     EXPECT_TRUE(canUncompress(data, uncompressedLength));
960     auto clone = data->cloneCoalescedAsValue();
961     clone.trimStart(prefix_.size());
962     return codec_->uncompress(&clone, uncompressedLength);
963   }
964
965   std::string prefix_;
966   std::unique_ptr<Codec> codec_;
967 };
968 }
969
970 TEST_P(AutomaticCodecTest, CustomCodec) {
971   const uint64_t length = 42;
972   auto ab = CustomCodec::create("ab", CodecType::ZSTD);
973   std::vector<std::unique_ptr<Codec>> codecs;
974   codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD));
975   auto automatic = getAutoUncompressionCodec(std::move(codecs));
976   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
977
978   auto abCompressed = ab->compress(original.get());
979   auto abDecompressed = automatic->uncompress(abCompressed.get());
980   EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
981   EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
982   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));
983
984   auto compressed = codec_->compress(original.get());
985   auto decompressed = automatic->uncompress(compressed.get());
986   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
987 }
988
989 TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
990   const uint64_t length = 42;
991   auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
992   std::vector<std::unique_ptr<Codec>> codecs;
993   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
994   codecs.push_back(getCodec(CodecType::LZ4_FRAME));
995   auto automatic = getAutoUncompressionCodec(std::move(codecs));
996   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
997
998   auto noneCompressed = none->compress(original.get());
999   auto noneDecompressed = automatic->uncompress(noneCompressed.get());
1000   EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
1001   EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
1002   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));
1003
1004   auto compressed = codec_->compress(original.get());
1005   auto decompressed = automatic->uncompress(compressed.get());
1006   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1007 }
1008
1009 TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
1010   // No default codec can uncompress 1 bytes.
1011   IOBuf buf{IOBuf::CREATE, 1};
1012   buf.append(1);
1013   EXPECT_FALSE(codec_->canUncompress(&buf, 1));
1014   EXPECT_FALSE(codec_->canUncompress(&buf, folly::none));
1015   EXPECT_FALSE(auto_->canUncompress(&buf, 1));
1016   EXPECT_FALSE(auto_->canUncompress(&buf, folly::none));
1017 }
1018
1019 INSTANTIATE_TEST_CASE_P(
1020     AutomaticCodecTest,
1021     AutomaticCodecTest,
1022     testing::Values(
1023         CodecType::LZ4_FRAME,
1024         CodecType::ZSTD,
1025         CodecType::ZLIB,
1026         CodecType::GZIP,
1027         CodecType::LZMA2,
1028         CodecType::BZIP2));
1029
1030 TEST(ValidPrefixesTest, CustomCodec) {
1031   std::vector<std::unique_ptr<Codec>> codecs;
1032   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1033   const auto none = getAutoUncompressionCodec(std::move(codecs));
1034   const auto prefixes = none->validPrefixes();
1035   const auto it = std::find(prefixes.begin(), prefixes.end(), "none");
1036   EXPECT_TRUE(it != prefixes.end());
1037 }
1038
1039 #define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
1040   do {                                                       \
1041     if (kIsDebug) {                                          \
1042       EXPECT_THROW((statement), expected_exception);         \
1043     } else {                                                 \
1044       EXPECT_NO_THROW((statement));                          \
1045     }                                                        \
1046   } while (false)
1047
1048 TEST(CheckCompatibleTest, SimplePrefixSecond) {
1049   std::vector<std::unique_ptr<Codec>> codecs;
1050   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1051   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1052   EXPECT_THROW_IF_DEBUG(
1053       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1054 }
1055
1056 TEST(CheckCompatibleTest, SimplePrefixFirst) {
1057   std::vector<std::unique_ptr<Codec>> codecs;
1058   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1059   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1060   EXPECT_THROW_IF_DEBUG(
1061       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1062 }
1063
1064 TEST(CheckCompatibleTest, Empty) {
1065   std::vector<std::unique_ptr<Codec>> codecs;
1066   codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
1067   EXPECT_THROW_IF_DEBUG(
1068       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1069 }
1070
1071 TEST(CheckCompatibleTest, ZstdPrefix) {
1072   std::vector<std::unique_ptr<Codec>> codecs;
1073   codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
1074   EXPECT_THROW_IF_DEBUG(
1075       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1076 }
1077
1078 TEST(CheckCompatibleTest, ZstdDuplicate) {
1079   std::vector<std::unique_ptr<Codec>> codecs;
1080   codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
1081   EXPECT_THROW_IF_DEBUG(
1082       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1083 }
1084
1085 TEST(CheckCompatibleTest, ZlibIsPrefix) {
1086   std::vector<std::unique_ptr<Codec>> codecs;
1087   codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
1088   EXPECT_THROW_IF_DEBUG(
1089       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1090 }
1091
1092 #if FOLLY_HAVE_LIBZSTD
1093
1094 TEST(ZstdTest, BackwardCompatible) {
1095   auto codec = getCodec(CodecType::ZSTD);
1096   {
1097     auto const data = IOBuf::wrapBuffer(randomDataHolder.data(size_t(1) << 20));
1098     auto compressed = codec->compress(data.get());
1099     compressed->coalesce();
1100     EXPECT_EQ(
1101         data->length(),
1102         ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1103   }
1104   {
1105     auto const data =
1106         IOBuf::wrapBuffer(randomDataHolder.data(size_t(100) << 20));
1107     auto compressed = codec->compress(data.get());
1108     compressed->coalesce();
1109     EXPECT_EQ(
1110         data->length(),
1111         ZSTD_getDecompressedSize(compressed->data(), compressed->length()));
1112   }
1113 }
1114
1115 #endif
1116 }}}  // namespaces
1117
1118 int main(int argc, char *argv[]) {
1119   testing::InitGoogleTest(&argc, argv);
1120   gflags::ParseCommandLineFlags(&argc, &argv, true);
1121
1122   auto ret = RUN_ALL_TESTS();
1123   if (!ret) {
1124     folly::runBenchmarksOnFlag();
1125   }
1126   return ret;
1127 }