Re-enable io tests
[folly.git] / folly / io / test / CompressionTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/Compression.h>
18
19 #include <random>
20 #include <set>
21 #include <thread>
22 #include <unordered_map>
23
24 #include <boost/noncopyable.hpp>
25 #include <glog/logging.h>
26
27 #include <folly/Benchmark.h>
28 #include <folly/Hash.h>
29 #include <folly/Random.h>
30 #include <folly/Varint.h>
31 #include <folly/io/IOBufQueue.h>
32 #include <folly/portability/GTest.h>
33
34 namespace folly { namespace io { namespace test {
35
36 class DataHolder : private boost::noncopyable {
37  public:
38   uint64_t hash(size_t size) const;
39   ByteRange data(size_t size) const;
40
41  protected:
42   explicit DataHolder(size_t sizeLog2);
43   const size_t size_;
44   std::unique_ptr<uint8_t[]> data_;
45   mutable std::unordered_map<uint64_t, uint64_t> hashCache_;
46 };
47
48 DataHolder::DataHolder(size_t sizeLog2)
49   : size_(size_t(1) << sizeLog2),
50     data_(new uint8_t[size_]) {
51 }
52
53 uint64_t DataHolder::hash(size_t size) const {
54   CHECK_LE(size, size_);
55   auto p = hashCache_.find(size);
56   if (p != hashCache_.end()) {
57     return p->second;
58   }
59
60   uint64_t h = folly::hash::fnv64_buf(data_.get(), size);
61   hashCache_[size] = h;
62   return h;
63 }
64
65 ByteRange DataHolder::data(size_t size) const {
66   CHECK_LE(size, size_);
67   return ByteRange(data_.get(), size);
68 }
69
70 uint64_t hashIOBuf(const IOBuf* buf) {
71   uint64_t h = folly::hash::FNV_64_HASH_START;
72   for (auto& range : *buf) {
73     h = folly::hash::fnv64_buf(range.data(), range.size(), h);
74   }
75   return h;
76 }
77
78 class RandomDataHolder : public DataHolder {
79  public:
80   explicit RandomDataHolder(size_t sizeLog2);
81 };
82
83 RandomDataHolder::RandomDataHolder(size_t sizeLog2)
84   : DataHolder(sizeLog2) {
85   constexpr size_t numThreadsLog2 = 3;
86   constexpr size_t numThreads = size_t(1) << numThreadsLog2;
87
88   uint32_t seed = randomNumberSeed();
89
90   std::vector<std::thread> threads;
91   threads.reserve(numThreads);
92   for (size_t t = 0; t < numThreads; ++t) {
93     threads.emplace_back(
94         [this, seed, t, numThreadsLog2, sizeLog2] () {
95           std::mt19937 rng(seed + t);
96           size_t countLog2 = sizeLog2 - numThreadsLog2;
97           size_t start = size_t(t) << countLog2;
98           for (size_t i = 0; i < countLog2; ++i) {
99             this->data_[start + i] = rng();
100           }
101         });
102   }
103
104   for (auto& t : threads) {
105     t.join();
106   }
107 }
108
109 class ConstantDataHolder : public DataHolder {
110  public:
111   explicit ConstantDataHolder(size_t sizeLog2);
112 };
113
114 ConstantDataHolder::ConstantDataHolder(size_t sizeLog2)
115   : DataHolder(sizeLog2) {
116   memset(data_.get(), 'a', size_);
117 }
118
119 constexpr size_t dataSizeLog2 = 27;  // 128MiB
120 RandomDataHolder randomDataHolder(dataSizeLog2);
121 ConstantDataHolder constantDataHolder(dataSizeLog2);
122
123 // The intersection of the provided codecs & those that are compiled in.
124 static std::vector<CodecType> supportedCodecs(std::vector<CodecType> const& v) {
125   std::vector<CodecType> supported;
126
127   std::copy_if(
128       std::begin(v),
129       std::end(v),
130       std::back_inserter(supported),
131       hasCodec);
132
133   return supported;
134 }
135
136 // All compiled-in compression codecs.
137 static std::vector<CodecType> availableCodecs() {
138   std::vector<CodecType> codecs;
139
140   for (size_t i = 0; i < static_cast<size_t>(CodecType::NUM_CODEC_TYPES); ++i) {
141     auto type = static_cast<CodecType>(i);
142     if (hasCodec(type)) {
143       codecs.push_back(type);
144     }
145   }
146
147   return codecs;
148 }
149
150 TEST(CompressionTestNeedsUncompressedLength, Simple) {
151   static const struct { CodecType type; bool needsUncompressedLength; }
152     expectations[] = {
153       { CodecType::NO_COMPRESSION, false },
154       { CodecType::LZ4, true },
155       { CodecType::SNAPPY, false },
156       { CodecType::ZLIB, false },
157       { CodecType::LZ4_VARINT_SIZE, false },
158       { CodecType::LZMA2, true },
159       { CodecType::LZMA2_VARINT_SIZE, false },
160       { CodecType::ZSTD, false },
161       { CodecType::GZIP, false },
162     };
163
164   for (auto const& test : expectations) {
165     if (hasCodec(test.type)) {
166       EXPECT_EQ(getCodec(test.type)->needsUncompressedLength(),
167                 test.needsUncompressedLength);
168     }
169   }
170 }
171
172 class CompressionTest
173     : public testing::TestWithParam<std::tr1::tuple<int, int, CodecType>> {
174  protected:
175   void SetUp() override {
176     auto tup = GetParam();
177     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
178     chunks_ = std::tr1::get<1>(tup);
179     codec_ = getCodec(std::tr1::get<2>(tup));
180   }
181
182   void runSimpleIOBufTest(const DataHolder& dh);
183
184   void runSimpleStringTest(const DataHolder& dh);
185
186  private:
187   std::unique_ptr<IOBuf> split(std::unique_ptr<IOBuf> data) const;
188
189   uint64_t uncompressedLength_;
190   size_t chunks_;
191   std::unique_ptr<Codec> codec_;
192 };
193
194 void CompressionTest::runSimpleIOBufTest(const DataHolder& dh) {
195   const auto original = split(IOBuf::wrapBuffer(dh.data(uncompressedLength_)));
196   const auto compressed = split(codec_->compress(original.get()));
197   if (!codec_->needsUncompressedLength()) {
198     auto uncompressed = codec_->uncompress(compressed.get());
199     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
200     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
201   }
202   {
203     auto uncompressed = codec_->uncompress(compressed.get(),
204                                            uncompressedLength_);
205     EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
206     EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
207   }
208 }
209
210 void CompressionTest::runSimpleStringTest(const DataHolder& dh) {
211   const auto original = std::string(
212       reinterpret_cast<const char*>(dh.data(uncompressedLength_).data()),
213       uncompressedLength_);
214   const auto compressed = codec_->compress(original);
215   if (!codec_->needsUncompressedLength()) {
216     auto uncompressed = codec_->uncompress(compressed);
217     EXPECT_EQ(uncompressedLength_, uncompressed.length());
218     EXPECT_EQ(uncompressed, original);
219   }
220   {
221     auto uncompressed = codec_->uncompress(compressed, uncompressedLength_);
222     EXPECT_EQ(uncompressedLength_, uncompressed.length());
223     EXPECT_EQ(uncompressed, original);
224   }
225 }
226
227 // Uniformly split data into (potentially empty) chunks.
228 std::unique_ptr<IOBuf> CompressionTest::split(
229     std::unique_ptr<IOBuf> data) const {
230   if (data->isChained()) {
231     data->coalesce();
232   }
233
234   const size_t size = data->computeChainDataLength();
235
236   std::multiset<size_t> splits;
237   for (size_t i = 1; i < chunks_; ++i) {
238     splits.insert(Random::rand64(size));
239   }
240
241   folly::IOBufQueue result;
242
243   size_t offset = 0;
244   for (size_t split : splits) {
245     result.append(IOBuf::copyBuffer(data->data() + offset, split - offset));
246     offset = split;
247   }
248   result.append(IOBuf::copyBuffer(data->data() + offset, size - offset));
249
250   return result.move();
251 }
252
// IOBuf round trip over the random-data fixture.
TEST_P(CompressionTest, RandomData) {
  runSimpleIOBufTest(randomDataHolder);
}

// IOBuf round trip over the constant-data fixture.
TEST_P(CompressionTest, ConstantData) {
  runSimpleIOBufTest(constantDataHolder);
}

// std::string round trip over the random-data fixture.
TEST_P(CompressionTest, RandomDataString) {
  runSimpleStringTest(randomDataHolder);
}

// std::string round trip over the constant-data fixture.
TEST_P(CompressionTest, ConstantDataString) {
  runSimpleStringTest(constantDataHolder);
}

// Parameter space (cartesian product), in tuple order:
//   - log2 of the uncompressed length,
//   - number of chunks the data is split into,
//   - every available codec.
INSTANTIATE_TEST_CASE_P(
    CompressionTest,
    CompressionTest,
    testing::Combine(
        testing::Values(0, 1, 12, 22, 25, 27),
        testing::Values(1, 2, 3, 8, 65),
        testing::ValuesIn(availableCodecs())));
276
277 class CompressionVarintTest
278     : public testing::TestWithParam<std::tr1::tuple<int, CodecType>> {
279  protected:
280   void SetUp() override {
281     auto tup = GetParam();
282     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
283     codec_ = getCodec(std::tr1::get<1>(tup));
284   }
285
286   void runSimpleTest(const DataHolder& dh);
287
288   uint64_t uncompressedLength_;
289   std::unique_ptr<Codec> codec_;
290 };
291
// Returns the 1-based position of the most significant set bit of `number`,
// i.e. how many right shifts it takes to reach zero. Returns 0 for 0.
inline uint64_t oneBasedMsbPos(uint64_t number) {
  uint64_t pos = 0;
  while (number != 0) {
    number >>= 1;
    ++pos;
  }
  return pos;
}
298
299 void CompressionVarintTest::runSimpleTest(const DataHolder& dh) {
300   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength_));
301   auto compressed = codec_->compress(original.get());
302   auto breakPoint =
303       1UL +
304       Random::rand64(
305           std::max(uint64_t(9), oneBasedMsbPos(uncompressedLength_)) / 9UL);
306   auto tinyBuf = IOBuf::copyBuffer(compressed->data(),
307                                    std::min(compressed->length(), breakPoint));
308   compressed->trimStart(breakPoint);
309   tinyBuf->prependChain(std::move(compressed));
310   compressed = std::move(tinyBuf);
311
312   auto uncompressed = codec_->uncompress(compressed.get());
313
314   EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
315   EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get()));
316 }
317
// Split-buffer round trip over the random-data fixture.
TEST_P(CompressionVarintTest, RandomData) {
  runSimpleTest(randomDataHolder);
}

// Split-buffer round trip over the constant-data fixture.
TEST_P(CompressionVarintTest, ConstantData) {
  runSimpleTest(constantDataHolder);
}

// Parameter space (cartesian product), in tuple order:
//   - log2 of the uncompressed length,
//   - the *_VARINT_SIZE codecs that are available.
INSTANTIATE_TEST_CASE_P(
    CompressionVarintTest,
    CompressionVarintTest,
    testing::Combine(
        testing::Values(0, 1, 12, 22, 25, 27),
        testing::ValuesIn(supportedCodecs({
            CodecType::LZ4_VARINT_SIZE,
            CodecType::LZMA2_VARINT_SIZE,
            }))));
335
336 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
337  protected:
338   void SetUp() override { codec_ = getCodec(GetParam()); }
339
340   void runSimpleTest(const DataHolder& dh);
341
342   std::unique_ptr<Codec> codec_;
343 };
344
345 void CompressionCorruptionTest::runSimpleTest(const DataHolder& dh) {
346   constexpr uint64_t uncompressedLength = 42;
347   auto original = IOBuf::wrapBuffer(dh.data(uncompressedLength));
348   auto compressed = codec_->compress(original.get());
349
350   if (!codec_->needsUncompressedLength()) {
351     auto uncompressed = codec_->uncompress(compressed.get());
352     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
353     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
354   }
355   {
356     auto uncompressed = codec_->uncompress(compressed.get(),
357                                            uncompressedLength);
358     EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
359     EXPECT_EQ(dh.hash(uncompressedLength), hashIOBuf(uncompressed.get()));
360   }
361
362   EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
363                std::runtime_error);
364
365   // Corrupt the first character
366   ++(compressed->writableData()[0]);
367
368   if (!codec_->needsUncompressedLength()) {
369     EXPECT_THROW(codec_->uncompress(compressed.get()),
370                  std::runtime_error);
371   }
372
373   EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength),
374                std::runtime_error);
375 }
376
// Corruption checks over the random-data fixture.
TEST_P(CompressionCorruptionTest, RandomData) {
  runSimpleTest(randomDataHolder);
}

// Corruption checks over the constant-data fixture.
TEST_P(CompressionCorruptionTest, ConstantData) {
  runSimpleTest(constantDataHolder);
}

// Only codecs listed here are exercised; each is run only if it is
// available.
INSTANTIATE_TEST_CASE_P(
    CompressionCorruptionTest,
    CompressionCorruptionTest,
    testing::ValuesIn(
        // NO_COMPRESSION can't detect corruption
        // LZ4 can't detect corruption reliably (sigh)
        supportedCodecs({
            CodecType::SNAPPY,
            CodecType::ZLIB,
            })));
395
396 }}}  // namespaces
397
398 int main(int argc, char *argv[]) {
399   testing::InitGoogleTest(&argc, argv);
400   gflags::ParseCommandLineFlags(&argc, &argv, true);
401
402   auto ret = RUN_ALL_TESTS();
403   if (!ret) {
404     folly::runBenchmarksOnFlag();
405   }
406   return ret;
407 }