Fix decompression of truncated data
[folly.git] / folly / io / Compression.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/Compression.h>
18
19 #if FOLLY_HAVE_LIBLZ4
20 #include <lz4.h>
21 #include <lz4hc.h>
22 #if LZ4_VERSION_NUMBER >= 10301
23 #include <lz4frame.h>
24 #endif
25 #endif
26
27 #include <glog/logging.h>
28
29 #if FOLLY_HAVE_LIBSNAPPY
30 #include <snappy.h>
31 #include <snappy-sinksource.h>
32 #endif
33
34 #if FOLLY_HAVE_LIBZ
35 #include <zlib.h>
36 #endif
37
38 #if FOLLY_HAVE_LIBLZMA
39 #include <lzma.h>
40 #endif
41
42 #if FOLLY_HAVE_LIBZSTD
43 #define ZSTD_STATIC_LINKING_ONLY
44 #include <zstd.h>
45 #endif
46
47 #if FOLLY_HAVE_LIBBZ2
48 #include <bzlib.h>
49 #endif
50
51 #include <folly/Bits.h>
52 #include <folly/Conv.h>
53 #include <folly/Memory.h>
54 #include <folly/Portability.h>
55 #include <folly/ScopeGuard.h>
56 #include <folly/Varint.h>
57 #include <folly/io/Cursor.h>
58 #include <algorithm>
59 #include <unordered_set>
60
61 namespace folly { namespace io {
62
63 Codec::Codec(CodecType type) : type_(type) { }
64
65 // Ensure consistent behavior in the nullptr case
66 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
67   uint64_t len = data->computeChainDataLength();
68   if (len == 0) {
69     return IOBuf::create(0);
70   }
71   if (len > maxUncompressedLength()) {
72     throw std::runtime_error("Codec: uncompressed length too large");
73   }
74
75   return doCompress(data);
76 }
77
78 std::string Codec::compress(const StringPiece data) {
79   const uint64_t len = data.size();
80   if (len == 0) {
81     return "";
82   }
83   if (len > maxUncompressedLength()) {
84     throw std::runtime_error("Codec: uncompressed length too large");
85   }
86
87   return doCompressString(data);
88 }
89
90 std::unique_ptr<IOBuf> Codec::uncompress(
91     const IOBuf* data,
92     Optional<uint64_t> uncompressedLength) {
93   if (!uncompressedLength) {
94     if (needsUncompressedLength()) {
95       throw std::invalid_argument("Codec: uncompressed length required");
96     }
97   } else if (*uncompressedLength > maxUncompressedLength()) {
98     throw std::runtime_error("Codec: uncompressed length too large");
99   }
100
101   if (data->empty()) {
102     if (uncompressedLength.value_or(0) != 0) {
103       throw std::runtime_error("Codec: invalid uncompressed length");
104     }
105     return IOBuf::create(0);
106   }
107
108   return doUncompress(data, uncompressedLength);
109 }
110
111 std::string Codec::uncompress(
112     const StringPiece data,
113     Optional<uint64_t> uncompressedLength) {
114   if (!uncompressedLength) {
115     if (needsUncompressedLength()) {
116       throw std::invalid_argument("Codec: uncompressed length required");
117     }
118   } else if (*uncompressedLength > maxUncompressedLength()) {
119     throw std::runtime_error("Codec: uncompressed length too large");
120   }
121
122   if (data.empty()) {
123     if (uncompressedLength.value_or(0) != 0) {
124       throw std::runtime_error("Codec: invalid uncompressed length");
125     }
126     return "";
127   }
128
129   return doUncompressString(data, uncompressedLength);
130 }
131
132 bool Codec::needsUncompressedLength() const {
133   return doNeedsUncompressedLength();
134 }
135
136 uint64_t Codec::maxUncompressedLength() const {
137   return doMaxUncompressedLength();
138 }
139
140 bool Codec::doNeedsUncompressedLength() const {
141   return false;
142 }
143
144 uint64_t Codec::doMaxUncompressedLength() const {
145   return UNLIMITED_UNCOMPRESSED_LENGTH;
146 }
147
148 std::vector<std::string> Codec::validPrefixes() const {
149   return {};
150 }
151
152 bool Codec::canUncompress(const IOBuf*, Optional<uint64_t>) const {
153   return false;
154 }
155
156 std::string Codec::doCompressString(const StringPiece data) {
157   const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
158   auto outputBuffer = doCompress(&inputBuffer);
159   std::string output;
160   output.reserve(outputBuffer->computeChainDataLength());
161   for (auto range : *outputBuffer) {
162     output.append(reinterpret_cast<const char*>(range.data()), range.size());
163   }
164   return output;
165 }
166
167 std::string Codec::doUncompressString(
168     const StringPiece data,
169     Optional<uint64_t> uncompressedLength) {
170   const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
171   auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
172   std::string output;
173   output.reserve(outputBuffer->computeChainDataLength());
174   for (auto range : *outputBuffer) {
175     output.append(reinterpret_cast<const char*>(range.data()), range.size());
176   }
177   return output;
178 }
179
180 uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
181   if (uncompressedLength == 0) {
182     return 0;
183   }
184   return doMaxCompressedLength(uncompressedLength);
185 }
186
187 Optional<uint64_t> Codec::getUncompressedLength(
188     const folly::IOBuf* data,
189     Optional<uint64_t> uncompressedLength) const {
190   auto const compressedLength = data->computeChainDataLength();
191   if (uncompressedLength == uint64_t(0) || compressedLength == 0) {
192     if (uncompressedLength.value_or(0) != 0 || compressedLength != 0) {
193       throw std::runtime_error("Invalid uncompressed length");
194     }
195     return 0;
196   }
197   return doGetUncompressedLength(data, uncompressedLength);
198 }
199
200 Optional<uint64_t> Codec::doGetUncompressedLength(
201     const folly::IOBuf*,
202     Optional<uint64_t> uncompressedLength) const {
203   return uncompressedLength;
204 }
205
206 bool StreamCodec::needsDataLength() const {
207   return doNeedsDataLength();
208 }
209
210 bool StreamCodec::doNeedsDataLength() const {
211   return false;
212 }
213
214 void StreamCodec::assertStateIs(State expected) const {
215   if (state_ != expected) {
216     throw std::logic_error(folly::to<std::string>(
217         "Codec: state is ", state_, "; expected state ", expected));
218   }
219 }
220
221 void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
222   state_ = State::RESET;
223   uncompressedLength_ = uncompressedLength;
224   doResetStream();
225 }
226
227 bool StreamCodec::compressStream(
228     ByteRange& input,
229     MutableByteRange& output,
230     StreamCodec::FlushOp flushOp) {
231   if (state_ == State::RESET && input.empty()) {
232     if (flushOp == StreamCodec::FlushOp::NONE) {
233       return false;
234     }
235     if (flushOp == StreamCodec::FlushOp::END &&
236         uncompressedLength().value_or(0) != 0) {
237       throw std::runtime_error("Codec: invalid uncompressed length");
238     }
239     return true;
240   }
241   if (state_ == State::RESET && !input.empty() &&
242       uncompressedLength() == uint64_t(0)) {
243     throw std::runtime_error("Codec: invalid uncompressed length");
244   }
245   // Handle input state transitions
246   switch (flushOp) {
247     case StreamCodec::FlushOp::NONE:
248       if (state_ == State::RESET) {
249         state_ = State::COMPRESS;
250       }
251       assertStateIs(State::COMPRESS);
252       break;
253     case StreamCodec::FlushOp::FLUSH:
254       if (state_ == State::RESET || state_ == State::COMPRESS) {
255         state_ = State::COMPRESS_FLUSH;
256       }
257       assertStateIs(State::COMPRESS_FLUSH);
258       break;
259     case StreamCodec::FlushOp::END:
260       if (state_ == State::RESET || state_ == State::COMPRESS) {
261         state_ = State::COMPRESS_END;
262       }
263       assertStateIs(State::COMPRESS_END);
264       break;
265   }
266   bool const done = doCompressStream(input, output, flushOp);
267   // Handle output state transitions
268   if (done) {
269     if (state_ == State::COMPRESS_FLUSH) {
270       state_ = State::COMPRESS;
271     } else if (state_ == State::COMPRESS_END) {
272       state_ = State::END;
273     }
274     // Check internal invariants
275     DCHECK(input.empty());
276     DCHECK(flushOp != StreamCodec::FlushOp::NONE);
277   }
278   return done;
279 }
280
281 bool StreamCodec::uncompressStream(
282     ByteRange& input,
283     MutableByteRange& output,
284     StreamCodec::FlushOp flushOp) {
285   if (state_ == State::RESET && input.empty()) {
286     if (uncompressedLength().value_or(0) == 0) {
287       return true;
288     }
289     return false;
290   }
291   // Handle input state transitions
292   if (state_ == State::RESET) {
293     state_ = State::UNCOMPRESS;
294   }
295   assertStateIs(State::UNCOMPRESS);
296   bool const done = doUncompressStream(input, output, flushOp);
297   // Handle output state transitions
298   if (done) {
299     state_ = State::END;
300   }
301   return done;
302 }
303
304 static std::unique_ptr<IOBuf> addOutputBuffer(
305     MutableByteRange& output,
306     uint64_t size) {
307   DCHECK(output.empty());
308   auto buffer = IOBuf::create(size);
309   buffer->append(buffer->capacity());
310   output = {buffer->writableData(), buffer->length()};
311   return buffer;
312 }
313
314 std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
315   uint64_t const uncompressedLength = data->computeChainDataLength();
316   resetStream(uncompressedLength);
317   uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
318
319   auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
320   auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
321
322   MutableByteRange output;
323   auto buffer = addOutputBuffer(
324       output,
325       maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
326                                                : kDefaultBufferLength);
327
328   // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
329   IOBuf const* current = data;
330   ByteRange input{current->data(), current->length()};
331   StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
332   for (;;) {
333     while (input.empty() && current->next() != data) {
334       current = current->next();
335       input = {current->data(), current->length()};
336     }
337     if (current->next() == data) {
338       // This is the last input buffer so end the stream
339       flushOp = StreamCodec::FlushOp::END;
340     }
341     if (output.empty()) {
342       buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
343     }
344     size_t const inputSize = input.size();
345     size_t const outputSize = output.size();
346     bool const done = compressStream(input, output, flushOp);
347     if (done) {
348       DCHECK(input.empty());
349       DCHECK(flushOp == StreamCodec::FlushOp::END);
350       DCHECK_EQ(current->next(), data);
351       break;
352     }
353     if (inputSize == input.size() && outputSize == output.size()) {
354       throw std::runtime_error("Codec: No forward progress made");
355     }
356   }
357   buffer->prev()->trimEnd(output.size());
358   return buffer;
359 }
360
361 static uint64_t computeBufferLength(
362     uint64_t const compressedLength,
363     uint64_t const blockSize) {
364   uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
365   uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
366   return std::min(goodBufferSize, kMaxBufferLength);
367 }
368
369 std::unique_ptr<IOBuf> StreamCodec::doUncompress(
370     IOBuf const* data,
371     Optional<uint64_t> uncompressedLength) {
372   auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
373   auto constexpr kBlockSize = uint64_t(128) << 10;
374   auto const defaultBufferLength =
375       computeBufferLength(data->computeChainDataLength(), kBlockSize);
376
377   uncompressedLength = getUncompressedLength(data, uncompressedLength);
378   resetStream(uncompressedLength);
379
380   MutableByteRange output;
381   auto buffer = addOutputBuffer(
382       output,
383       (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
384            ? *uncompressedLength
385            : defaultBufferLength));
386
387   // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
388   IOBuf const* current = data;
389   ByteRange input{current->data(), current->length()};
390   StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
391   for (;;) {
392     while (input.empty() && current->next() != data) {
393       current = current->next();
394       input = {current->data(), current->length()};
395     }
396     if (current->next() == data) {
397       // Tell the uncompressor there is no more input (it may optimize)
398       flushOp = StreamCodec::FlushOp::END;
399     }
400     if (output.empty()) {
401       buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
402     }
403     size_t const inputSize = input.size();
404     size_t const outputSize = output.size();
405     bool const done = uncompressStream(input, output, flushOp);
406     if (done) {
407       break;
408     }
409     if (inputSize == input.size() && outputSize == output.size()) {
410       throw std::runtime_error("Codec: Truncated data");
411     }
412   }
413   if (!input.empty()) {
414     throw std::runtime_error("Codec: Junk after end of data");
415   }
416
417   buffer->prev()->trimEnd(output.size());
418   if (uncompressedLength &&
419       *uncompressedLength != buffer->computeChainDataLength()) {
420     throw std::runtime_error("Codec: invalid uncompressed length");
421   }
422
423   return buffer;
424 }
425
426 namespace {
427
428 /**
429  * No compression
430  */
431 class NoCompressionCodec final : public Codec {
432  public:
433   static std::unique_ptr<Codec> create(int level, CodecType type);
434   explicit NoCompressionCodec(int level, CodecType type);
435
436  private:
437   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
438   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
439   std::unique_ptr<IOBuf> doUncompress(
440       const IOBuf* data,
441       Optional<uint64_t> uncompressedLength) override;
442 };
443
444 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
445   return std::make_unique<NoCompressionCodec>(level, type);
446 }
447
448 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
449   : Codec(type) {
450   DCHECK(type == CodecType::NO_COMPRESSION);
451   switch (level) {
452   case COMPRESSION_LEVEL_DEFAULT:
453   case COMPRESSION_LEVEL_FASTEST:
454   case COMPRESSION_LEVEL_BEST:
455     level = 0;
456   }
457   if (level != 0) {
458     throw std::invalid_argument(to<std::string>(
459         "NoCompressionCodec: invalid level ", level));
460   }
461 }
462
463 uint64_t NoCompressionCodec::doMaxCompressedLength(
464     uint64_t uncompressedLength) const {
465   return uncompressedLength;
466 }
467
468 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
469     const IOBuf* data) {
470   return data->clone();
471 }
472
473 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
474     const IOBuf* data,
475     Optional<uint64_t> uncompressedLength) {
476   if (uncompressedLength &&
477       data->computeChainDataLength() != *uncompressedLength) {
478     throw std::runtime_error(
479         to<std::string>("NoCompressionCodec: invalid uncompressed length"));
480   }
481   return data->clone();
482 }
483
484 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
485
486 namespace {
487
488 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
489   DCHECK_GE(out->tailroom(), kMaxVarintLength64);
490   out->append(encodeVarint(val, out->writableTail()));
491 }
492
493 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
494   uint64_t val = 0;
495   int8_t b = 0;
496   for (int shift = 0; shift <= 63; shift += 7) {
497     b = cursor.read<int8_t>();
498     val |= static_cast<uint64_t>(b & 0x7f) << shift;
499     if (b >= 0) {
500       break;
501     }
502   }
503   if (b < 0) {
504     throw std::invalid_argument("Invalid varint value. Too big.");
505   }
506   return val;
507 }
508
509 }  // namespace
510
511 #endif  // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
512
513 namespace {
514 /**
515  * Reads sizeof(T) bytes, and returns false if not enough bytes are available.
516  * Returns true if the first n bytes are equal to prefix when interpreted as
517  * a little endian T.
518  */
519 template <typename T>
520 typename std::enable_if<std::is_unsigned<T>::value, bool>::type
521 dataStartsWithLE(const IOBuf* data, T prefix, uint64_t n = sizeof(T)) {
522   DCHECK_GT(n, 0);
523   DCHECK_LE(n, sizeof(T));
524   T value;
525   Cursor cursor{data};
526   if (!cursor.tryReadLE(value)) {
527     return false;
528   }
529   const T mask = n == sizeof(T) ? T(-1) : (T(1) << (8 * n)) - 1;
530   return prefix == (value & mask);
531 }
532
533 template <typename T>
534 typename std::enable_if<std::is_arithmetic<T>::value, std::string>::type
535 prefixToStringLE(T prefix, uint64_t n = sizeof(T)) {
536   DCHECK_GT(n, 0);
537   DCHECK_LE(n, sizeof(T));
538   prefix = Endian::little(prefix);
539   std::string result;
540   result.resize(n);
541   memcpy(&result[0], &prefix, n);
542   return result;
543 }
544 } // namespace
545
546 #if FOLLY_HAVE_LIBLZ4
547
548 /**
549  * LZ4 compression
550  */
551 class LZ4Codec final : public Codec {
552  public:
553   static std::unique_ptr<Codec> create(int level, CodecType type);
554   explicit LZ4Codec(int level, CodecType type);
555
556  private:
557   bool doNeedsUncompressedLength() const override;
558   uint64_t doMaxUncompressedLength() const override;
559   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
560
561   bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
562
563   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
564   std::unique_ptr<IOBuf> doUncompress(
565       const IOBuf* data,
566       Optional<uint64_t> uncompressedLength) override;
567
568   bool highCompression_;
569 };
570
571 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
572   return std::make_unique<LZ4Codec>(level, type);
573 }
574
575 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
576   DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
577
578   switch (level) {
579   case COMPRESSION_LEVEL_FASTEST:
580   case COMPRESSION_LEVEL_DEFAULT:
581     level = 1;
582     break;
583   case COMPRESSION_LEVEL_BEST:
584     level = 2;
585     break;
586   }
587   if (level < 1 || level > 2) {
588     throw std::invalid_argument(to<std::string>(
589         "LZ4Codec: invalid level: ", level));
590   }
591   highCompression_ = (level > 1);
592 }
593
594 bool LZ4Codec::doNeedsUncompressedLength() const {
595   return !encodeSize();
596 }
597
598 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
599 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
600 // here.
601 #ifndef LZ4_MAX_INPUT_SIZE
602 # define LZ4_MAX_INPUT_SIZE 0x7E000000
603 #endif
604
605 uint64_t LZ4Codec::doMaxUncompressedLength() const {
606   return LZ4_MAX_INPUT_SIZE;
607 }
608
609 uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
610   return LZ4_compressBound(uncompressedLength) +
611       (encodeSize() ? kMaxVarintLength64 : 0);
612 }
613
614 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
615   IOBuf clone;
616   if (data->isChained()) {
617     // LZ4 doesn't support streaming, so we have to coalesce
618     clone = data->cloneCoalescedAsValue();
619     data = &clone;
620   }
621
622   auto out = IOBuf::create(maxCompressedLength(data->length()));
623   if (encodeSize()) {
624     encodeVarintToIOBuf(data->length(), out.get());
625   }
626
627   int n;
628   auto input = reinterpret_cast<const char*>(data->data());
629   auto output = reinterpret_cast<char*>(out->writableTail());
630   const auto inputLength = data->length();
631 #if LZ4_VERSION_NUMBER >= 10700
632   if (highCompression_) {
633     n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
634   } else {
635     n = LZ4_compress_default(input, output, inputLength, out->tailroom());
636   }
637 #else
638   if (highCompression_) {
639     n = LZ4_compressHC(input, output, inputLength);
640   } else {
641     n = LZ4_compress(input, output, inputLength);
642   }
643 #endif
644
645   CHECK_GE(n, 0);
646   CHECK_LE(n, out->capacity());
647
648   out->append(n);
649   return out;
650 }
651
652 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
653     const IOBuf* data,
654     Optional<uint64_t> uncompressedLength) {
655   IOBuf clone;
656   if (data->isChained()) {
657     // LZ4 doesn't support streaming, so we have to coalesce
658     clone = data->cloneCoalescedAsValue();
659     data = &clone;
660   }
661
662   folly::io::Cursor cursor(data);
663   uint64_t actualUncompressedLength;
664   if (encodeSize()) {
665     actualUncompressedLength = decodeVarintFromCursor(cursor);
666     if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
667       throw std::runtime_error("LZ4Codec: invalid uncompressed length");
668     }
669   } else {
670     // Invariants
671     DCHECK(uncompressedLength.hasValue());
672     DCHECK(*uncompressedLength <= maxUncompressedLength());
673     actualUncompressedLength = *uncompressedLength;
674   }
675
676   auto sp = StringPiece{cursor.peekBytes()};
677   auto out = IOBuf::create(actualUncompressedLength);
678   int n = LZ4_decompress_safe(
679       sp.data(),
680       reinterpret_cast<char*>(out->writableTail()),
681       sp.size(),
682       actualUncompressedLength);
683
684   if (n < 0 || uint64_t(n) != actualUncompressedLength) {
685     throw std::runtime_error(to<std::string>(
686         "LZ4 decompression returned invalid value ", n));
687   }
688   out->append(actualUncompressedLength);
689   return out;
690 }
691
692 #if LZ4_VERSION_NUMBER >= 10301
693
694 class LZ4FrameCodec final : public Codec {
695  public:
696   static std::unique_ptr<Codec> create(int level, CodecType type);
697   explicit LZ4FrameCodec(int level, CodecType type);
698   ~LZ4FrameCodec() override;
699
700   std::vector<std::string> validPrefixes() const override;
701   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
702       const override;
703
704  private:
705   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
706
707   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
708   std::unique_ptr<IOBuf> doUncompress(
709       const IOBuf* data,
710       Optional<uint64_t> uncompressedLength) override;
711
712   // Reset the dctx_ if it is dirty or null.
713   void resetDCtx();
714
715   int level_;
716   LZ4F_decompressionContext_t dctx_{nullptr};
717   bool dirty_{false};
718 };
719
720 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
721     int level,
722     CodecType type) {
723   return std::make_unique<LZ4FrameCodec>(level, type);
724 }
725
726 static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
727
728 std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
729   return {prefixToStringLE(kLZ4FrameMagicLE)};
730 }
731
732 bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
733   return dataStartsWithLE(data, kLZ4FrameMagicLE);
734 }
735
736 uint64_t LZ4FrameCodec::doMaxCompressedLength(
737     uint64_t uncompressedLength) const {
738   LZ4F_preferences_t prefs{};
739   prefs.compressionLevel = level_;
740   prefs.frameInfo.contentSize = uncompressedLength;
741   return LZ4F_compressFrameBound(uncompressedLength, &prefs);
742 }
743
744 static size_t lz4FrameThrowOnError(size_t code) {
745   if (LZ4F_isError(code)) {
746     throw std::runtime_error(
747         to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
748   }
749   return code;
750 }
751
752 void LZ4FrameCodec::resetDCtx() {
753   if (dctx_ && !dirty_) {
754     return;
755   }
756   if (dctx_) {
757     LZ4F_freeDecompressionContext(dctx_);
758   }
759   lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
760   dirty_ = false;
761 }
762
763 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
764   DCHECK(type == CodecType::LZ4_FRAME);
765   switch (level) {
766     case COMPRESSION_LEVEL_FASTEST:
767     case COMPRESSION_LEVEL_DEFAULT:
768       level_ = 0;
769       break;
770     case COMPRESSION_LEVEL_BEST:
771       level_ = 16;
772       break;
773     default:
774       level_ = level;
775       break;
776   }
777 }
778
779 LZ4FrameCodec::~LZ4FrameCodec() {
780   if (dctx_) {
781     LZ4F_freeDecompressionContext(dctx_);
782   }
783 }
784
785 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
786   // LZ4 Frame compression doesn't support streaming so we have to coalesce
787   IOBuf clone;
788   if (data->isChained()) {
789     clone = data->cloneCoalescedAsValue();
790     data = &clone;
791   }
792   // Set preferences
793   const auto uncompressedLength = data->length();
794   LZ4F_preferences_t prefs{};
795   prefs.compressionLevel = level_;
796   prefs.frameInfo.contentSize = uncompressedLength;
797   // Compress
798   auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
799   const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
800       buf->writableTail(),
801       buf->tailroom(),
802       data->data(),
803       data->length(),
804       &prefs));
805   buf->append(written);
806   return buf;
807 }
808
809 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
810     const IOBuf* data,
811     Optional<uint64_t> uncompressedLength) {
812   // Reset the dctx if any errors have occurred
813   resetDCtx();
814   // Coalesce the data
815   ByteRange in = *data->begin();
816   IOBuf clone;
817   if (data->isChained()) {
818     clone = data->cloneCoalescedAsValue();
819     in = clone.coalesce();
820   }
821   data = nullptr;
822   // Select decompression options
823   LZ4F_decompressOptions_t options;
824   options.stableDst = 1;
825   // Select blockSize and growthSize for the IOBufQueue
826   IOBufQueue queue(IOBufQueue::cacheChainLength());
827   auto blockSize = uint64_t{64} << 10;
828   auto growthSize = uint64_t{4} << 20;
829   if (uncompressedLength) {
830     // Allocate uncompressedLength in one chunk (up to 64 MB)
831     const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20);
832     queue.preallocate(allocateSize, allocateSize);
833     blockSize = std::min(*uncompressedLength, blockSize);
834     growthSize = std::min(*uncompressedLength, growthSize);
835   } else {
836     // Reduce growthSize for small data
837     const auto guessUncompressedLen =
838         4 * std::max<uint64_t>(blockSize, in.size());
839     growthSize = std::min(guessUncompressedLen, growthSize);
840   }
841   // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
842   // returns 0
843   dirty_ = true;
844   // Decompress until the frame is over
845   size_t code = 0;
846   do {
847     // Allocate enough space to decompress at least a block
848     void* out;
849     size_t outSize;
850     std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
851     // Decompress
852     size_t inSize = in.size();
853     code = lz4FrameThrowOnError(
854         LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
855     if (in.empty() && outSize == 0 && code != 0) {
856       // We passed no input, no output was produced, and the frame isn't over
857       // No more forward progress is possible
858       throw std::runtime_error("LZ4Frame error: Incomplete frame");
859     }
860     in.uncheckedAdvance(inSize);
861     queue.postallocate(outSize);
862   } while (code != 0);
863   // At this point the decompression context can be reused
864   dirty_ = false;
865   if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
866     throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
867   }
868   return queue.move();
869 }
870
871 #endif // LZ4_VERSION_NUMBER >= 10301
872 #endif // FOLLY_HAVE_LIBLZ4
873
874 #if FOLLY_HAVE_LIBSNAPPY
875
876 /**
877  * Snappy compression
878  */
879
880 /**
881  * Implementation of snappy::Source that reads from a IOBuf chain.
882  */
883 class IOBufSnappySource final : public snappy::Source {
884  public:
885   explicit IOBufSnappySource(const IOBuf* data);
886   size_t Available() const override;
887   const char* Peek(size_t* len) override;
888   void Skip(size_t n) override;
889  private:
890   size_t available_;
891   io::Cursor cursor_;
892 };
893
894 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
895   : available_(data->computeChainDataLength()),
896     cursor_(data) {
897 }
898
899 size_t IOBufSnappySource::Available() const {
900   return available_;
901 }
902
903 const char* IOBufSnappySource::Peek(size_t* len) {
904   auto sp = StringPiece{cursor_.peekBytes()};
905   *len = sp.size();
906   return sp.data();
907 }
908
909 void IOBufSnappySource::Skip(size_t n) {
910   CHECK_LE(n, available_);
911   cursor_.skip(n);
912   available_ -= n;
913 }
914
915 class SnappyCodec final : public Codec {
916  public:
917   static std::unique_ptr<Codec> create(int level, CodecType type);
918   explicit SnappyCodec(int level, CodecType type);
919
920  private:
921   uint64_t doMaxUncompressedLength() const override;
922   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
923   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
924   std::unique_ptr<IOBuf> doUncompress(
925       const IOBuf* data,
926       Optional<uint64_t> uncompressedLength) override;
927 };
928
929 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
930   return std::make_unique<SnappyCodec>(level, type);
931 }
932
933 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
934   DCHECK(type == CodecType::SNAPPY);
935   switch (level) {
936   case COMPRESSION_LEVEL_FASTEST:
937   case COMPRESSION_LEVEL_DEFAULT:
938   case COMPRESSION_LEVEL_BEST:
939     level = 1;
940   }
941   if (level != 1) {
942     throw std::invalid_argument(to<std::string>(
943         "SnappyCodec: invalid level: ", level));
944   }
945 }
946
947 uint64_t SnappyCodec::doMaxUncompressedLength() const {
948   // snappy.h uses uint32_t for lengths, so there's that.
949   return std::numeric_limits<uint32_t>::max();
950 }
951
952 uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
953   return snappy::MaxCompressedLength(uncompressedLength);
954 }
955
956 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
957   IOBufSnappySource source(data);
958   auto out = IOBuf::create(maxCompressedLength(source.Available()));
959
960   snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
961       out->writableTail()));
962
963   size_t n = snappy::Compress(&source, &sink);
964
965   CHECK_LE(n, out->capacity());
966   out->append(n);
967   return out;
968 }
969
970 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
971     const IOBuf* data,
972     Optional<uint64_t> uncompressedLength) {
973   uint32_t actualUncompressedLength = 0;
974
975   {
976     IOBufSnappySource source(data);
977     if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
978       throw std::runtime_error("snappy::GetUncompressedLength failed");
979     }
980     if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
981       throw std::runtime_error("snappy: invalid uncompressed length");
982     }
983   }
984
985   auto out = IOBuf::create(actualUncompressedLength);
986
987   {
988     IOBufSnappySource source(data);
989     if (!snappy::RawUncompress(&source,
990                                reinterpret_cast<char*>(out->writableTail()))) {
991       throw std::runtime_error("snappy::RawUncompress failed");
992     }
993   }
994
995   out->append(actualUncompressedLength);
996   return out;
997 }
998
999 #endif  // FOLLY_HAVE_LIBSNAPPY
1000
1001 #if FOLLY_HAVE_LIBZ
1002 /**
1003  * Zlib codec
1004  */
1005 class ZlibStreamCodec final : public StreamCodec {
1006  public:
1007   static std::unique_ptr<Codec> createCodec(int level, CodecType type);
1008   static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
1009   explicit ZlibStreamCodec(int level, CodecType type);
1010   ~ZlibStreamCodec() override;
1011
1012   std::vector<std::string> validPrefixes() const override;
1013   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1014       const override;
1015
1016  private:
1017   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1018
1019   void doResetStream() override;
1020   bool doCompressStream(
1021       ByteRange& input,
1022       MutableByteRange& output,
1023       StreamCodec::FlushOp flush) override;
1024   bool doUncompressStream(
1025       ByteRange& input,
1026       MutableByteRange& output,
1027       StreamCodec::FlushOp flush) override;
1028
1029   void resetDeflateStream();
1030   void resetInflateStream();
1031
1032   Optional<z_stream> deflateStream_{};
1033   Optional<z_stream> inflateStream_{};
1034   int level_;
1035   bool needReset_{true};
1036 };
1037
1038 static constexpr uint16_t kGZIPMagicLE = 0x8B1F;
1039
1040 std::vector<std::string> ZlibStreamCodec::validPrefixes() const {
1041   if (type() == CodecType::ZLIB) {
1042     // Zlib streams start with a 2 byte header.
1043     //
1044     //   0   1
1045     // +---+---+
1046     // |CMF|FLG|
1047     // +---+---+
1048     //
1049     // We won't restrict the values of any sub-fields except as described below.
1050     //
1051     // The lowest 4 bits of CMF is the compression method (CM).
1052     // CM == 0x8 is the deflate compression method, which is currently the only
1053     // supported compression method, so any valid prefix must have CM == 0x8.
1054     //
1055     // The lowest 5 bits of FLG is FCHECK.
1056     // FCHECK must be such that the two header bytes are a multiple of 31 when
1057     // interpreted as a big endian 16-bit number.
1058     std::vector<std::string> result;
1059     // 16 values for the first byte, 8 values for the second byte.
1060     // There are also 4 combinations where both 0x00 and 0x1F work as FCHECK.
1061     result.reserve(132);
1062     // Select all values for the CMF byte that use the deflate algorithm 0x8.
1063     for (uint32_t first = 0x0800; first <= 0xF800; first += 0x1000) {
1064       // Select all values for the FLG, but leave FCHECK as 0 since it's fixed.
1065       for (uint32_t second = 0x00; second <= 0xE0; second += 0x20) {
1066         uint16_t prefix = first | second;
1067         // Compute FCHECK.
1068         prefix += 31 - (prefix % 31);
1069         result.push_back(prefixToStringLE(Endian::big(prefix)));
1070         // zlib won't produce this, but it is a valid prefix.
1071         if ((prefix & 0x1F) == 31) {
1072           prefix -= 31;
1073           result.push_back(prefixToStringLE(Endian::big(prefix)));
1074         }
1075       }
1076     }
1077     return result;
1078   } else {
1079     // The gzip frame starts with 2 magic bytes.
1080     return {prefixToStringLE(kGZIPMagicLE)};
1081   }
1082 }
1083
1084 bool ZlibStreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1085     const {
1086   if (type() == CodecType::ZLIB) {
1087     uint16_t value;
1088     Cursor cursor{data};
1089     if (!cursor.tryReadBE(value)) {
1090       return false;
1091     }
1092     // zlib compressed if using deflate and is a multiple of 31.
1093     return (value & 0x0F00) == 0x0800 && value % 31 == 0;
1094   } else {
1095     return dataStartsWithLE(data, kGZIPMagicLE);
1096   }
1097 }
1098
1099 uint64_t ZlibStreamCodec::doMaxCompressedLength(
1100     uint64_t uncompressedLength) const {
1101   return deflateBound(nullptr, uncompressedLength);
1102 }
1103
1104 std::unique_ptr<Codec> ZlibStreamCodec::createCodec(int level, CodecType type) {
1105   return std::make_unique<ZlibStreamCodec>(level, type);
1106 }
1107
1108 std::unique_ptr<StreamCodec> ZlibStreamCodec::createStream(
1109     int level,
1110     CodecType type) {
1111   return std::make_unique<ZlibStreamCodec>(level, type);
1112 }
1113
1114 ZlibStreamCodec::ZlibStreamCodec(int level, CodecType type)
1115     : StreamCodec(type) {
1116   DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
1117   switch (level) {
1118     case COMPRESSION_LEVEL_FASTEST:
1119       level = 1;
1120       break;
1121     case COMPRESSION_LEVEL_DEFAULT:
1122       level = Z_DEFAULT_COMPRESSION;
1123       break;
1124     case COMPRESSION_LEVEL_BEST:
1125       level = 9;
1126       break;
1127   }
1128   if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
1129     throw std::invalid_argument(
1130         to<std::string>("ZlibStreamCodec: invalid level: ", level));
1131   }
1132   level_ = level;
1133 }
1134
1135 ZlibStreamCodec::~ZlibStreamCodec() {
1136   if (deflateStream_) {
1137     deflateEnd(deflateStream_.get_pointer());
1138     deflateStream_.clear();
1139   }
1140   if (inflateStream_) {
1141     inflateEnd(inflateStream_.get_pointer());
1142     inflateStream_.clear();
1143   }
1144 }
1145
1146 void ZlibStreamCodec::doResetStream() {
1147   needReset_ = true;
1148 }
1149
1150 void ZlibStreamCodec::resetDeflateStream() {
1151   if (deflateStream_) {
1152     int const rc = deflateReset(deflateStream_.get_pointer());
1153     if (rc != Z_OK) {
1154       deflateStream_.clear();
1155       throw std::runtime_error(
1156           to<std::string>("ZlibStreamCodec: deflateReset error: ", rc));
1157     }
1158     return;
1159   }
1160   deflateStream_ = z_stream{};
1161   // Using deflateInit2() to support gzip.  "The windowBits parameter is the
1162   // base two logarithm of the maximum window size (...) The default value is
1163   // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
1164   // around the compressed data instead of a zlib wrapper. The gzip header
1165   // will have no file name, no extra data, no comment, no modification time
1166   // (set to zero), no header crc, and the operating system will be set to 255
1167   // (unknown)."
1168   int const windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
1169   // All other parameters (method, memLevel, strategy) get default values from
1170   // the zlib manual.
1171   int const rc = deflateInit2(
1172       deflateStream_.get_pointer(),
1173       level_,
1174       Z_DEFLATED,
1175       windowBits,
1176       /* memLevel */ 8,
1177       Z_DEFAULT_STRATEGY);
1178   if (rc != Z_OK) {
1179     deflateStream_.clear();
1180     throw std::runtime_error(
1181         to<std::string>("ZlibStreamCodec: deflateInit error: ", rc));
1182   }
1183 }
1184
1185 void ZlibStreamCodec::resetInflateStream() {
1186   if (inflateStream_) {
1187     int const rc = inflateReset(inflateStream_.get_pointer());
1188     if (rc != Z_OK) {
1189       inflateStream_.clear();
1190       throw std::runtime_error(
1191           to<std::string>("ZlibStreamCodec: inflateReset error: ", rc));
1192     }
1193     return;
1194   }
1195   inflateStream_ = z_stream{};
1196   // "The windowBits parameter is the base two logarithm of the maximum window
1197   // size (...) The default value is 15 (...) add 16 to decode only the gzip
1198   // format (the zlib format will return a Z_DATA_ERROR)."
1199   int const windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
1200   int const rc = inflateInit2(inflateStream_.get_pointer(), windowBits);
1201   if (rc != Z_OK) {
1202     inflateStream_.clear();
1203     throw std::runtime_error(
1204         to<std::string>("ZlibStreamCodec: inflateInit error: ", rc));
1205   }
1206 }
1207
1208 static int zlibTranslateFlush(StreamCodec::FlushOp flush) {
1209   switch (flush) {
1210     case StreamCodec::FlushOp::NONE:
1211       return Z_NO_FLUSH;
1212     case StreamCodec::FlushOp::FLUSH:
1213       return Z_SYNC_FLUSH;
1214     case StreamCodec::FlushOp::END:
1215       return Z_FINISH;
1216     default:
1217       throw std::invalid_argument("ZlibStreamCodec: Invalid flush");
1218   }
1219 }
1220
1221 static int zlibThrowOnError(int rc) {
1222   switch (rc) {
1223     case Z_OK:
1224     case Z_BUF_ERROR:
1225     case Z_STREAM_END:
1226       return rc;
1227     default:
1228       throw std::runtime_error(to<std::string>("ZlibStreamCodec: error: ", rc));
1229   }
1230 }
1231
1232 bool ZlibStreamCodec::doCompressStream(
1233     ByteRange& input,
1234     MutableByteRange& output,
1235     StreamCodec::FlushOp flush) {
1236   if (needReset_) {
1237     resetDeflateStream();
1238     needReset_ = false;
1239   }
1240   DCHECK(deflateStream_.hasValue());
1241   // zlib will return Z_STREAM_ERROR if output.data() is null.
1242   if (output.data() == nullptr) {
1243     return false;
1244   }
1245   deflateStream_->next_in = const_cast<uint8_t*>(input.data());
1246   deflateStream_->avail_in = input.size();
1247   deflateStream_->next_out = output.data();
1248   deflateStream_->avail_out = output.size();
1249   SCOPE_EXIT {
1250     input.uncheckedAdvance(input.size() - deflateStream_->avail_in);
1251     output.uncheckedAdvance(output.size() - deflateStream_->avail_out);
1252   };
1253   int const rc = zlibThrowOnError(
1254       deflate(deflateStream_.get_pointer(), zlibTranslateFlush(flush)));
1255   switch (flush) {
1256     case StreamCodec::FlushOp::NONE:
1257       return false;
1258     case StreamCodec::FlushOp::FLUSH:
1259       return deflateStream_->avail_in == 0 && deflateStream_->avail_out != 0;
1260     case StreamCodec::FlushOp::END:
1261       return rc == Z_STREAM_END;
1262     default:
1263       throw std::invalid_argument("ZlibStreamCodec: Invalid flush");
1264   }
1265 }
1266
1267 bool ZlibStreamCodec::doUncompressStream(
1268     ByteRange& input,
1269     MutableByteRange& output,
1270     StreamCodec::FlushOp flush) {
1271   if (needReset_) {
1272     resetInflateStream();
1273     needReset_ = false;
1274   }
1275   DCHECK(inflateStream_.hasValue());
1276   // zlib will return Z_STREAM_ERROR if output.data() is null.
1277   if (output.data() == nullptr) {
1278     return false;
1279   }
1280   inflateStream_->next_in = const_cast<uint8_t*>(input.data());
1281   inflateStream_->avail_in = input.size();
1282   inflateStream_->next_out = output.data();
1283   inflateStream_->avail_out = output.size();
1284   SCOPE_EXIT {
1285     input.advance(input.size() - inflateStream_->avail_in);
1286     output.advance(output.size() - inflateStream_->avail_out);
1287   };
1288   int const rc = zlibThrowOnError(
1289       inflate(inflateStream_.get_pointer(), zlibTranslateFlush(flush)));
1290   return rc == Z_STREAM_END;
1291 }
1292
1293 #endif // FOLLY_HAVE_LIBZ
1294
1295 #if FOLLY_HAVE_LIBLZMA
1296
1297 /**
1298  * LZMA2 compression
1299  */
1300 class LZMA2Codec final : public Codec {
1301  public:
1302   static std::unique_ptr<Codec> create(int level, CodecType type);
1303   explicit LZMA2Codec(int level, CodecType type);
1304
1305   std::vector<std::string> validPrefixes() const override;
1306   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1307       const override;
1308
1309  private:
1310   bool doNeedsUncompressedLength() const override;
1311   uint64_t doMaxUncompressedLength() const override;
1312   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1313
1314   bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
1315
1316   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1317   std::unique_ptr<IOBuf> doUncompress(
1318       const IOBuf* data,
1319       Optional<uint64_t> uncompressedLength) override;
1320
1321   std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
1322   bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
1323
1324   int level_;
1325 };
1326
1327 static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1328 static constexpr unsigned kLZMA2MagicBytes = 6;
1329
1330 std::vector<std::string> LZMA2Codec::validPrefixes() const {
1331   if (type() == CodecType::LZMA2_VARINT_SIZE) {
1332     return {};
1333   }
1334   return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
1335 }
1336
1337 bool LZMA2Codec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
1338   if (type() == CodecType::LZMA2_VARINT_SIZE) {
1339     return false;
1340   }
1341   // Returns false for all inputs less than 8 bytes.
1342   // This is okay, because no valid LZMA2 streams are less than 8 bytes.
1343   return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
1344 }
1345
1346 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
1347   return std::make_unique<LZMA2Codec>(level, type);
1348 }
1349
1350 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
1351   DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
1352   switch (level) {
1353   case COMPRESSION_LEVEL_FASTEST:
1354     level = 0;
1355     break;
1356   case COMPRESSION_LEVEL_DEFAULT:
1357     level = LZMA_PRESET_DEFAULT;
1358     break;
1359   case COMPRESSION_LEVEL_BEST:
1360     level = 9;
1361     break;
1362   }
1363   if (level < 0 || level > 9) {
1364     throw std::invalid_argument(to<std::string>(
1365         "LZMA2Codec: invalid level: ", level));
1366   }
1367   level_ = level;
1368 }
1369
1370 bool LZMA2Codec::doNeedsUncompressedLength() const {
1371   return false;
1372 }
1373
1374 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
1375   // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1376   return uint64_t(1) << 63;
1377 }
1378
1379 uint64_t LZMA2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1380   return lzma_stream_buffer_bound(uncompressedLength) +
1381       (encodeSize() ? kMaxVarintLength64 : 0);
1382 }
1383
1384 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
1385     lzma_stream* stream,
1386     size_t length) {
1387
1388   CHECK_EQ(stream->avail_out, 0);
1389
1390   auto buf = IOBuf::create(length);
1391   buf->append(buf->capacity());
1392
1393   stream->next_out = buf->writableData();
1394   stream->avail_out = buf->length();
1395
1396   return buf;
1397 }
1398
1399 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
1400   lzma_ret rc;
1401   lzma_stream stream = LZMA_STREAM_INIT;
1402
1403   rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
1404   if (rc != LZMA_OK) {
1405     throw std::runtime_error(folly::to<std::string>(
1406       "LZMA2Codec: lzma_easy_encoder error: ", rc));
1407   }
1408
1409   SCOPE_EXIT { lzma_end(&stream); };
1410
1411   uint64_t uncompressedLength = data->computeChainDataLength();
1412   uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
1413
1414   // Max 64MiB in one go
1415   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
1416   constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
1417
1418   auto out = addOutputBuffer(
1419     &stream,
1420     (maxCompressedLength <= maxSingleStepLength ?
1421      maxCompressedLength :
1422      defaultBufferLength));
1423
1424   if (encodeSize()) {
1425     auto size = IOBuf::createCombined(kMaxVarintLength64);
1426     encodeVarintToIOBuf(uncompressedLength, size.get());
1427     size->appendChain(std::move(out));
1428     out = std::move(size);
1429   }
1430
1431   for (auto& range : *data) {
1432     if (range.empty()) {
1433       continue;
1434     }
1435
1436     stream.next_in = const_cast<uint8_t*>(range.data());
1437     stream.avail_in = range.size();
1438
1439     while (stream.avail_in != 0) {
1440       if (stream.avail_out == 0) {
1441         out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1442       }
1443
1444       rc = lzma_code(&stream, LZMA_RUN);
1445
1446       if (rc != LZMA_OK) {
1447         throw std::runtime_error(folly::to<std::string>(
1448           "LZMA2Codec: lzma_code error: ", rc));
1449       }
1450     }
1451   }
1452
1453   do {
1454     if (stream.avail_out == 0) {
1455       out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1456     }
1457
1458     rc = lzma_code(&stream, LZMA_FINISH);
1459   } while (rc == LZMA_OK);
1460
1461   if (rc != LZMA_STREAM_END) {
1462     throw std::runtime_error(folly::to<std::string>(
1463       "LZMA2Codec: lzma_code ended with error: ", rc));
1464   }
1465
1466   out->prev()->trimEnd(stream.avail_out);
1467
1468   return out;
1469 }
1470
1471 bool LZMA2Codec::doInflate(lzma_stream* stream,
1472                           IOBuf* head,
1473                           size_t bufferLength) {
1474   if (stream->avail_out == 0) {
1475     head->prependChain(addOutputBuffer(stream, bufferLength));
1476   }
1477
1478   lzma_ret rc = lzma_code(stream, LZMA_RUN);
1479
1480   switch (rc) {
1481   case LZMA_OK:
1482     break;
1483   case LZMA_STREAM_END:
1484     return true;
1485   default:
1486     throw std::runtime_error(to<std::string>(
1487         "LZMA2Codec: lzma_code error: ", rc));
1488   }
1489
1490   return false;
1491 }
1492
1493 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(
1494     const IOBuf* data,
1495     Optional<uint64_t> uncompressedLength) {
1496   lzma_ret rc;
1497   lzma_stream stream = LZMA_STREAM_INIT;
1498
1499   rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
1500   if (rc != LZMA_OK) {
1501     throw std::runtime_error(folly::to<std::string>(
1502       "LZMA2Codec: lzma_auto_decoder error: ", rc));
1503   }
1504
1505   SCOPE_EXIT { lzma_end(&stream); };
1506
1507   // Max 64MiB in one go
1508   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1509   constexpr uint32_t defaultBufferLength = uint32_t(256) << 10; // 256 KiB
1510
1511   folly::io::Cursor cursor(data);
1512   if (encodeSize()) {
1513     const uint64_t actualUncompressedLength = decodeVarintFromCursor(cursor);
1514     if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
1515       throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
1516     }
1517     uncompressedLength = actualUncompressedLength;
1518   }
1519
1520   auto out = addOutputBuffer(
1521       &stream,
1522       ((uncompressedLength && *uncompressedLength <= maxSingleStepLength)
1523            ? *uncompressedLength
1524            : defaultBufferLength));
1525
1526   bool streamEnd = false;
1527   auto buf = cursor.peekBytes();
1528   while (!buf.empty()) {
1529     stream.next_in = const_cast<uint8_t*>(buf.data());
1530     stream.avail_in = buf.size();
1531
1532     while (stream.avail_in != 0) {
1533       if (streamEnd) {
1534         throw std::runtime_error(to<std::string>(
1535             "LZMA2Codec: junk after end of data"));
1536       }
1537
1538       streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1539     }
1540
1541     cursor.skip(buf.size());
1542     buf = cursor.peekBytes();
1543   }
1544
1545   while (!streamEnd) {
1546     streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1547   }
1548
1549   out->prev()->trimEnd(stream.avail_out);
1550
1551   if (uncompressedLength && *uncompressedLength != stream.total_out) {
1552     throw std::runtime_error(
1553         to<std::string>("LZMA2Codec: invalid uncompressed length"));
1554   }
1555
1556   return out;
1557 }
1558
1559 #endif  // FOLLY_HAVE_LIBLZMA
1560
1561 #ifdef FOLLY_HAVE_LIBZSTD
1562
1563 namespace {
1564 void zstdFreeCStream(ZSTD_CStream* zcs) {
1565   ZSTD_freeCStream(zcs);
1566 }
1567
1568 void zstdFreeDStream(ZSTD_DStream* zds) {
1569   ZSTD_freeDStream(zds);
1570 }
1571 }
1572
1573 /**
1574  * ZSTD compression
1575  */
1576 class ZSTDStreamCodec final : public StreamCodec {
1577  public:
1578   static std::unique_ptr<Codec> createCodec(int level, CodecType);
1579   static std::unique_ptr<StreamCodec> createStream(int level, CodecType);
1580   explicit ZSTDStreamCodec(int level, CodecType type);
1581
1582   std::vector<std::string> validPrefixes() const override;
1583   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1584       const override;
1585
1586  private:
1587   bool doNeedsUncompressedLength() const override;
1588   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1589   Optional<uint64_t> doGetUncompressedLength(
1590       IOBuf const* data,
1591       Optional<uint64_t> uncompressedLength) const override;
1592
1593   void doResetStream() override;
1594   bool doCompressStream(
1595       ByteRange& input,
1596       MutableByteRange& output,
1597       StreamCodec::FlushOp flushOp) override;
1598   bool doUncompressStream(
1599       ByteRange& input,
1600       MutableByteRange& output,
1601       StreamCodec::FlushOp flushOp) override;
1602
1603   void resetCStream();
1604   void resetDStream();
1605
1606   bool tryBlockCompress(ByteRange& input, MutableByteRange& output) const;
1607   bool tryBlockUncompress(ByteRange& input, MutableByteRange& output) const;
1608
1609   int level_;
1610   bool needReset_{true};
1611   std::unique_ptr<
1612       ZSTD_CStream,
1613       folly::static_function_deleter<ZSTD_CStream, &zstdFreeCStream>>
1614       cstream_{nullptr};
1615   std::unique_ptr<
1616       ZSTD_DStream,
1617       folly::static_function_deleter<ZSTD_DStream, &zstdFreeDStream>>
1618       dstream_{nullptr};
1619 };
1620
1621 static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
1622
1623 std::vector<std::string> ZSTDStreamCodec::validPrefixes() const {
1624   return {prefixToStringLE(kZSTDMagicLE)};
1625 }
1626
1627 bool ZSTDStreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1628     const {
1629   return dataStartsWithLE(data, kZSTDMagicLE);
1630 }
1631
1632 std::unique_ptr<Codec> ZSTDStreamCodec::createCodec(int level, CodecType type) {
1633   return make_unique<ZSTDStreamCodec>(level, type);
1634 }
1635
1636 std::unique_ptr<StreamCodec> ZSTDStreamCodec::createStream(
1637     int level,
1638     CodecType type) {
1639   return make_unique<ZSTDStreamCodec>(level, type);
1640 }
1641
1642 ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
1643     : StreamCodec(type) {
1644   DCHECK(type == CodecType::ZSTD);
1645   switch (level) {
1646     case COMPRESSION_LEVEL_FASTEST:
1647       level = 1;
1648       break;
1649     case COMPRESSION_LEVEL_DEFAULT:
1650       level = 1;
1651       break;
1652     case COMPRESSION_LEVEL_BEST:
1653       level = 19;
1654       break;
1655   }
1656   if (level < 1 || level > ZSTD_maxCLevel()) {
1657     throw std::invalid_argument(
1658         to<std::string>("ZSTD: invalid level: ", level));
1659   }
1660   level_ = level;
1661 }
1662
1663 bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
1664   return false;
1665 }
1666
1667 uint64_t ZSTDStreamCodec::doMaxCompressedLength(
1668     uint64_t uncompressedLength) const {
1669   return ZSTD_compressBound(uncompressedLength);
1670 }
1671
1672 void zstdThrowIfError(size_t rc) {
1673   if (!ZSTD_isError(rc)) {
1674     return;
1675   }
1676   throw std::runtime_error(
1677       to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
1678 }
1679
1680 Optional<uint64_t> ZSTDStreamCodec::doGetUncompressedLength(
1681     IOBuf const* data,
1682     Optional<uint64_t> uncompressedLength) const {
1683   // Read decompressed size from frame if available in first IOBuf.
1684   auto const decompressedSize =
1685       ZSTD_getDecompressedSize(data->data(), data->length());
1686   if (decompressedSize != 0) {
1687     if (uncompressedLength && *uncompressedLength != decompressedSize) {
1688       throw std::runtime_error("ZSTD: invalid uncompressed length");
1689     }
1690     uncompressedLength = decompressedSize;
1691   }
1692   return uncompressedLength;
1693 }
1694
1695 void ZSTDStreamCodec::doResetStream() {
1696   needReset_ = true;
1697 }
1698
1699 bool ZSTDStreamCodec::tryBlockCompress(
1700     ByteRange& input,
1701     MutableByteRange& output) const {
1702   DCHECK(needReset_);
1703   // We need to know that we have enough output space to use block compression
1704   if (output.size() < ZSTD_compressBound(input.size())) {
1705     return false;
1706   }
1707   size_t const length = ZSTD_compress(
1708       output.data(), output.size(), input.data(), input.size(), level_);
1709   zstdThrowIfError(length);
1710   input.uncheckedAdvance(input.size());
1711   output.uncheckedAdvance(length);
1712   return true;
1713 }
1714
1715 void ZSTDStreamCodec::resetCStream() {
1716   if (!cstream_) {
1717     cstream_.reset(ZSTD_createCStream());
1718     if (!cstream_) {
1719       throw std::bad_alloc{};
1720     }
1721   }
1722   // Advanced API usage works for all supported versions of zstd.
1723   // Required to set contentSizeFlag.
1724   auto params = ZSTD_getParams(level_, uncompressedLength().value_or(0), 0);
1725   params.fParams.contentSizeFlag = uncompressedLength().hasValue();
1726   zstdThrowIfError(ZSTD_initCStream_advanced(
1727       cstream_.get(), nullptr, 0, params, uncompressedLength().value_or(0)));
1728 }
1729
1730 bool ZSTDStreamCodec::doCompressStream(
1731     ByteRange& input,
1732     MutableByteRange& output,
1733     StreamCodec::FlushOp flushOp) {
1734   if (needReset_) {
1735     // If we are given all the input in one chunk try to use block compression
1736     if (flushOp == StreamCodec::FlushOp::END &&
1737         tryBlockCompress(input, output)) {
1738       return true;
1739     }
1740     resetCStream();
1741     needReset_ = false;
1742   }
1743   ZSTD_inBuffer in = {input.data(), input.size(), 0};
1744   ZSTD_outBuffer out = {output.data(), output.size(), 0};
1745   SCOPE_EXIT {
1746     input.uncheckedAdvance(in.pos);
1747     output.uncheckedAdvance(out.pos);
1748   };
1749   if (flushOp == StreamCodec::FlushOp::NONE || !input.empty()) {
1750     zstdThrowIfError(ZSTD_compressStream(cstream_.get(), &out, &in));
1751   }
1752   if (in.pos == in.size && flushOp != StreamCodec::FlushOp::NONE) {
1753     size_t rc;
1754     switch (flushOp) {
1755       case StreamCodec::FlushOp::FLUSH:
1756         rc = ZSTD_flushStream(cstream_.get(), &out);
1757         break;
1758       case StreamCodec::FlushOp::END:
1759         rc = ZSTD_endStream(cstream_.get(), &out);
1760         break;
1761       default:
1762         throw std::invalid_argument("ZSTD: invalid FlushOp");
1763     }
1764     zstdThrowIfError(rc);
1765     if (rc == 0) {
1766       return true;
1767     }
1768   }
1769   return false;
1770 }
1771
1772 bool ZSTDStreamCodec::tryBlockUncompress(
1773     ByteRange& input,
1774     MutableByteRange& output) const {
1775   DCHECK(needReset_);
1776 #if ZSTD_VERSION_NUMBER < 10104
1777   // We require ZSTD_findFrameCompressedSize() to perform this optimization.
1778   return false;
1779 #else
1780   // We need to know the uncompressed length and have enough output space.
1781   if (!uncompressedLength() || output.size() < *uncompressedLength()) {
1782     return false;
1783   }
1784   size_t const compressedLength =
1785       ZSTD_findFrameCompressedSize(input.data(), input.size());
1786   zstdThrowIfError(compressedLength);
1787   size_t const length = ZSTD_decompress(
1788       output.data(), *uncompressedLength(), input.data(), compressedLength);
1789   zstdThrowIfError(length);
1790   DCHECK_EQ(length, *uncompressedLength());
1791   input.uncheckedAdvance(compressedLength);
1792   output.uncheckedAdvance(length);
1793   return true;
1794 #endif
1795 }
1796
1797 void ZSTDStreamCodec::resetDStream() {
1798   if (!dstream_) {
1799     dstream_.reset(ZSTD_createDStream());
1800     if (!dstream_) {
1801       throw std::bad_alloc{};
1802     }
1803   }
1804   zstdThrowIfError(ZSTD_initDStream(dstream_.get()));
1805 }
1806
1807 bool ZSTDStreamCodec::doUncompressStream(
1808     ByteRange& input,
1809     MutableByteRange& output,
1810     StreamCodec::FlushOp flushOp) {
1811   if (needReset_) {
1812     // If we are given all the input in one chunk try to use block uncompression
1813     if (flushOp == StreamCodec::FlushOp::END &&
1814         tryBlockUncompress(input, output)) {
1815       return true;
1816     }
1817     resetDStream();
1818     needReset_ = false;
1819   }
1820   ZSTD_inBuffer in = {input.data(), input.size(), 0};
1821   ZSTD_outBuffer out = {output.data(), output.size(), 0};
1822   SCOPE_EXIT {
1823     input.uncheckedAdvance(in.pos);
1824     output.uncheckedAdvance(out.pos);
1825   };
1826   size_t const rc = ZSTD_decompressStream(dstream_.get(), &out, &in);
1827   zstdThrowIfError(rc);
1828   return rc == 0;
1829 }
1830
1831 #endif // FOLLY_HAVE_LIBZSTD
1832
1833 #if FOLLY_HAVE_LIBBZ2
1834
1835 class Bzip2Codec final : public Codec {
1836  public:
1837   static std::unique_ptr<Codec> create(int level, CodecType type);
1838   explicit Bzip2Codec(int level, CodecType type);
1839
1840   std::vector<std::string> validPrefixes() const override;
1841   bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
1842       const override;
1843
1844  private:
1845   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1846   std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
1847   std::unique_ptr<IOBuf> doUncompress(
1848       IOBuf const* data,
1849       Optional<uint64_t> uncompressedLength) override;
1850
1851   int level_;
1852 };
1853
1854 /* static */ std::unique_ptr<Codec> Bzip2Codec::create(
1855     int level,
1856     CodecType type) {
1857   return std::make_unique<Bzip2Codec>(level, type);
1858 }
1859
1860 Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
1861   DCHECK(type == CodecType::BZIP2);
1862   switch (level) {
1863     case COMPRESSION_LEVEL_FASTEST:
1864       level = 1;
1865       break;
1866     case COMPRESSION_LEVEL_DEFAULT:
1867       level = 9;
1868       break;
1869     case COMPRESSION_LEVEL_BEST:
1870       level = 9;
1871       break;
1872   }
1873   if (level < 1 || level > 9) {
1874     throw std::invalid_argument(
1875         to<std::string>("Bzip2: invalid level: ", level));
1876   }
1877   level_ = level;
1878 }
1879
1880 static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1881 static uint64_t constexpr kBzip2MagicBytes = 3;
1882
1883 std::vector<std::string> Bzip2Codec::validPrefixes() const {
1884   return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
1885 }
1886
1887 bool Bzip2Codec::canUncompress(IOBuf const* data, Optional<uint64_t>) const {
1888   return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
1889 }
1890
1891 uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1892   // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
1893   //   To guarantee that the compressed data will fit in its buffer, allocate an
1894   //   output buffer of size 1% larger than the uncompressed data, plus six
1895   //   hundred extra bytes.
1896   return uncompressedLength + uncompressedLength / 100 + 600;
1897 }
1898
1899 static bz_stream createBzStream() {
1900   bz_stream stream;
1901   stream.bzalloc = nullptr;
1902   stream.bzfree = nullptr;
1903   stream.opaque = nullptr;
1904   stream.next_in = stream.next_out = nullptr;
1905   stream.avail_in = stream.avail_out = 0;
1906   return stream;
1907 }
1908
1909 // Throws on error condition, otherwise returns the code.
1910 static int bzCheck(int const rc) {
1911   switch (rc) {
1912     case BZ_OK:
1913     case BZ_RUN_OK:
1914     case BZ_FLUSH_OK:
1915     case BZ_FINISH_OK:
1916     case BZ_STREAM_END:
1917       return rc;
1918     default:
1919       throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
1920   }
1921 }
1922
1923 static std::unique_ptr<IOBuf> addOutputBuffer(
1924     bz_stream* stream,
1925     uint64_t const bufferLength) {
1926   DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
1927   DCHECK_EQ(stream->avail_out, 0);
1928
1929   auto buf = IOBuf::create(bufferLength);
1930   buf->append(buf->capacity());
1931
1932   stream->next_out = reinterpret_cast<char*>(buf->writableData());
1933   stream->avail_out = buf->length();
1934
1935   return buf;
1936 }
1937
1938 std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
1939   bz_stream stream = createBzStream();
1940   bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
1941   SCOPE_EXIT {
1942     bzCheck(BZ2_bzCompressEnd(&stream));
1943   };
1944
1945   uint64_t const uncompressedLength = data->computeChainDataLength();
1946   uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
1947   uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1948   uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
1949
1950   auto out = addOutputBuffer(
1951       &stream,
1952       maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
1953                                                : kDefaultBufferLength);
1954
1955   for (auto range : *data) {
1956     while (!range.empty()) {
1957       auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1958       stream.next_in =
1959           const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1960       stream.avail_in = inSize;
1961
1962       if (stream.avail_out == 0) {
1963         out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1964       }
1965
1966       bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
1967       range.uncheckedAdvance(inSize - stream.avail_in);
1968     }
1969   }
1970   do {
1971     if (stream.avail_out == 0) {
1972       out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1973     }
1974   } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
1975
1976   out->prev()->trimEnd(stream.avail_out);
1977
1978   return out;
1979 }
1980
1981 std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
1982     const IOBuf* data,
1983     Optional<uint64_t> uncompressedLength) {
1984   bz_stream stream = createBzStream();
1985   bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
1986   SCOPE_EXIT {
1987     bzCheck(BZ2_bzDecompressEnd(&stream));
1988   };
1989
1990   uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1991   uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
1992   uint64_t const kDefaultBufferLength =
1993       computeBufferLength(data->computeChainDataLength(), kBlockSize);
1994
1995   auto out = addOutputBuffer(
1996       &stream,
1997       ((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength)
1998            ? *uncompressedLength
1999            : kDefaultBufferLength));
2000
2001   int rc = BZ_OK;
2002   for (auto range : *data) {
2003     while (!range.empty()) {
2004       auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
2005       stream.next_in =
2006           const_cast<char*>(reinterpret_cast<char const*>(range.data()));
2007       stream.avail_in = inSize;
2008
2009       if (stream.avail_out == 0) {
2010         out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
2011       }
2012
2013       rc = bzCheck(BZ2_bzDecompress(&stream));
2014       range.uncheckedAdvance(inSize - stream.avail_in);
2015     }
2016   }
2017   while (rc != BZ_STREAM_END) {
2018     if (stream.avail_out == 0) {
2019       out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
2020     }
2021     size_t const outputSize = stream.avail_out;
2022     rc = bzCheck(BZ2_bzDecompress(&stream));
2023     if (outputSize == stream.avail_out) {
2024       throw std::runtime_error("Bzip2Codec: Truncated input");
2025     }
2026   }
2027
2028   out->prev()->trimEnd(stream.avail_out);
2029
2030   uint64_t const totalOut =
2031       (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
2032   if (uncompressedLength && uncompressedLength != totalOut) {
2033     throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
2034   }
2035
2036   return out;
2037 }
2038
2039 #endif // FOLLY_HAVE_LIBBZ2
2040
2041 /**
2042  * Automatic decompression
2043  */
2044 class AutomaticCodec final : public Codec {
2045  public:
2046   static std::unique_ptr<Codec> create(
2047       std::vector<std::unique_ptr<Codec>> customCodecs);
2048   explicit AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs);
2049
2050   std::vector<std::string> validPrefixes() const override;
2051   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
2052       const override;
2053
2054  private:
2055   bool doNeedsUncompressedLength() const override;
2056   uint64_t doMaxUncompressedLength() const override;
2057
2058   uint64_t doMaxCompressedLength(uint64_t) const override {
2059     throw std::runtime_error(
2060         "AutomaticCodec error: maxCompressedLength() not supported.");
2061   }
2062   std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
2063     throw std::runtime_error("AutomaticCodec error: compress() not supported.");
2064   }
2065   std::unique_ptr<IOBuf> doUncompress(
2066       const IOBuf* data,
2067       Optional<uint64_t> uncompressedLength) override;
2068
2069   void addCodecIfSupported(CodecType type);
2070
2071   // Throws iff the codecs aren't compatible (very slow)
2072   void checkCompatibleCodecs() const;
2073
2074   std::vector<std::unique_ptr<Codec>> codecs_;
2075   bool needsUncompressedLength_;
2076   uint64_t maxUncompressedLength_;
2077 };
2078
2079 std::vector<std::string> AutomaticCodec::validPrefixes() const {
2080   std::unordered_set<std::string> prefixes;
2081   for (const auto& codec : codecs_) {
2082     const auto codecPrefixes = codec->validPrefixes();
2083     prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
2084   }
2085   return std::vector<std::string>{prefixes.begin(), prefixes.end()};
2086 }
2087
2088 bool AutomaticCodec::canUncompress(
2089     const IOBuf* data,
2090     Optional<uint64_t> uncompressedLength) const {
2091   return std::any_of(
2092       codecs_.begin(),
2093       codecs_.end(),
2094       [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
2095         return codec->canUncompress(data, uncompressedLength);
2096       });
2097 }
2098
2099 void AutomaticCodec::addCodecIfSupported(CodecType type) {
2100   const bool present = std::any_of(
2101       codecs_.begin(),
2102       codecs_.end(),
2103       [&type](std::unique_ptr<Codec> const& codec) {
2104         return codec->type() == type;
2105       });
2106   if (hasCodec(type) && !present) {
2107     codecs_.push_back(getCodec(type));
2108   }
2109 }
2110
2111 /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
2112     std::vector<std::unique_ptr<Codec>> customCodecs) {
2113   return std::make_unique<AutomaticCodec>(std::move(customCodecs));
2114 }
2115
2116 AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
2117     : Codec(CodecType::USER_DEFINED), codecs_(std::move(customCodecs)) {
2118   // Fastest -> slowest
2119   addCodecIfSupported(CodecType::LZ4_FRAME);
2120   addCodecIfSupported(CodecType::ZSTD);
2121   addCodecIfSupported(CodecType::ZLIB);
2122   addCodecIfSupported(CodecType::GZIP);
2123   addCodecIfSupported(CodecType::LZMA2);
2124   addCodecIfSupported(CodecType::BZIP2);
2125   if (kIsDebug) {
2126     checkCompatibleCodecs();
2127   }
2128   // Check that none of the codes are are null
2129   DCHECK(std::none_of(
2130       codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
2131         return codec == nullptr;
2132       }));
2133
2134   needsUncompressedLength_ = std::any_of(
2135       codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
2136         return codec->needsUncompressedLength();
2137       });
2138
2139   const auto it = std::max_element(
2140       codecs_.begin(),
2141       codecs_.end(),
2142       [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
2143         return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
2144       });
2145   DCHECK(it != codecs_.end());
2146   maxUncompressedLength_ = (*it)->maxUncompressedLength();
2147 }
2148
2149 void AutomaticCodec::checkCompatibleCodecs() const {
2150   // Keep track of all the possible headers.
2151   std::unordered_set<std::string> headers;
2152   // The empty header is not allowed.
2153   headers.insert("");
2154   // Step 1:
2155   // Construct a set of headers and check that none of the headers occur twice.
2156   // Eliminate edge cases.
2157   for (auto&& codec : codecs_) {
2158     const auto codecHeaders = codec->validPrefixes();
2159     // Codecs without any valid headers are not allowed.
2160     if (codecHeaders.empty()) {
2161       throw std::invalid_argument{
2162           "AutomaticCodec: validPrefixes() must not be empty."};
2163     }
2164     // Insert all the headers for the current codec.
2165     const size_t beforeSize = headers.size();
2166     headers.insert(codecHeaders.begin(), codecHeaders.end());
2167     // Codecs are not compatible if any header occurred twice.
2168     if (beforeSize + codecHeaders.size() != headers.size()) {
2169       throw std::invalid_argument{
2170           "AutomaticCodec: Two valid prefixes collide."};
2171     }
2172   }
2173   // Step 2:
2174   // Check if any strict non-empty prefix of any header is a header.
2175   for (const auto& header : headers) {
2176     for (size_t i = 1; i < header.size(); ++i) {
2177       if (headers.count(header.substr(0, i))) {
2178         throw std::invalid_argument{
2179             "AutomaticCodec: One valid prefix is a prefix of another valid "
2180             "prefix."};
2181       }
2182     }
2183   }
2184 }
2185
2186 bool AutomaticCodec::doNeedsUncompressedLength() const {
2187   return needsUncompressedLength_;
2188 }
2189
2190 uint64_t AutomaticCodec::doMaxUncompressedLength() const {
2191   return maxUncompressedLength_;
2192 }
2193
2194 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
2195     const IOBuf* data,
2196     Optional<uint64_t> uncompressedLength) {
2197   for (auto&& codec : codecs_) {
2198     if (codec->canUncompress(data, uncompressedLength)) {
2199       return codec->uncompress(data, uncompressedLength);
2200     }
2201   }
2202   throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
2203 }
2204
2205 using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
2206 using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
2207 struct Factory {
2208   CodecFactory codec;
2209   StreamCodecFactory stream;
2210 };
2211
2212 constexpr Factory
2213     codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
2214         {}, // USER_DEFINED
2215         {NoCompressionCodec::create, nullptr},
2216
2217 #if FOLLY_HAVE_LIBLZ4
2218         {LZ4Codec::create, nullptr},
2219 #else
2220         {},
2221 #endif
2222
2223 #if FOLLY_HAVE_LIBSNAPPY
2224         {SnappyCodec::create, nullptr},
2225 #else
2226         {},
2227 #endif
2228
2229 #if FOLLY_HAVE_LIBZ
2230         {ZlibStreamCodec::createCodec, ZlibStreamCodec::createStream},
2231 #else
2232         {},
2233 #endif
2234
2235 #if FOLLY_HAVE_LIBLZ4
2236         {LZ4Codec::create, nullptr},
2237 #else
2238         {},
2239 #endif
2240
2241 #if FOLLY_HAVE_LIBLZMA
2242         {LZMA2Codec::create, nullptr},
2243         {LZMA2Codec::create, nullptr},
2244 #else
2245         {},
2246         {},
2247 #endif
2248
2249 #if FOLLY_HAVE_LIBZSTD
2250         {ZSTDStreamCodec::createCodec, ZSTDStreamCodec::createStream},
2251 #else
2252         {},
2253 #endif
2254
2255 #if FOLLY_HAVE_LIBZ
2256         {ZlibStreamCodec::createCodec, ZlibStreamCodec::createStream},
2257 #else
2258         {},
2259 #endif
2260
2261 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
2262         {LZ4FrameCodec::create, nullptr},
2263 #else
2264         {},
2265 #endif
2266
2267 #if FOLLY_HAVE_LIBBZ2
2268         {Bzip2Codec::create, nullptr},
2269 #else
2270         {},
2271 #endif
2272 };
2273
2274 Factory const& getFactory(CodecType type) {
2275   size_t const idx = static_cast<size_t>(type);
2276   if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2277     throw std::invalid_argument(
2278         to<std::string>("Compression type ", idx, " invalid"));
2279   }
2280   return codecFactories[idx];
2281 }
2282 } // namespace
2283
2284 bool hasCodec(CodecType type) {
2285   return getFactory(type).codec != nullptr;
2286 }
2287
2288 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2289   auto const factory = getFactory(type).codec;
2290   if (!factory) {
2291     throw std::invalid_argument(
2292         to<std::string>("Compression type ", type, " not supported"));
2293   }
2294   auto codec = (*factory)(level, type);
2295   DCHECK(codec->type() == type);
2296   return codec;
2297 }
2298
2299 bool hasStreamCodec(CodecType type) {
2300   return getFactory(type).stream != nullptr;
2301 }
2302
2303 std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
2304   auto const factory = getFactory(type).stream;
2305   if (!factory) {
2306     throw std::invalid_argument(
2307         to<std::string>("Compression type ", type, " not supported"));
2308   }
2309   auto codec = (*factory)(level, type);
2310   DCHECK(codec->type() == type);
2311   return codec;
2312 }
2313
2314 std::unique_ptr<Codec> getAutoUncompressionCodec(
2315     std::vector<std::unique_ptr<Codec>> customCodecs) {
2316   return AutomaticCodec::create(std::move(customCodecs));
2317 }
2318 }}  // namespaces