Add streaming API
[folly.git] / folly / io / test / CompressionTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/Compression.h>
18
19 #include <algorithm>
20 #include <random>
21 #include <set>
22 #include <thread>
23 #include <unordered_map>
24 #include <utility>
25
26 #include <boost/noncopyable.hpp>
27 #include <glog/logging.h>
28
29 #include <folly/Benchmark.h>
30 #include <folly/Hash.h>
31 #include <folly/Memory.h>
32 #include <folly/Random.h>
33 #include <folly/Varint.h>
34 #include <folly/io/IOBufQueue.h>
35 #include <folly/portability/GTest.h>
36
37 namespace folly { namespace io { namespace test {
38
39 class DataHolder : private boost::noncopyable {
40  public:
41   uint64_t hash(size_t size) const;
42   ByteRange data(size_t size) const;
43
44  protected:
45   explicit DataHolder(size_t sizeLog2);
46   const size_t size_;
47   std::unique_ptr<uint8_t[]> data_;
48   mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
49 };
50
51 DataHolder::DataHolder(size_t sizeLog2)
52   : size_(size_t(1) << sizeLog2),
53     data_(new uint8_t[size_]) {
54 }
55
56 uint64_t DataHolder::hash(size_t size) const {
57   CHECK_LE(size, size_);
58   auto p = hashCache_.find(size);
59   if (p != hashCache_.end()) {
60     return p->second;
61   }
62
63   uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
64   hashCache_[size] = h;
65   return h;
66 }
67
68 ByteRange DataHolder::data(size_t size) const {
69   CHECK_LE(size, size_);
70   return ByteRange(data_.get(), size);
71 }
72
73 uint64_t hashIOBuf(const IOBuf* buf) {
74   uint64_t h = folly::hash::FNV_64_HASH_START;
75   for (auto& range : *buf) {
76     h = folly::hash::fnv64_buf(range.data(), range.size(), h);
77   }
78   return h;
79 }
80
81 class RandomDataHolder : public DataHolder {
82  public:
83   explicit RandomDataHolder(size_t sizeLog2);
84 };
85
86 RandomDataHolder::RandomDataHolder(size_t sizeLog2)
87   : DataHolder(sizeLog2) {
88   static constexpr size_t numThreadsLog2 = 3;
89   static constexpr size_t numThreads = size_t(1) << numThreadsLog2;
90
91   uint32_t seed = randomNumberSeed();
92
93   std::vector<std::thread> threads;
94   threads.reserve(numThreads);
95   for (size_t t = 0; t < numThreads; ++t) {
96     threads.emplace_back([this, seed, t, sizeLog2] {
97       std::mt19937 rng(seed + t);
98       size_t countLog2 = sizeLog2 - numThreadsLog2;
99       size_t start = size_t(t) << countLog2;
100       for (size_t i = 0; i < countLog2; ++i) {
101         this->data_[start + i] = rng();
102       }
103     });
104   }
105
106   for (auto& t : threads) {
107     t.join();
108   }
109 }
110
111 class ConstantDataHolder : public DataHolder {
112  public:
113   explicit ConstantDataHolder(size_t sizeLog2);
114 };
115
116 ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
117   : DataHolder(sizeLog2) {
118   memset(data_.get(), 'a', size_);
119 }
120
121 constexpr size_t dataSizeLog2 = 27;  // 128MiB
122 RandomDataHolder randomDataHolder(dataSizeLog2);
123 ConstantDataHolder constantDataHolder(dataSizeLog2);
124
125 // The intersection of the provided codecs & those that are compiled in.
126 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
127   std::vector<CodecType> supported;
128
129   std::copy_if(
130       std::begin(v),
131       std::end(v),
132       std::back_inserter(supported),
133       hasCodec);
134
135   return supported;
136 }
137
138 // All compiled-in compression codecs.
139 static std::vector<CodecType> availableCodecs() {
140   std::vector<CodecType> codecs;
141
142   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
143     auto type = static_cast<CodecType>(i);
144     if (hasCodec(type)) {
145       codecs.push_back(type);
146     }
147   }
148
149   return codecs;
150 }
151
152 static std::vector<CodecType> availableStreamCodecs() {
153   std::vector<CodecType> codecs;
154
155   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
156     auto type = static_cast<CodecType>(i);
157     if (hasStreamCodec(type)) {
158       codecs.push_back(type);
159     }
160   }
161
162   return codecs;
163 }
164
165 TEST(CompressionTestNeedsUncompressedLength, Simple) {
166   static const struct { CodecType type; bool needsUncompressedLength; }
167     expectations[] = {
168       { CodecType::NO_COMPRESSION, false },
169       { CodecType::LZ4, true },
170       { CodecType::SNAPPY, false },
171       { CodecType::ZLIB, false },
172       { CodecType::LZ4_VARINT_SIZE, false },
173       { CodecType::LZMA2, false },
174       { CodecType::LZMA2_VARINT_SIZE, false },
175       { CodecType::ZSTD, false },
176       { CodecType::GZIP, false },
177       { CodecType::LZ4_FRAME, false },
178       { CodecType::BZIP2, false },
179     };
180
181   for (auto const& test : expectations) {
182     if (hasCodec(test.type)) {
183       EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
184                 test.needsUncompressedLength);
185     }
186   }
187 }
188
189 class CompressionTest
190     : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
191  protected:
192   void SetUp() override {
193     auto tup = GetParam();
194     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
195     chunks_ = std::tr1::get<1>(tup);
196     codec_ = getCodec(std::tr1::get<2>(tup));
197   }
198
199   void runSimpleIOBufTest(const DataHolder& dh);
200
201   void runSimpleStringTest(const DataHolder& dh);
202
203  private:
204   std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
205
206   uint64_t uncompressedLength_;
207   size_t chunks_;
208   std::unique_ptr<Codec> codec_;
209 };
210
211 void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
212   const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
213   const auto compressed = split(codec_->compress(original.get()));
214   if (!codec_->needsUncompressedLength()) {
215     auto uncompressed = codec_->uncompress(compressed.get());
216     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
217     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
218   }
219   {
220     auto uncompressed = codec_->uncompress(compressed.get(),
221                                            uncompressedLength_);
222     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
223     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
224   }
225 }
226
227 void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
228   const auto original = std::string(
229       reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
230       uncompressedLength_);
231   const auto compressed = codec_->compress(original);
232   if (!codec_->needsUncompressedLength()) {
233     auto uncompressed = codec_->uncompress(compressed);
234     EXPECT_EQ(uncompressedLength_, uncompressed.length());
235     EXPECT_EQ(uncompressed, original);
236   }
237   {
238     auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
239     EXPECT_EQ(uncompressedLength_, uncompressed.length());
240     EXPECT_EQ(uncompressed, original);
241   }
242 }
243
244 // Uniformly split data into (potentially empty) chunks.
245 std::unique_ptr<IOBuf> CompressionTest::split(
246     std::unique_ptr<IOBuf> data) const {
247   if (data->isChained()) {
248     data->coalesce();
249   }
250
251   const size_t size = data->computeChainDataLength();
252
253   std::multiset<size_t> splits;
254   for (size_t i = 1; i < chunks_; ++i) {
255     splits.insert(Random::rand64(size));
256   }
257
258   folly::IOBufQueue result;
259
260   size_t offset = 0;
261   for (size_t split : splits) {
262     result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
263     offset = split;
264   }
265   result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
266
267   return result.move();
268 }
269
270 TEST_P(CompressionTest, RandomData) {
271   runSimpleIOBufTest(randomDataHolder);
272 }
273
274 TEST_P(CompressionTest, ConstantData) {
275   runSimpleIOBufTest(constantDataHolder);
276 }
277
278 TEST_P(CompressionTest, RandomDataString) {
279   runSimpleStringTest(randomDataHolder);
280 }
281
282 TEST_P(CompressionTest, ConstantDataString) {
283   runSimpleStringTest(constantDataHolder);
284 }
285
286 INSTANTIATE_TEST_CASE_P(
287     CompressionTest,
288     CompressionTest,
289     testing::Combine(
290         testing::Values(0, 1, 12, 22, 25, 27),
291         testing::Values(1, 2, 3, 8, 65),
292         testing::ValuesIn(availableCodecs())));
293
294 class CompressionVarintTest
295     : public testing::TestWithParam<std::tr1::tuple<int, CodecType>> {
296  protected:
297   void SetUp() override {
298     auto tup = GetParam();
299     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
300     codec_ = getCodec(std::tr1::get<1>(tup));
301   }
302
303   void runSimpleTest(const DataHolder& dh);
304
305   uint64_t uncompressedLength_;
306   std::unique_ptr<Codec> codec_;
307 };
308
309 inline uint64_t oneBasedMsbPos(uint64_t number) {
310   uint64_t pos = 0;
311   for (; number > 0; ++pos, number >>= 1) {
312   }
313   return pos;
314 }
315
316 void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
317   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
318   auto compressed = codec_->compress(original.get());
319   auto breakPoint =
320       1UL +
321       Random::rand64(
322           std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
323   auto tinyBuf = IOBuf::copyBuffer(compressed->data(),
324                                    std::min(compressed->length(), breakPoint));
325   compressed->trimStart(breakPoint);
326   tinyBuf->prependChain(std::move(compressed));
327   compressed = std::move(tinyBuf);
328
329   auto uncompressed = codec_->uncompress(compressed.get());
330
331   EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
332   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
333 }
334
335 TEST_P(CompressionVarintTest, RandomData) {
336   runSimpleTest(randomDataHolder);
337 }
338
339 TEST_P(CompressionVarintTest, ConstantData) {
340   runSimpleTest(constantDataHolder);
341 }
342
343 INSTANTIATE_TEST_CASE_P(
344     CompressionVarintTest,
345     CompressionVarintTest,
346     testing::Combine(
347         testing::Values(0, 1, 12, 22, 25, 27),
348         testing::ValuesIn(supportedCodecs({
349             CodecType::LZ4_VARINT_SIZE,
350             CodecType::LZMA2_VARINT_SIZE,
351             }))));
352
353 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
354  protected:
355   void SetUp() override { codec_ = getCodec(GetParam()); }
356
357   void runSimpleTest(const DataHolder& dh);
358
359   std::unique_ptr<Codec> codec_;
360 };
361
362 void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
363   constexpr uint64_t uncompressedLength = 42;
364   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
365   auto compressed = codec_->compress(original.get());
366
367   if (!codec_->needsUncompressedLength()) {
368     auto uncompressed = codec_->uncompress(compressed.get());
369     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
370     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
371   }
372   {
373     auto uncompressed = codec_->uncompress(compressed.get(),
374                                            uncompressedLength);
375     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
376     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
377   }
378
379   EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
380                std::runtime_error);
381
382   // Corrupt the first character
383   ++(compressed->writableData()[0]);
384
385   if (!codec_->needsUncompressedLength()) {
386     EXPECT_THROW(codec_->uncompress(compressed.get()),
387                  std::runtime_error);
388   }
389
390   EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength),
391                std::runtime_error);
392 }
393
394 TEST_P(CompressionCorruptionTest, RandomData) {
395   runSimpleTest(randomDataHolder);
396 }
397
398 TEST_P(CompressionCorruptionTest, ConstantData) {
399   runSimpleTest(constantDataHolder);
400 }
401
402 INSTANTIATE_TEST_CASE_P(
403     CompressionCorruptionTest,
404     CompressionCorruptionTest,
405     testing::ValuesIn(
406         // NO_COMPRESSION can't detect corruption
407         // LZ4 can't detect corruption reliably (sigh)
408         supportedCodecs({
409             CodecType::SNAPPY,
410             CodecType::ZLIB,
411             CodecType::LZMA2,
412             CodecType::ZSTD,
413             CodecType::LZ4_FRAME,
414             CodecType::BZIP2,
415         })));
416
417 class StreamingUnitTest : public testing::TestWithParam<CodecType> {
418  protected:
419   void SetUp() override {
420     codec_ = getStreamCodec(GetParam());
421   }
422
423   std::unique_ptr<StreamCodec> codec_;
424 };
425
426 TEST_P(StreamingUnitTest, maxCompressedLength) {
427   EXPECT_EQ(0, codec_->maxCompressedLength(0));
428   for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) {
429     EXPECT_GE(codec_->maxCompressedLength(length), length);
430   }
431 }
432
433 TEST_P(StreamingUnitTest, getUncompressedLength) {
434   auto const empty = IOBuf::create(0);
435   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get()));
436   EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0));
437
438   auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100));
439   auto const compressed = codec_->compress(data.get());
440
441   EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0));
442   if (auto const length = codec_->getUncompressedLength(data.get())) {
443     EXPECT_EQ(100, *length);
444   }
445   EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100));
446   // If the uncompressed length is stored in the frame, then make sure it throws
447   // when it is given the wrong length.
448   if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) {
449     EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200));
450   }
451 }
452
453 TEST_P(StreamingUnitTest, emptyData) {
454   ByteRange input{};
455   auto buffer = IOBuf::create(1);
456   buffer->append(buffer->capacity());
457   MutableByteRange output{};
458
459   // Test compressing empty data in one pass
460   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
461   codec_->resetStream(0);
462   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
463   codec_->resetStream();
464   output = {buffer->writableData(), buffer->length()};
465   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
466   EXPECT_EQ(buffer->length(), output.size());
467
468   // Test compressing empty data with multiple calls to compressStream()
469   codec_->resetStream();
470   output = {};
471   EXPECT_FALSE(codec_->compressStream(input, output));
472   EXPECT_TRUE(
473       codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
474   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
475   codec_->resetStream();
476   output = {buffer->writableData(), buffer->length()};
477   EXPECT_FALSE(codec_->compressStream(input, output));
478   EXPECT_TRUE(
479       codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH));
480   EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END));
481   EXPECT_EQ(buffer->length(), output.size());
482
483   // Test uncompressing empty data
484   output = {};
485   codec_->resetStream();
486   EXPECT_TRUE(codec_->uncompressStream(input, output));
487   codec_->resetStream();
488   EXPECT_TRUE(
489       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
490   codec_->resetStream();
491   EXPECT_TRUE(
492       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
493   codec_->resetStream(0);
494   EXPECT_TRUE(codec_->uncompressStream(input, output));
495   codec_->resetStream(0);
496   EXPECT_TRUE(
497       codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH));
498   codec_->resetStream(0);
499   EXPECT_TRUE(
500       codec_->uncompressStream(input, output, StreamCodec::FlushOp::END));
501 }
502
503 TEST_P(StreamingUnitTest, noForwardProgressOkay) {
504   auto inBuffer = IOBuf::create(2);
505   inBuffer->writableData()[0] = 'a';
506   inBuffer->writableData()[0] = 'a';
507   inBuffer->append(2);
508   auto input = inBuffer->coalesce();
509   auto compressed = codec_->compress(inBuffer.get());
510
511   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2));
512   MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()};
513
514   ByteRange emptyInput;
515   MutableByteRange emptyOutput;
516
517   // Compress some data to avoid empty data special casing
518   codec_->resetStream();
519   while (!input.empty()) {
520     codec_->compressStream(input, output);
521   }
522   // empty input and output is okay for flush NONE and FLUSH.
523   codec_->compressStream(emptyInput, emptyOutput);
524   codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH);
525
526   codec_->resetStream();
527   input = inBuffer->coalesce();
528   output = {outBuffer->writableTail(), outBuffer->tailroom()};
529   while (!input.empty()) {
530     codec_->compressStream(input, output);
531   }
532   // empty input and output is okay for flush END.
533   codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END);
534
535   codec_->resetStream();
536   input = compressed->coalesce();
537   input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete
538   output = {inBuffer->writableData(), inBuffer->length()};
539   // Uncompress some data to avoid empty data special casing
540   while (!input.empty()) {
541     EXPECT_FALSE(codec_->uncompressStream(input, output));
542   }
543   // empty input and output is okay for all flush values.
544   EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput));
545   EXPECT_FALSE(codec_->uncompressStream(
546       emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH));
547   EXPECT_FALSE(codec_->uncompressStream(
548       emptyInput, emptyOutput, StreamCodec::FlushOp::END));
549 }
550
551 TEST_P(StreamingUnitTest, stateTransitions) {
552   auto inBuffer = IOBuf::create(1);
553   inBuffer->writableData()[0] = 'a';
554   inBuffer->append(1);
555   auto compressed = codec_->compress(inBuffer.get());
556   ByteRange const in = compressed->coalesce();
557   auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size()));
558   MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()};
559
560   auto compress = [&](
561       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
562       bool empty = false) {
563     auto input = in;
564     auto output = empty ? MutableByteRange{} : out;
565     return codec_->compressStream(input, output, flushOp);
566   };
567   auto uncompress = [&](
568       StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE,
569       bool empty = false) {
570     auto input = in;
571     auto output = empty ? MutableByteRange{} : out;
572     return codec_->uncompressStream(input, output, flushOp);
573   };
574
575   // compression flow
576   codec_->resetStream();
577   EXPECT_FALSE(compress());
578   EXPECT_FALSE(compress());
579   EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH));
580   EXPECT_FALSE(compress());
581   EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
582   // uncompression flow
583   codec_->resetStream();
584   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
585   codec_->resetStream();
586   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true));
587   codec_->resetStream();
588   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
589   codec_->resetStream();
590   EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true));
591   codec_->resetStream();
592   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
593   // compress -> uncompress
594   codec_->resetStream();
595   EXPECT_FALSE(compress());
596   EXPECT_THROW(uncompress(), std::logic_error);
597   // uncompress -> compress
598   codec_->resetStream();
599   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
600   EXPECT_THROW(compress(), std::logic_error);
601   // end -> compress
602   codec_->resetStream();
603   EXPECT_FALSE(compress());
604   EXPECT_TRUE(compress(StreamCodec::FlushOp::END));
605   EXPECT_THROW(compress(), std::logic_error);
606   // end -> uncompress
607   codec_->resetStream();
608   EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH));
609   EXPECT_THROW(uncompress(), std::logic_error);
610   // flush -> compress
611   codec_->resetStream();
612   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
613   EXPECT_THROW(compress(), std::logic_error);
614   // flush -> end
615   codec_->resetStream();
616   EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true));
617   EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error);
618   // undefined -> compress
619   codec_->compress(inBuffer.get());
620   EXPECT_THROW(compress(), std::logic_error);
621   codec_->uncompress(compressed.get());
622   EXPECT_THROW(compress(), std::logic_error);
623   // undefined -> undefined
624   codec_->uncompress(compressed.get());
625   codec_->compress(inBuffer.get());
626 }
627
628 INSTANTIATE_TEST_CASE_P(
629     StreamingUnitTest,
630     StreamingUnitTest,
631     testing::ValuesIn(availableStreamCodecs()));
632
633 class StreamingCompressionTest
634     : public testing::TestWithParam<std::tuple<int, int, CodecType>> {
635  protected:
636   void SetUp() override {
637     auto const tup = GetParam();
638     uncompressedLength_ = uint64_t(1) << std::get<0>(tup);
639     chunkSize_ = size_t(1) << std::get<1>(tup);
640     codec_ = getStreamCodec(std::get<2>(tup));
641   }
642
643   void runResetStreamTest(DataHolder const& dh);
644   void runCompressStreamTest(DataHolder const& dh);
645   void runUncompressStreamTest(DataHolder const& dh);
646   void runFlushTest(DataHolder const& dh);
647
648  private:
649   std::vector<ByteRange> split(ByteRange data) const;
650
651   uint64_t uncompressedLength_;
652   size_t chunkSize_;
653   std::unique_ptr<StreamCodec> codec_;
654 };
655
656 std::vector<ByteRange> StreamingCompressionTest::split(ByteRange data) const {
657   size_t const pieces = std::max<size_t>(1, data.size() / chunkSize_);
658   std::vector<ByteRange> result;
659   result.reserve(pieces + 1);
660   while (!data.empty()) {
661     size_t const pieceSize = std::min(data.size(), chunkSize_);
662     result.push_back(data.subpiece(0, pieceSize));
663     data.uncheckedAdvance(pieceSize);
664   }
665   return result;
666 }
667
668 static std::unique_ptr<IOBuf> compressSome(
669     StreamCodec* codec,
670     ByteRange data,
671     uint64_t bufferSize,
672     StreamCodec::FlushOp flush) {
673   bool result;
674   IOBufQueue queue;
675   do {
676     auto buffer = IOBuf::create(bufferSize);
677     buffer->append(buffer->capacity());
678     MutableByteRange output{buffer->writableData(), buffer->length()};
679
680     result = codec->compressStream(data, output, flush);
681     buffer->trimEnd(output.size());
682     queue.append(std::move(buffer));
683
684   } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result);
685   EXPECT_TRUE(data.empty());
686   return queue.move();
687 }
688
689 static std::pair<bool, std::unique_ptr<IOBuf>> uncompressSome(
690     StreamCodec* codec,
691     ByteRange& data,
692     uint64_t bufferSize,
693     StreamCodec::FlushOp flush) {
694   bool result;
695   IOBufQueue queue;
696   do {
697     auto buffer = IOBuf::create(bufferSize);
698     buffer->append(buffer->capacity());
699     MutableByteRange output{buffer->writableData(), buffer->length()};
700
701     result = codec->uncompressStream(data, output, flush);
702     buffer->trimEnd(output.size());
703     queue.append(std::move(buffer));
704
705   } while (queue.tailroom() == 0 && !result);
706   return std::make_pair(result, queue.move());
707 }
708
709 void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) {
710   auto const input = dh.data(uncompressedLength_);
711   // Compress some but leave state unclean
712   codec_->resetStream(uncompressedLength_);
713   compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE);
714   // Reset stream and compress all
715   codec_->resetStream();
716   auto compressed =
717       compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
718   auto const uncompressed = codec_->uncompress(compressed.get(), input.size());
719   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
720 }
721
722 TEST_P(StreamingCompressionTest, resetStream) {
723   runResetStreamTest(constantDataHolder);
724   runResetStreamTest(randomDataHolder);
725 }
726
727 void StreamingCompressionTest::runCompressStreamTest(
728     const folly::io::test::DataHolder& dh) {
729   auto const inputs = split(dh.data(uncompressedLength_));
730
731   IOBufQueue queue;
732   codec_->resetStream(uncompressedLength_);
733   // Compress many inputs in a row
734   for (auto const input : inputs) {
735     queue.append(compressSome(
736         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE));
737   }
738   // Finish the operation with empty input.
739   ByteRange empty;
740   queue.append(
741       compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END));
742
743   auto const uncompressed = codec_->uncompress(queue.front());
744   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
745 }
746
747 TEST_P(StreamingCompressionTest, compressStream) {
748   runCompressStreamTest(constantDataHolder);
749   runCompressStreamTest(randomDataHolder);
750 }
751
752 void StreamingCompressionTest::runUncompressStreamTest(
753     const folly::io::test::DataHolder& dh) {
754   auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
755   // Concatenate 3 compressed frames in a row
756   auto compressed = codec_->compress(data.get());
757   compressed->prependChain(codec_->compress(data.get()));
758   compressed->prependChain(codec_->compress(data.get()));
759   // Pass all 3 compressed frames in one input buffer
760   auto input = compressed->coalesce();
761   // Uncompress the first frame
762   codec_->resetStream(data->computeChainDataLength());
763   {
764     auto const result = uncompressSome(
765         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
766     ASSERT_TRUE(result.first);
767     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
768   }
769   // Uncompress the second frame
770   codec_->resetStream();
771   {
772     auto const result = uncompressSome(
773         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END);
774     ASSERT_TRUE(result.first);
775     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
776   }
777   // Uncompress the third frame
778   codec_->resetStream();
779   {
780     auto const result = uncompressSome(
781         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
782     ASSERT_TRUE(result.first);
783     ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
784   }
785   EXPECT_TRUE(input.empty());
786 }
787
788 TEST_P(StreamingCompressionTest, uncompressStream) {
789   runUncompressStreamTest(constantDataHolder);
790   runUncompressStreamTest(randomDataHolder);
791 }
792
793 void StreamingCompressionTest::runFlushTest(DataHolder const& dh) {
794   auto const inputs = split(dh.data(uncompressedLength_));
795   auto uncodec = getStreamCodec(codec_->type());
796
797   codec_->resetStream();
798   for (auto input : inputs) {
799     // Compress some data and flush the stream
800     auto compressed = compressSome(
801         codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH);
802     auto compressedRange = compressed->coalesce();
803     // Uncompress the compressed data
804     auto result = uncompressSome(
805         uncodec.get(),
806         compressedRange,
807         chunkSize_,
808         StreamCodec::FlushOp::FLUSH);
809     // All compressed data should have been consumed
810     EXPECT_TRUE(compressedRange.empty());
811     // The frame isn't complete
812     EXPECT_FALSE(result.first);
813     // The uncompressed data should be exactly the input data
814     EXPECT_EQ(input.size(), result.second->computeChainDataLength());
815     auto const data = IOBuf::wrapBuffer(input);
816     EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get()));
817   }
818 }
819
820 TEST_P(StreamingCompressionTest, testFlush) {
821   runFlushTest(constantDataHolder);
822   runFlushTest(randomDataHolder);
823 }
824
825 INSTANTIATE_TEST_CASE_P(
826     StreamingCompressionTest,
827     StreamingCompressionTest,
828     testing::Combine(
829         testing::Values(0, 1, 12, 22, 27),
830         testing::Values(12, 17, 20),
831         testing::ValuesIn(availableStreamCodecs())));
832
833 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
834  protected:
835   void SetUp() override {
836     codec_ = getCodec(GetParam());
837     auto_ = getAutoUncompressionCodec();
838   }
839
840   void runSimpleTest(const DataHolder& dh);
841
842   std::unique_ptr<Codec> codec_;
843   std::unique_ptr<Codec> auto_;
844 };
845
846 void AutomaticCodecTest::runSimpleTest(const DataHolder& dh) {
847   constexpr uint64_t uncompressedLength = 1000;
848   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
849   auto compressed = codec_->compress(original.get());
850
851   if (!codec_->needsUncompressedLength()) {
852     auto uncompressed = auto_->uncompress(compressed.get());
853     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
854     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
855   }
856   {
857     auto uncompressed = auto_->uncompress(compressed.get(), uncompressedLength);
858     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
859     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
860   }
861   ASSERT_GE(compressed->computeChainDataLength(), 8);
862   for (size_t i = 0; i < 8; ++i) {
863     auto split = compressed->clone();
864     auto rest = compressed->clone();
865     split->trimEnd(split->length() - i);
866     rest->trimStart(i);
867     split->appendChain(std::move(rest));
868     auto uncompressed = auto_->uncompress(split.get(), uncompressedLength);
869     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
870     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
871   }
872 }
873
874 TEST_P(AutomaticCodecTest, RandomData) {
875   runSimpleTest(randomDataHolder);
876 }
877
878 TEST_P(AutomaticCodecTest, ConstantData) {
879   runSimpleTest(constantDataHolder);
880 }
881
882 TEST_P(AutomaticCodecTest, ValidPrefixes) {
883   const auto prefixes = codec_->validPrefixes();
884   for (const auto& prefix : prefixes) {
885     EXPECT_FALSE(prefix.empty());
886     // Ensure that all strings are at least 8 bytes for LZMA2.
887     // The bytes after the prefix should be ignored by `canUncompress()`.
888     IOBuf data{IOBuf::COPY_BUFFER, prefix, 0, 8};
889     data.append(8);
890     EXPECT_TRUE(codec_->canUncompress(&data));
891     EXPECT_TRUE(auto_->canUncompress(&data));
892   }
893 }
894
895 TEST_P(AutomaticCodecTest, NeedsUncompressedLength) {
896   if (codec_->needsUncompressedLength()) {
897     EXPECT_TRUE(auto_->needsUncompressedLength());
898   }
899 }
900
901 TEST_P(AutomaticCodecTest, maxUncompressedLength) {
902   EXPECT_LE(codec_->maxUncompressedLength(), auto_->maxUncompressedLength());
903 }
904
905 TEST_P(AutomaticCodecTest, DefaultCodec) {
906   const uint64_t length = 42;
907   std::vector<std::unique_ptr<Codec>> codecs;
908   codecs.push_back(getCodec(CodecType::ZSTD));
909   auto automatic = getAutoUncompressionCodec(std::move(codecs));
910   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
911   auto compressed = codec_->compress(original.get());
912   auto decompressed = automatic->uncompress(compressed.get());
913
914   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
915 }
916
917 namespace {
918 class CustomCodec : public Codec {
919  public:
920   static std::unique_ptr<Codec> create(std::string prefix, CodecType type) {
921     return std::make_unique<CustomCodec>(std::move(prefix), type);
922   }
923   explicit CustomCodec(std::string prefix, CodecType type)
924       : Codec(CodecType::USER_DEFINED),
925         prefix_(std::move(prefix)),
926         codec_(getCodec(type)) {}
927
928  private:
929   std::vector<std::string> validPrefixes() const override {
930     return {prefix_};
931   }
932
933   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override {
934     return codec_->maxCompressedLength(uncompressedLength) + prefix_.size();
935   }
936
937   bool canUncompress(const IOBuf* data, Optional<uint64_t>) const override {
938     auto clone = data->cloneCoalescedAsValue();
939     if (clone.length() < prefix_.size()) {
940       return false;
941     }
942     return memcmp(clone.data(), prefix_.data(), prefix_.size()) == 0;
943   }
944
945   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override {
946     auto result = IOBuf::copyBuffer(prefix_);
947     result->appendChain(codec_->compress(data));
948     EXPECT_TRUE(canUncompress(result.get(), data->computeChainDataLength()));
949     return result;
950   }
951
952   std::unique_ptr<IOBuf> doUncompress(
953       const IOBuf* data,
954       Optional<uint64_t> uncompressedLength) override {
955     EXPECT_TRUE(canUncompress(data, uncompressedLength));
956     auto clone = data->cloneCoalescedAsValue();
957     clone.trimStart(prefix_.size());
958     return codec_->uncompress(&clone, uncompressedLength);
959   }
960
961   std::string prefix_;
962   std::unique_ptr<Codec> codec_;
963 };
964 }
965
966 TEST_P(AutomaticCodecTest, CustomCodec) {
967   const uint64_t length = 42;
968   auto ab = CustomCodec::create("ab", CodecType::ZSTD);
969   std::vector<std::unique_ptr<Codec>> codecs;
970   codecs.push_back(CustomCodec::create("ab", CodecType::ZSTD));
971   auto automatic = getAutoUncompressionCodec(std::move(codecs));
972   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
973
974   auto abCompressed = ab->compress(original.get());
975   auto abDecompressed = automatic->uncompress(abCompressed.get());
976   EXPECT_TRUE(automatic->canUncompress(abCompressed.get()));
977   EXPECT_FALSE(auto_->canUncompress(abCompressed.get()));
978   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(abDecompressed.get()));
979
980   auto compressed = codec_->compress(original.get());
981   auto decompressed = automatic->uncompress(compressed.get());
982   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
983 }
984
985 TEST_P(AutomaticCodecTest, CustomDefaultCodec) {
986   const uint64_t length = 42;
987   auto none = CustomCodec::create("none", CodecType::NO_COMPRESSION);
988   std::vector<std::unique_ptr<Codec>> codecs;
989   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
990   codecs.push_back(getCodec(CodecType::LZ4_FRAME));
991   auto automatic = getAutoUncompressionCodec(std::move(codecs));
992   auto original = IOBuf::wrapBuffer(constantDataHolder.data(length));
993
994   auto noneCompressed = none->compress(original.get());
995   auto noneDecompressed = automatic->uncompress(noneCompressed.get());
996   EXPECT_TRUE(automatic->canUncompress(noneCompressed.get()));
997   EXPECT_FALSE(auto_->canUncompress(noneCompressed.get()));
998   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(noneDecompressed.get()));
999
1000   auto compressed = codec_->compress(original.get());
1001   auto decompressed = automatic->uncompress(compressed.get());
1002   EXPECT_EQ(constantDataHolder.hash(length), hashIOBuf(decompressed.get()));
1003 }
1004
1005 TEST_P(AutomaticCodecTest, canUncompressOneBytes) {
1006   // No default codec can uncompress 1 bytes.
1007   IOBuf buf{IOBuf::CREATE, 1};
1008   buf.append(1);
1009   EXPECT_FALSE(codec_->canUncompress(&buf, 1));
1010   EXPECT_FALSE(codec_->canUncompress(&buf, folly::none));
1011   EXPECT_FALSE(auto_->canUncompress(&buf, 1));
1012   EXPECT_FALSE(auto_->canUncompress(&buf, folly::none));
1013 }
1014
1015 INSTANTIATE_TEST_CASE_P(
1016     AutomaticCodecTest,
1017     AutomaticCodecTest,
1018     testing::Values(
1019         CodecType::LZ4_FRAME,
1020         CodecType::ZSTD,
1021         CodecType::ZLIB,
1022         CodecType::GZIP,
1023         CodecType::LZMA2,
1024         CodecType::BZIP2));
1025
1026 TEST(ValidPrefixesTest, CustomCodec) {
1027   std::vector<std::unique_ptr<Codec>> codecs;
1028   codecs.push_back(CustomCodec::create("none", CodecType::NO_COMPRESSION));
1029   const auto none = getAutoUncompressionCodec(std::move(codecs));
1030   const auto prefixes = none->validPrefixes();
1031   const auto it = std::find(prefixes.begin(), prefixes.end(), "none");
1032   EXPECT_TRUE(it != prefixes.end());
1033 }
1034
1035 #define EXPECT_THROW_IF_DEBUG(statement, expected_exception) \
1036   do {                                                       \
1037     if (kIsDebug) {                                          \
1038       EXPECT_THROW((statement), expected_exception);         \
1039     } else {                                                 \
1040       EXPECT_NO_THROW((statement));                          \
1041     }                                                        \
1042   } while (false)
1043
1044 TEST(CheckCompatibleTest, SimplePrefixSecond) {
1045   std::vector<std::unique_ptr<Codec>> codecs;
1046   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1047   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1048   EXPECT_THROW_IF_DEBUG(
1049       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1050 }
1051
1052 TEST(CheckCompatibleTest, SimplePrefixFirst) {
1053   std::vector<std::unique_ptr<Codec>> codecs;
1054   codecs.push_back(CustomCodec::create("ab", CodecType::NO_COMPRESSION));
1055   codecs.push_back(CustomCodec::create("abc", CodecType::NO_COMPRESSION));
1056   EXPECT_THROW_IF_DEBUG(
1057       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1058 }
1059
1060 TEST(CheckCompatibleTest, Empty) {
1061   std::vector<std::unique_ptr<Codec>> codecs;
1062   codecs.push_back(CustomCodec::create("", CodecType::NO_COMPRESSION));
1063   EXPECT_THROW_IF_DEBUG(
1064       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1065 }
1066
1067 TEST(CheckCompatibleTest, ZstdPrefix) {
1068   std::vector<std::unique_ptr<Codec>> codecs;
1069   codecs.push_back(CustomCodec::create("\x28\xB5\x2F", CodecType::ZSTD));
1070   EXPECT_THROW_IF_DEBUG(
1071       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1072 }
1073
1074 TEST(CheckCompatibleTest, ZstdDuplicate) {
1075   std::vector<std::unique_ptr<Codec>> codecs;
1076   codecs.push_back(CustomCodec::create("\x28\xB5\x2F\xFD", CodecType::ZSTD));
1077   EXPECT_THROW_IF_DEBUG(
1078       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1079 }
1080
1081 TEST(CheckCompatibleTest, ZlibIsPrefix) {
1082   std::vector<std::unique_ptr<Codec>> codecs;
1083   codecs.push_back(CustomCodec::create("\x18\x76zzasdf", CodecType::ZSTD));
1084   EXPECT_THROW_IF_DEBUG(
1085       getAutoUncompressionCodec(std::move(codecs)), std::invalid_argument);
1086 }
1087 }}}  // namespaces
1088
1089 int main(int argc, char *argv[]) {
1090   testing::InitGoogleTest(&argc, argv);
1091   gflags::ParseCommandLineFlags(&argc, &argv, true);
1092
1093   auto ret = RUN_ALL_TESTS();
1094   if (!ret) {
1095     folly::runBenchmarksOnFlag();
1096   }
1097   return ret;
1098 }