Move folly/Bits.h to folly/lang/
[folly.git] / folly / compression / Compression.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/compression/Compression.h>
18
19 #if FOLLY_HAVE_LIBLZ4
20 #include <lz4.h>
21 #include <lz4hc.h>
22 #if LZ4_VERSION_NUMBER >= 10301
23 #include <lz4frame.h>
24 #endif
25 #endif
26
27 #include <glog/logging.h>
28
29 #if FOLLY_HAVE_LIBSNAPPY
30 #include <snappy-sinksource.h>
31 #include <snappy.h>
32 #endif
33
34 #if FOLLY_HAVE_LIBZ
35 #include <folly/compression/Zlib.h>
36 #endif
37
38 #if FOLLY_HAVE_LIBLZMA
39 #include <lzma.h>
40 #endif
41
42 #if FOLLY_HAVE_LIBZSTD
43 #define ZSTD_STATIC_LINKING_ONLY
44 #include <zstd.h>
45 #endif
46
47 #if FOLLY_HAVE_LIBBZ2
48 #include <bzlib.h>
49 #endif
50
51 #include <folly/Conv.h>
52 #include <folly/Memory.h>
53 #include <folly/Portability.h>
54 #include <folly/ScopeGuard.h>
55 #include <folly/Varint.h>
56 #include <folly/compression/Utils.h>
57 #include <folly/io/Cursor.h>
58 #include <folly/lang/Bits.h>
59 #include <algorithm>
60 #include <unordered_set>
61
62 using folly::io::compression::detail::dataStartsWithLE;
63 using folly::io::compression::detail::prefixToStringLE;
64
65 namespace folly {
66 namespace io {
67
68 Codec::Codec(CodecType type) : type_(type) { }
69
70 // Ensure consistent behavior in the nullptr case
71 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
72   if (data == nullptr) {
73     throw std::invalid_argument("Codec: data must not be nullptr");
74   }
75   uint64_t len = data->computeChainDataLength();
76   if (len > maxUncompressedLength()) {
77     throw std::runtime_error("Codec: uncompressed length too large");
78   }
79
80   return doCompress(data);
81 }
82
83 std::string Codec::compress(const StringPiece data) {
84   const uint64_t len = data.size();
85   if (len > maxUncompressedLength()) {
86     throw std::runtime_error("Codec: uncompressed length too large");
87   }
88
89   return doCompressString(data);
90 }
91
92 std::unique_ptr<IOBuf> Codec::uncompress(
93     const IOBuf* data,
94     Optional<uint64_t> uncompressedLength) {
95   if (data == nullptr) {
96     throw std::invalid_argument("Codec: data must not be nullptr");
97   }
98   if (!uncompressedLength) {
99     if (needsUncompressedLength()) {
100       throw std::invalid_argument("Codec: uncompressed length required");
101     }
102   } else if (*uncompressedLength > maxUncompressedLength()) {
103     throw std::runtime_error("Codec: uncompressed length too large");
104   }
105
106   if (data->empty()) {
107     if (uncompressedLength.value_or(0) != 0) {
108       throw std::runtime_error("Codec: invalid uncompressed length");
109     }
110     return IOBuf::create(0);
111   }
112
113   return doUncompress(data, uncompressedLength);
114 }
115
116 std::string Codec::uncompress(
117     const StringPiece data,
118     Optional<uint64_t> uncompressedLength) {
119   if (!uncompressedLength) {
120     if (needsUncompressedLength()) {
121       throw std::invalid_argument("Codec: uncompressed length required");
122     }
123   } else if (*uncompressedLength > maxUncompressedLength()) {
124     throw std::runtime_error("Codec: uncompressed length too large");
125   }
126
127   if (data.empty()) {
128     if (uncompressedLength.value_or(0) != 0) {
129       throw std::runtime_error("Codec: invalid uncompressed length");
130     }
131     return "";
132   }
133
134   return doUncompressString(data, uncompressedLength);
135 }
136
137 bool Codec::needsUncompressedLength() const {
138   return doNeedsUncompressedLength();
139 }
140
141 uint64_t Codec::maxUncompressedLength() const {
142   return doMaxUncompressedLength();
143 }
144
145 bool Codec::doNeedsUncompressedLength() const {
146   return false;
147 }
148
149 uint64_t Codec::doMaxUncompressedLength() const {
150   return UNLIMITED_UNCOMPRESSED_LENGTH;
151 }
152
153 std::vector<std::string> Codec::validPrefixes() const {
154   return {};
155 }
156
157 bool Codec::canUncompress(const IOBuf*, Optional<uint64_t>) const {
158   return false;
159 }
160
161 std::string Codec::doCompressString(const StringPiece data) {
162   const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
163   auto outputBuffer = doCompress(&inputBuffer);
164   std::string output;
165   output.reserve(outputBuffer->computeChainDataLength());
166   for (auto range : *outputBuffer) {
167     output.append(reinterpret_cast<const char*>(range.data()), range.size());
168   }
169   return output;
170 }
171
172 std::string Codec::doUncompressString(
173     const StringPiece data,
174     Optional<uint64_t> uncompressedLength) {
175   const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
176   auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
177   std::string output;
178   output.reserve(outputBuffer->computeChainDataLength());
179   for (auto range : *outputBuffer) {
180     output.append(reinterpret_cast<const char*>(range.data()), range.size());
181   }
182   return output;
183 }
184
185 uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
186   return doMaxCompressedLength(uncompressedLength);
187 }
188
189 Optional<uint64_t> Codec::getUncompressedLength(
190     const folly::IOBuf* data,
191     Optional<uint64_t> uncompressedLength) const {
192   auto const compressedLength = data->computeChainDataLength();
193   if (compressedLength == 0) {
194     if (uncompressedLength.value_or(0) != 0) {
195       throw std::runtime_error("Invalid uncompressed length");
196     }
197     return 0;
198   }
199   return doGetUncompressedLength(data, uncompressedLength);
200 }
201
202 Optional<uint64_t> Codec::doGetUncompressedLength(
203     const folly::IOBuf*,
204     Optional<uint64_t> uncompressedLength) const {
205   return uncompressedLength;
206 }
207
208 bool StreamCodec::needsDataLength() const {
209   return doNeedsDataLength();
210 }
211
212 bool StreamCodec::doNeedsDataLength() const {
213   return false;
214 }
215
216 void StreamCodec::assertStateIs(State expected) const {
217   if (state_ != expected) {
218     throw std::logic_error(folly::to<std::string>(
219         "Codec: state is ", state_, "; expected state ", expected));
220   }
221 }
222
223 void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
224   state_ = State::RESET;
225   uncompressedLength_ = uncompressedLength;
226   progressMade_ = true;
227   doResetStream();
228 }
229
230 bool StreamCodec::compressStream(
231     ByteRange& input,
232     MutableByteRange& output,
233     StreamCodec::FlushOp flushOp) {
234   if (state_ == State::RESET && input.empty() &&
235       flushOp == StreamCodec::FlushOp::END &&
236       uncompressedLength().value_or(0) != 0) {
237     throw std::runtime_error("Codec: invalid uncompressed length");
238   }
239
240   if (!uncompressedLength() && needsDataLength()) {
241     throw std::runtime_error("Codec: uncompressed length required");
242   }
243   if (state_ == State::RESET && !input.empty() &&
244       uncompressedLength() == uint64_t(0)) {
245     throw std::runtime_error("Codec: invalid uncompressed length");
246   }
247   // Handle input state transitions
248   switch (flushOp) {
249     case StreamCodec::FlushOp::NONE:
250       if (state_ == State::RESET) {
251         state_ = State::COMPRESS;
252       }
253       assertStateIs(State::COMPRESS);
254       break;
255     case StreamCodec::FlushOp::FLUSH:
256       if (state_ == State::RESET || state_ == State::COMPRESS) {
257         state_ = State::COMPRESS_FLUSH;
258       }
259       assertStateIs(State::COMPRESS_FLUSH);
260       break;
261     case StreamCodec::FlushOp::END:
262       if (state_ == State::RESET || state_ == State::COMPRESS) {
263         state_ = State::COMPRESS_END;
264       }
265       assertStateIs(State::COMPRESS_END);
266       break;
267   }
268   size_t const inputSize = input.size();
269   size_t const outputSize = output.size();
270   bool const done = doCompressStream(input, output, flushOp);
271   if (!done && inputSize == input.size() && outputSize == output.size()) {
272     if (!progressMade_) {
273       throw std::runtime_error("Codec: No forward progress made");
274     }
275     // Throw an exception if there is no progress again next time
276     progressMade_ = false;
277   } else {
278     progressMade_ = true;
279   }
280   // Handle output state transitions
281   if (done) {
282     if (state_ == State::COMPRESS_FLUSH) {
283       state_ = State::COMPRESS;
284     } else if (state_ == State::COMPRESS_END) {
285       state_ = State::END;
286     }
287     // Check internal invariants
288     DCHECK(input.empty());
289     DCHECK(flushOp != StreamCodec::FlushOp::NONE);
290   }
291   return done;
292 }
293
294 bool StreamCodec::uncompressStream(
295     ByteRange& input,
296     MutableByteRange& output,
297     StreamCodec::FlushOp flushOp) {
298   if (state_ == State::RESET && input.empty()) {
299     if (uncompressedLength().value_or(0) == 0) {
300       return true;
301     }
302     return false;
303   }
304   // Handle input state transitions
305   if (state_ == State::RESET) {
306     state_ = State::UNCOMPRESS;
307   }
308   assertStateIs(State::UNCOMPRESS);
309   size_t const inputSize = input.size();
310   size_t const outputSize = output.size();
311   bool const done = doUncompressStream(input, output, flushOp);
312   if (!done && inputSize == input.size() && outputSize == output.size()) {
313     if (!progressMade_) {
314       throw std::runtime_error("Codec: no forward progress made");
315     }
316     // Throw an exception if there is no progress again next time
317     progressMade_ = false;
318   } else {
319     progressMade_ = true;
320   }
321   // Handle output state transitions
322   if (done) {
323     state_ = State::END;
324   }
325   return done;
326 }
327
328 static std::unique_ptr<IOBuf> addOutputBuffer(
329     MutableByteRange& output,
330     uint64_t size) {
331   DCHECK(output.empty());
332   auto buffer = IOBuf::create(size);
333   buffer->append(buffer->capacity());
334   output = {buffer->writableData(), buffer->length()};
335   return buffer;
336 }
337
338 std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
339   uint64_t const uncompressedLength = data->computeChainDataLength();
340   resetStream(uncompressedLength);
341   uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
342
343   auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
344   auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
345
346   MutableByteRange output;
347   auto buffer = addOutputBuffer(
348       output,
349       maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
350                                                : kDefaultBufferLength);
351
352   // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
353   IOBuf const* current = data;
354   ByteRange input{current->data(), current->length()};
355   StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
356   bool done = false;
357   while (!done) {
358     while (input.empty() && current->next() != data) {
359       current = current->next();
360       input = {current->data(), current->length()};
361     }
362     if (current->next() == data) {
363       // This is the last input buffer so end the stream
364       flushOp = StreamCodec::FlushOp::END;
365     }
366     if (output.empty()) {
367       buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
368     }
369     done = compressStream(input, output, flushOp);
370     if (done) {
371       DCHECK(input.empty());
372       DCHECK(flushOp == StreamCodec::FlushOp::END);
373       DCHECK_EQ(current->next(), data);
374     }
375   }
376   buffer->prev()->trimEnd(output.size());
377   return buffer;
378 }
379
380 static uint64_t computeBufferLength(
381     uint64_t const compressedLength,
382     uint64_t const blockSize) {
383   uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
384   uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
385   return std::min(goodBufferSize, kMaxBufferLength);
386 }
387
388 std::unique_ptr<IOBuf> StreamCodec::doUncompress(
389     IOBuf const* data,
390     Optional<uint64_t> uncompressedLength) {
391   auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
392   auto constexpr kBlockSize = uint64_t(128) << 10;
393   auto const defaultBufferLength =
394       computeBufferLength(data->computeChainDataLength(), kBlockSize);
395
396   uncompressedLength = getUncompressedLength(data, uncompressedLength);
397   resetStream(uncompressedLength);
398
399   MutableByteRange output;
400   auto buffer = addOutputBuffer(
401       output,
402       (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
403            ? *uncompressedLength
404            : defaultBufferLength));
405
406   // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
407   IOBuf const* current = data;
408   ByteRange input{current->data(), current->length()};
409   StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
410   bool done = false;
411   while (!done) {
412     while (input.empty() && current->next() != data) {
413       current = current->next();
414       input = {current->data(), current->length()};
415     }
416     if (current->next() == data) {
417       // Tell the uncompressor there is no more input (it may optimize)
418       flushOp = StreamCodec::FlushOp::END;
419     }
420     if (output.empty()) {
421       buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
422     }
423     done = uncompressStream(input, output, flushOp);
424   }
425   if (!input.empty()) {
426     throw std::runtime_error("Codec: Junk after end of data");
427   }
428
429   buffer->prev()->trimEnd(output.size());
430   if (uncompressedLength &&
431       *uncompressedLength != buffer->computeChainDataLength()) {
432     throw std::runtime_error("Codec: invalid uncompressed length");
433   }
434
435   return buffer;
436 }
437
438 namespace {
439
440 /**
441  * No compression
442  */
443 class NoCompressionCodec final : public Codec {
444  public:
445   static std::unique_ptr<Codec> create(int level, CodecType type);
446   explicit NoCompressionCodec(int level, CodecType type);
447
448  private:
449   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
450   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
451   std::unique_ptr<IOBuf> doUncompress(
452       const IOBuf* data,
453       Optional<uint64_t> uncompressedLength) override;
454 };
455
456 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
457   return std::make_unique<NoCompressionCodec>(level, type);
458 }
459
460 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
461     : Codec(type) {
462   DCHECK(type == CodecType::NO_COMPRESSION);
463   switch (level) {
464     case COMPRESSION_LEVEL_DEFAULT:
465     case COMPRESSION_LEVEL_FASTEST:
466     case COMPRESSION_LEVEL_BEST:
467       level = 0;
468   }
469   if (level != 0) {
470     throw std::invalid_argument(to<std::string>(
471         "NoCompressionCodec: invalid level ", level));
472   }
473 }
474
475 uint64_t NoCompressionCodec::doMaxCompressedLength(
476     uint64_t uncompressedLength) const {
477   return uncompressedLength;
478 }
479
480 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
481     const IOBuf* data) {
482   return data->clone();
483 }
484
485 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
486     const IOBuf* data,
487     Optional<uint64_t> uncompressedLength) {
488   if (uncompressedLength &&
489       data->computeChainDataLength() != *uncompressedLength) {
490     throw std::runtime_error(
491         to<std::string>("NoCompressionCodec: invalid uncompressed length"));
492   }
493   return data->clone();
494 }
495
496 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
497
498 namespace {
499
500 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
501   DCHECK_GE(out->tailroom(), kMaxVarintLength64);
502   out->append(encodeVarint(val, out->writableTail()));
503 }
504
505 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
506   uint64_t val = 0;
507   int8_t b = 0;
508   for (int shift = 0; shift <= 63; shift += 7) {
509     b = cursor.read<int8_t>();
510     val |= static_cast<uint64_t>(b & 0x7f) << shift;
511     if (b >= 0) {
512       break;
513     }
514   }
515   if (b < 0) {
516     throw std::invalid_argument("Invalid varint value. Too big.");
517   }
518   return val;
519 }
520
521 } // namespace
522
523 #endif  // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
524
525 #if FOLLY_HAVE_LIBLZ4
526
527 /**
528  * LZ4 compression
529  */
530 class LZ4Codec final : public Codec {
531  public:
532   static std::unique_ptr<Codec> create(int level, CodecType type);
533   explicit LZ4Codec(int level, CodecType type);
534
535  private:
536   bool doNeedsUncompressedLength() const override;
537   uint64_t doMaxUncompressedLength() const override;
538   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
539
540   bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
541
542   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
543   std::unique_ptr<IOBuf> doUncompress(
544       const IOBuf* data,
545       Optional<uint64_t> uncompressedLength) override;
546
547   bool highCompression_;
548 };
549
550 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
551   return std::make_unique<LZ4Codec>(level, type);
552 }
553
554 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
555   DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
556
557   switch (level) {
558     case COMPRESSION_LEVEL_FASTEST:
559     case COMPRESSION_LEVEL_DEFAULT:
560       level = 1;
561       break;
562     case COMPRESSION_LEVEL_BEST:
563       level = 2;
564       break;
565   }
566   if (level < 1 || level > 2) {
567     throw std::invalid_argument(to<std::string>(
568         "LZ4Codec: invalid level: ", level));
569   }
570   highCompression_ = (level > 1);
571 }
572
573 bool LZ4Codec::doNeedsUncompressedLength() const {
574   return !encodeSize();
575 }
576
577 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
578 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
579 // here.
580 #ifndef LZ4_MAX_INPUT_SIZE
581 # define LZ4_MAX_INPUT_SIZE 0x7E000000
582 #endif
583
584 uint64_t LZ4Codec::doMaxUncompressedLength() const {
585   return LZ4_MAX_INPUT_SIZE;
586 }
587
588 uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
589   return LZ4_compressBound(uncompressedLength) +
590       (encodeSize() ? kMaxVarintLength64 : 0);
591 }
592
593 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
594   IOBuf clone;
595   if (data->isChained()) {
596     // LZ4 doesn't support streaming, so we have to coalesce
597     clone = data->cloneCoalescedAsValue();
598     data = &clone;
599   }
600
601   auto out = IOBuf::create(maxCompressedLength(data->length()));
602   if (encodeSize()) {
603     encodeVarintToIOBuf(data->length(), out.get());
604   }
605
606   int n;
607   auto input = reinterpret_cast<const char*>(data->data());
608   auto output = reinterpret_cast<char*>(out->writableTail());
609   const auto inputLength = data->length();
610 #if LZ4_VERSION_NUMBER >= 10700
611   if (highCompression_) {
612     n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
613   } else {
614     n = LZ4_compress_default(input, output, inputLength, out->tailroom());
615   }
616 #else
617   if (highCompression_) {
618     n = LZ4_compressHC(input, output, inputLength);
619   } else {
620     n = LZ4_compress(input, output, inputLength);
621   }
622 #endif
623
624   CHECK_GE(n, 0);
625   CHECK_LE(n, out->capacity());
626
627   out->append(n);
628   return out;
629 }
630
631 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
632     const IOBuf* data,
633     Optional<uint64_t> uncompressedLength) {
634   IOBuf clone;
635   if (data->isChained()) {
636     // LZ4 doesn't support streaming, so we have to coalesce
637     clone = data->cloneCoalescedAsValue();
638     data = &clone;
639   }
640
641   folly::io::Cursor cursor(data);
642   uint64_t actualUncompressedLength;
643   if (encodeSize()) {
644     actualUncompressedLength = decodeVarintFromCursor(cursor);
645     if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
646       throw std::runtime_error("LZ4Codec: invalid uncompressed length");
647     }
648   } else {
649     // Invariants
650     DCHECK(uncompressedLength.hasValue());
651     DCHECK(*uncompressedLength <= maxUncompressedLength());
652     actualUncompressedLength = *uncompressedLength;
653   }
654
655   auto sp = StringPiece{cursor.peekBytes()};
656   auto out = IOBuf::create(actualUncompressedLength);
657   int n = LZ4_decompress_safe(
658       sp.data(),
659       reinterpret_cast<char*>(out->writableTail()),
660       sp.size(),
661       actualUncompressedLength);
662
663   if (n < 0 || uint64_t(n) != actualUncompressedLength) {
664     throw std::runtime_error(to<std::string>(
665         "LZ4 decompression returned invalid value ", n));
666   }
667   out->append(actualUncompressedLength);
668   return out;
669 }
670
671 #if LZ4_VERSION_NUMBER >= 10301
672
673 class LZ4FrameCodec final : public Codec {
674  public:
675   static std::unique_ptr<Codec> create(int level, CodecType type);
676   explicit LZ4FrameCodec(int level, CodecType type);
677   ~LZ4FrameCodec() override;
678
679   std::vector<std::string> validPrefixes() const override;
680   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
681       const override;
682
683  private:
684   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
685
686   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
687   std::unique_ptr<IOBuf> doUncompress(
688       const IOBuf* data,
689       Optional<uint64_t> uncompressedLength) override;
690
691   // Reset the dctx_ if it is dirty or null.
692   void resetDCtx();
693
694   int level_;
695   LZ4F_decompressionContext_t dctx_{nullptr};
696   bool dirty_{false};
697 };
698
699 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
700     int level,
701     CodecType type) {
702   return std::make_unique<LZ4FrameCodec>(level, type);
703 }
704
705 static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
706
707 std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
708   return {prefixToStringLE(kLZ4FrameMagicLE)};
709 }
710
711 bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
712   return dataStartsWithLE(data, kLZ4FrameMagicLE);
713 }
714
715 uint64_t LZ4FrameCodec::doMaxCompressedLength(
716     uint64_t uncompressedLength) const {
717   LZ4F_preferences_t prefs{};
718   prefs.compressionLevel = level_;
719   prefs.frameInfo.contentSize = uncompressedLength;
720   return LZ4F_compressFrameBound(uncompressedLength, &prefs);
721 }
722
723 static size_t lz4FrameThrowOnError(size_t code) {
724   if (LZ4F_isError(code)) {
725     throw std::runtime_error(
726         to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
727   }
728   return code;
729 }
730
731 void LZ4FrameCodec::resetDCtx() {
732   if (dctx_ && !dirty_) {
733     return;
734   }
735   if (dctx_) {
736     LZ4F_freeDecompressionContext(dctx_);
737   }
738   lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
739   dirty_ = false;
740 }
741
742 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
743   DCHECK(type == CodecType::LZ4_FRAME);
744   switch (level) {
745     case COMPRESSION_LEVEL_FASTEST:
746     case COMPRESSION_LEVEL_DEFAULT:
747       level_ = 0;
748       break;
749     case COMPRESSION_LEVEL_BEST:
750       level_ = 16;
751       break;
752     default:
753       level_ = level;
754       break;
755   }
756 }
757
758 LZ4FrameCodec::~LZ4FrameCodec() {
759   if (dctx_) {
760     LZ4F_freeDecompressionContext(dctx_);
761   }
762 }
763
764 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
765   // LZ4 Frame compression doesn't support streaming so we have to coalesce
766   IOBuf clone;
767   if (data->isChained()) {
768     clone = data->cloneCoalescedAsValue();
769     data = &clone;
770   }
771   // Set preferences
772   const auto uncompressedLength = data->length();
773   LZ4F_preferences_t prefs{};
774   prefs.compressionLevel = level_;
775   prefs.frameInfo.contentSize = uncompressedLength;
776   // Compress
777   auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
778   const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
779       buf->writableTail(),
780       buf->tailroom(),
781       data->data(),
782       data->length(),
783       &prefs));
784   buf->append(written);
785   return buf;
786 }
787
788 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
789     const IOBuf* data,
790     Optional<uint64_t> uncompressedLength) {
791   // Reset the dctx if any errors have occurred
792   resetDCtx();
793   // Coalesce the data
794   ByteRange in = *data->begin();
795   IOBuf clone;
796   if (data->isChained()) {
797     clone = data->cloneCoalescedAsValue();
798     in = clone.coalesce();
799   }
800   data = nullptr;
801   // Select decompression options
802   LZ4F_decompressOptions_t options;
803   options.stableDst = 1;
804   // Select blockSize and growthSize for the IOBufQueue
805   IOBufQueue queue(IOBufQueue::cacheChainLength());
806   auto blockSize = uint64_t{64} << 10;
807   auto growthSize = uint64_t{4} << 20;
808   if (uncompressedLength) {
809     // Allocate uncompressedLength in one chunk (up to 64 MB)
810     const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20);
811     queue.preallocate(allocateSize, allocateSize);
812     blockSize = std::min(*uncompressedLength, blockSize);
813     growthSize = std::min(*uncompressedLength, growthSize);
814   } else {
815     // Reduce growthSize for small data
816     const auto guessUncompressedLen =
817         4 * std::max<uint64_t>(blockSize, in.size());
818     growthSize = std::min(guessUncompressedLen, growthSize);
819   }
820   // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
821   // returns 0
822   dirty_ = true;
823   // Decompress until the frame is over
824   size_t code = 0;
825   do {
826     // Allocate enough space to decompress at least a block
827     void* out;
828     size_t outSize;
829     std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
830     // Decompress
831     size_t inSize = in.size();
832     code = lz4FrameThrowOnError(
833         LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
834     if (in.empty() && outSize == 0 && code != 0) {
835       // We passed no input, no output was produced, and the frame isn't over
836       // No more forward progress is possible
837       throw std::runtime_error("LZ4Frame error: Incomplete frame");
838     }
839     in.uncheckedAdvance(inSize);
840     queue.postallocate(outSize);
841   } while (code != 0);
842   // At this point the decompression context can be reused
843   dirty_ = false;
844   if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
845     throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
846   }
847   return queue.move();
848 }
849
850 #endif // LZ4_VERSION_NUMBER >= 10301
851 #endif // FOLLY_HAVE_LIBLZ4
852
853 #if FOLLY_HAVE_LIBSNAPPY
854
855 /**
856  * Snappy compression
857  */
858
859 /**
860  * Implementation of snappy::Source that reads from a IOBuf chain.
861  */
862 class IOBufSnappySource final : public snappy::Source {
863  public:
864   explicit IOBufSnappySource(const IOBuf* data);
865   size_t Available() const override;
866   const char* Peek(size_t* len) override;
867   void Skip(size_t n) override;
868  private:
869   size_t available_;
870   io::Cursor cursor_;
871 };
872
873 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
874   : available_(data->computeChainDataLength()),
875     cursor_(data) {
876 }
877
878 size_t IOBufSnappySource::Available() const {
879   return available_;
880 }
881
882 const char* IOBufSnappySource::Peek(size_t* len) {
883   auto sp = StringPiece{cursor_.peekBytes()};
884   *len = sp.size();
885   return sp.data();
886 }
887
888 void IOBufSnappySource::Skip(size_t n) {
889   CHECK_LE(n, available_);
890   cursor_.skip(n);
891   available_ -= n;
892 }
893
894 class SnappyCodec final : public Codec {
895  public:
896   static std::unique_ptr<Codec> create(int level, CodecType type);
897   explicit SnappyCodec(int level, CodecType type);
898
899  private:
900   uint64_t doMaxUncompressedLength() const override;
901   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
902   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
903   std::unique_ptr<IOBuf> doUncompress(
904       const IOBuf* data,
905       Optional<uint64_t> uncompressedLength) override;
906 };
907
908 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
909   return std::make_unique<SnappyCodec>(level, type);
910 }
911
912 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
913   DCHECK(type == CodecType::SNAPPY);
914   switch (level) {
915     case COMPRESSION_LEVEL_FASTEST:
916     case COMPRESSION_LEVEL_DEFAULT:
917     case COMPRESSION_LEVEL_BEST:
918       level = 1;
919   }
920   if (level != 1) {
921     throw std::invalid_argument(to<std::string>(
922         "SnappyCodec: invalid level: ", level));
923   }
924 }
925
926 uint64_t SnappyCodec::doMaxUncompressedLength() const {
927   // snappy.h uses uint32_t for lengths, so there's that.
928   return std::numeric_limits<uint32_t>::max();
929 }
930
931 uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
932   return snappy::MaxCompressedLength(uncompressedLength);
933 }
934
935 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
936   IOBufSnappySource source(data);
937   auto out = IOBuf::create(maxCompressedLength(source.Available()));
938
939   snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
940       out->writableTail()));
941
942   size_t n = snappy::Compress(&source, &sink);
943
944   CHECK_LE(n, out->capacity());
945   out->append(n);
946   return out;
947 }
948
949 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
950     const IOBuf* data,
951     Optional<uint64_t> uncompressedLength) {
952   uint32_t actualUncompressedLength = 0;
953
954   {
955     IOBufSnappySource source(data);
956     if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
957       throw std::runtime_error("snappy::GetUncompressedLength failed");
958     }
959     if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
960       throw std::runtime_error("snappy: invalid uncompressed length");
961     }
962   }
963
964   auto out = IOBuf::create(actualUncompressedLength);
965
966   {
967     IOBufSnappySource source(data);
968     if (!snappy::RawUncompress(&source,
969                                reinterpret_cast<char*>(out->writableTail()))) {
970       throw std::runtime_error("snappy::RawUncompress failed");
971     }
972   }
973
974   out->append(actualUncompressedLength);
975   return out;
976 }
977
978 #endif  // FOLLY_HAVE_LIBSNAPPY
979
980 #if FOLLY_HAVE_LIBLZMA
981
982 /**
983  * LZMA2 compression
984  */
985 class LZMA2StreamCodec final : public StreamCodec {
986  public:
987   static std::unique_ptr<Codec> createCodec(int level, CodecType type);
988   static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
989   explicit LZMA2StreamCodec(int level, CodecType type);
990   ~LZMA2StreamCodec() override;
991
992   std::vector<std::string> validPrefixes() const override;
993   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
994       const override;
995
996  private:
997   bool doNeedsDataLength() const override;
998   uint64_t doMaxUncompressedLength() const override;
999   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1000
1001   bool encodeSize() const {
1002     return type() == CodecType::LZMA2_VARINT_SIZE;
1003   }
1004
1005   void doResetStream() override;
1006   bool doCompressStream(
1007       ByteRange& input,
1008       MutableByteRange& output,
1009       StreamCodec::FlushOp flushOp) override;
1010   bool doUncompressStream(
1011       ByteRange& input,
1012       MutableByteRange& output,
1013       StreamCodec::FlushOp flushOp) override;
1014
1015   void resetCStream();
1016   void resetDStream();
1017
1018   bool decodeAndCheckVarint(ByteRange& input);
1019   bool flushVarintBuffer(MutableByteRange& output);
1020   void resetVarintBuffer();
1021
1022   Optional<lzma_stream> cstream_{};
1023   Optional<lzma_stream> dstream_{};
1024
1025   std::array<uint8_t, kMaxVarintLength64> varintBuffer_;
1026   ByteRange varintToEncode_;
1027   size_t varintBufferPos_{0};
1028
1029   int level_;
1030   bool needReset_{true};
1031   bool needDecodeSize_{false};
1032 };
1033
1034 static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1035 static constexpr unsigned kLZMA2MagicBytes = 6;
1036
1037 std::vector<std::string> LZMA2StreamCodec::validPrefixes() const {
1038   if (type() == CodecType::LZMA2_VARINT_SIZE) {
1039     return {};
1040   }
1041   return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
1042 }
1043
1044 bool LZMA2StreamCodec::doNeedsDataLength() const {
1045   return encodeSize();
1046 }
1047
1048 bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1049     const {
1050   if (type() == CodecType::LZMA2_VARINT_SIZE) {
1051     return false;
1052   }
1053   // Returns false for all inputs less than 8 bytes.
1054   // This is okay, because no valid LZMA2 streams are less than 8 bytes.
1055   return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
1056 }
1057
1058 std::unique_ptr<Codec> LZMA2StreamCodec::createCodec(
1059     int level,
1060     CodecType type) {
1061   return make_unique<LZMA2StreamCodec>(level, type);
1062 }
1063
1064 std::unique_ptr<StreamCodec> LZMA2StreamCodec::createStream(
1065     int level,
1066     CodecType type) {
1067   return make_unique<LZMA2StreamCodec>(level, type);
1068 }
1069
1070 LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type)
1071     : StreamCodec(type) {
1072   DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
1073   switch (level) {
1074     case COMPRESSION_LEVEL_FASTEST:
1075       level = 0;
1076       break;
1077     case COMPRESSION_LEVEL_DEFAULT:
1078       level = LZMA_PRESET_DEFAULT;
1079       break;
1080     case COMPRESSION_LEVEL_BEST:
1081       level = 9;
1082       break;
1083   }
1084   if (level < 0 || level > 9) {
1085     throw std::invalid_argument(
1086         to<std::string>("LZMA2Codec: invalid level: ", level));
1087   }
1088   level_ = level;
1089 }
1090
1091 LZMA2StreamCodec::~LZMA2StreamCodec() {
1092   if (cstream_) {
1093     lzma_end(cstream_.get_pointer());
1094     cstream_.clear();
1095   }
1096   if (dstream_) {
1097     lzma_end(dstream_.get_pointer());
1098     dstream_.clear();
1099   }
1100 }
1101
1102 uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const {
1103   // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1104   return uint64_t(1) << 63;
1105 }
1106
1107 uint64_t LZMA2StreamCodec::doMaxCompressedLength(
1108     uint64_t uncompressedLength) const {
1109   return lzma_stream_buffer_bound(uncompressedLength) +
1110       (encodeSize() ? kMaxVarintLength64 : 0);
1111 }
1112
1113 void LZMA2StreamCodec::doResetStream() {
1114   needReset_ = true;
1115 }
1116
1117 void LZMA2StreamCodec::resetCStream() {
1118   if (!cstream_) {
1119     cstream_.assign(LZMA_STREAM_INIT);
1120   }
1121   lzma_ret const rc =
1122       lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE);
1123   if (rc != LZMA_OK) {
1124     throw std::runtime_error(folly::to<std::string>(
1125         "LZMA2StreamCodec: lzma_easy_encoder error: ", rc));
1126   }
1127 }
1128
1129 void LZMA2StreamCodec::resetDStream() {
1130   if (!dstream_) {
1131     dstream_.assign(LZMA_STREAM_INIT);
1132   }
1133   lzma_ret const rc = lzma_auto_decoder(
1134       dstream_.get_pointer(), std::numeric_limits<uint64_t>::max(), 0);
1135   if (rc != LZMA_OK) {
1136     throw std::runtime_error(folly::to<std::string>(
1137         "LZMA2StreamCodec: lzma_auto_decoder error: ", rc));
1138   }
1139 }
1140
1141 static lzma_ret lzmaThrowOnError(lzma_ret const rc) {
1142   switch (rc) {
1143     case LZMA_OK:
1144     case LZMA_STREAM_END:
1145     case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice
1146       return rc;
1147     default:
1148       throw std::runtime_error(
1149           to<std::string>("LZMA2StreamCodec: error: ", rc));
1150   }
1151 }
1152
1153 static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) {
1154   switch (flush) {
1155     case StreamCodec::FlushOp::NONE:
1156       return LZMA_RUN;
1157     case StreamCodec::FlushOp::FLUSH:
1158       return LZMA_SYNC_FLUSH;
1159     case StreamCodec::FlushOp::END:
1160       return LZMA_FINISH;
1161     default:
1162       throw std::invalid_argument("LZMA2StreamCodec: Invalid flush");
1163   }
1164 }
1165
1166 /**
1167  * Flushes the varint buffer.
1168  * Advances output by the number of bytes written.
1169  * Returns true when flushing is complete.
1170  */
1171 bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) {
1172   if (varintToEncode_.empty()) {
1173     return true;
1174   }
1175   const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size());
1176   if (numBytesToCopy > 0) {
1177     memcpy(output.data(), varintToEncode_.data(), numBytesToCopy);
1178   }
1179   varintToEncode_.advance(numBytesToCopy);
1180   output.advance(numBytesToCopy);
1181   return varintToEncode_.empty();
1182 }
1183
1184 bool LZMA2StreamCodec::doCompressStream(
1185     ByteRange& input,
1186     MutableByteRange& output,
1187     StreamCodec::FlushOp flushOp) {
1188   if (needReset_) {
1189     resetCStream();
1190     if (encodeSize()) {
1191       varintBufferPos_ = 0;
1192       size_t const varintSize =
1193           encodeVarint(*uncompressedLength(), varintBuffer_.data());
1194       varintToEncode_ = {varintBuffer_.data(), varintSize};
1195     }
1196     needReset_ = false;
1197   }
1198
1199   if (!flushVarintBuffer(output)) {
1200     return false;
1201   }
1202
1203   cstream_->next_in = const_cast<uint8_t*>(input.data());
1204   cstream_->avail_in = input.size();
1205   cstream_->next_out = output.data();
1206   cstream_->avail_out = output.size();
1207   SCOPE_EXIT {
1208     input.uncheckedAdvance(input.size() - cstream_->avail_in);
1209     output.uncheckedAdvance(output.size() - cstream_->avail_out);
1210   };
1211   lzma_ret const rc = lzmaThrowOnError(
1212       lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp)));
1213   switch (flushOp) {
1214     case StreamCodec::FlushOp::NONE:
1215       return false;
1216     case StreamCodec::FlushOp::FLUSH:
1217       return cstream_->avail_in == 0 && cstream_->avail_out != 0;
1218     case StreamCodec::FlushOp::END:
1219       return rc == LZMA_STREAM_END;
1220     default:
1221       throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp");
1222   }
1223 }
1224
1225 /**
1226  * Attempts to decode a varint from input.
1227  * The function advances input by the number of bytes read.
1228  *
1229  * If there are too many bytes and the varint is not valid, throw a
1230  * runtime_error.
1231  *
1232  * If the uncompressed length was provided and a decoded varint does not match
1233  * the provided length, throw a runtime_error.
1234  *
1235  * Returns true if the varint was successfully decoded and matches the
1236  * uncompressed length if provided, and false if more bytes are needed.
1237  */
1238 bool LZMA2StreamCodec::decodeAndCheckVarint(ByteRange& input) {
1239   if (input.empty()) {
1240     return false;
1241   }
1242   size_t const numBytesToCopy =
1243       std::min(kMaxVarintLength64 - varintBufferPos_, input.size());
1244   memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy);
1245
1246   size_t const rangeSize = varintBufferPos_ + numBytesToCopy;
1247   ByteRange range{varintBuffer_.data(), rangeSize};
1248   auto const ret = tryDecodeVarint(range);
1249
1250   if (ret.hasValue()) {
1251     size_t const varintSize = rangeSize - range.size();
1252     input.advance(varintSize - varintBufferPos_);
1253     if (uncompressedLength() && *uncompressedLength() != ret.value()) {
1254       throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1255     }
1256     return true;
1257   } else if (ret.error() == DecodeVarintError::TooManyBytes) {
1258     throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1259   } else {
1260     // Too few bytes
1261     input.advance(numBytesToCopy);
1262     varintBufferPos_ += numBytesToCopy;
1263     return false;
1264   }
1265 }
1266
1267 bool LZMA2StreamCodec::doUncompressStream(
1268     ByteRange& input,
1269     MutableByteRange& output,
1270     StreamCodec::FlushOp flushOp) {
1271   if (needReset_) {
1272     resetDStream();
1273     needReset_ = false;
1274     needDecodeSize_ = encodeSize();
1275     if (encodeSize()) {
1276       // Reset buffer
1277       varintBufferPos_ = 0;
1278     }
1279   }
1280
1281   if (needDecodeSize_) {
1282     // Try decoding the varint. If the input does not contain the entire varint,
1283     // buffer the input. If the varint can not be decoded, fail.
1284     if (!decodeAndCheckVarint(input)) {
1285       return false;
1286     }
1287     needDecodeSize_ = false;
1288   }
1289
1290   dstream_->next_in = const_cast<uint8_t*>(input.data());
1291   dstream_->avail_in = input.size();
1292   dstream_->next_out = output.data();
1293   dstream_->avail_out = output.size();
1294   SCOPE_EXIT {
1295     input.advance(input.size() - dstream_->avail_in);
1296     output.advance(output.size() - dstream_->avail_out);
1297   };
1298
1299   lzma_ret rc;
1300   switch (flushOp) {
1301     case StreamCodec::FlushOp::NONE:
1302     case StreamCodec::FlushOp::FLUSH:
1303       rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN));
1304       break;
1305     case StreamCodec::FlushOp::END:
1306       rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH));
1307       break;
1308     default:
1309       throw std::invalid_argument("LZMA2StreamCodec: invalid flush");
1310   }
1311   return rc == LZMA_STREAM_END;
1312 }
1313 #endif // FOLLY_HAVE_LIBLZMA
1314
1315 #ifdef FOLLY_HAVE_LIBZSTD
1316
1317 namespace {
1318 void zstdFreeCStream(ZSTD_CStream* zcs) {
1319   ZSTD_freeCStream(zcs);
1320 }
1321
1322 void zstdFreeDStream(ZSTD_DStream* zds) {
1323   ZSTD_freeDStream(zds);
1324 }
1325 } // namespace
1326
1327 /**
1328  * ZSTD compression
1329  */
1330 class ZSTDStreamCodec final : public StreamCodec {
1331  public:
1332   static std::unique_ptr<Codec> createCodec(int level, CodecType);
1333   static std::unique_ptr<StreamCodec> createStream(int level, CodecType);
1334   explicit ZSTDStreamCodec(int level, CodecType type);
1335
1336   std::vector<std::string> validPrefixes() const override;
1337   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1338       const override;
1339
1340  private:
1341   bool doNeedsUncompressedLength() const override;
1342   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1343   Optional<uint64_t> doGetUncompressedLength(
1344       IOBuf const* data,
1345       Optional<uint64_t> uncompressedLength) const override;
1346
1347   void doResetStream() override;
1348   bool doCompressStream(
1349       ByteRange& input,
1350       MutableByteRange& output,
1351       StreamCodec::FlushOp flushOp) override;
1352   bool doUncompressStream(
1353       ByteRange& input,
1354       MutableByteRange& output,
1355       StreamCodec::FlushOp flushOp) override;
1356
1357   void resetCStream();
1358   void resetDStream();
1359
1360   bool tryBlockCompress(ByteRange& input, MutableByteRange& output) const;
1361   bool tryBlockUncompress(ByteRange& input, MutableByteRange& output) const;
1362
1363   int level_;
1364   bool needReset_{true};
1365   std::unique_ptr<
1366       ZSTD_CStream,
1367       folly::static_function_deleter<ZSTD_CStream, &zstdFreeCStream>>
1368       cstream_{nullptr};
1369   std::unique_ptr<
1370       ZSTD_DStream,
1371       folly::static_function_deleter<ZSTD_DStream, &zstdFreeDStream>>
1372       dstream_{nullptr};
1373 };
1374
1375 static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
1376
1377 std::vector<std::string> ZSTDStreamCodec::validPrefixes() const {
1378   return {prefixToStringLE(kZSTDMagicLE)};
1379 }
1380
1381 bool ZSTDStreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1382     const {
1383   return dataStartsWithLE(data, kZSTDMagicLE);
1384 }
1385
1386 std::unique_ptr<Codec> ZSTDStreamCodec::createCodec(int level, CodecType type) {
1387   return make_unique<ZSTDStreamCodec>(level, type);
1388 }
1389
1390 std::unique_ptr<StreamCodec> ZSTDStreamCodec::createStream(
1391     int level,
1392     CodecType type) {
1393   return make_unique<ZSTDStreamCodec>(level, type);
1394 }
1395
1396 ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
1397     : StreamCodec(type) {
1398   DCHECK(type == CodecType::ZSTD);
1399   switch (level) {
1400     case COMPRESSION_LEVEL_FASTEST:
1401       level = 1;
1402       break;
1403     case COMPRESSION_LEVEL_DEFAULT:
1404       level = 1;
1405       break;
1406     case COMPRESSION_LEVEL_BEST:
1407       level = 19;
1408       break;
1409   }
1410   if (level < 1 || level > ZSTD_maxCLevel()) {
1411     throw std::invalid_argument(
1412         to<std::string>("ZSTD: invalid level: ", level));
1413   }
1414   level_ = level;
1415 }
1416
1417 bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
1418   return false;
1419 }
1420
1421 uint64_t ZSTDStreamCodec::doMaxCompressedLength(
1422     uint64_t uncompressedLength) const {
1423   return ZSTD_compressBound(uncompressedLength);
1424 }
1425
1426 void zstdThrowIfError(size_t rc) {
1427   if (!ZSTD_isError(rc)) {
1428     return;
1429   }
1430   throw std::runtime_error(
1431       to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
1432 }
1433
1434 Optional<uint64_t> ZSTDStreamCodec::doGetUncompressedLength(
1435     IOBuf const* data,
1436     Optional<uint64_t> uncompressedLength) const {
1437   // Read decompressed size from frame if available in first IOBuf.
1438   auto const decompressedSize =
1439       ZSTD_getDecompressedSize(data->data(), data->length());
1440   if (decompressedSize != 0) {
1441     if (uncompressedLength && *uncompressedLength != decompressedSize) {
1442       throw std::runtime_error("ZSTD: invalid uncompressed length");
1443     }
1444     uncompressedLength = decompressedSize;
1445   }
1446   return uncompressedLength;
1447 }
1448
1449 void ZSTDStreamCodec::doResetStream() {
1450   needReset_ = true;
1451 }
1452
1453 bool ZSTDStreamCodec::tryBlockCompress(
1454     ByteRange& input,
1455     MutableByteRange& output) const {
1456   DCHECK(needReset_);
1457   // We need to know that we have enough output space to use block compression
1458   if (output.size() < ZSTD_compressBound(input.size())) {
1459     return false;
1460   }
1461   size_t const length = ZSTD_compress(
1462       output.data(), output.size(), input.data(), input.size(), level_);
1463   zstdThrowIfError(length);
1464   input.uncheckedAdvance(input.size());
1465   output.uncheckedAdvance(length);
1466   return true;
1467 }
1468
1469 void ZSTDStreamCodec::resetCStream() {
1470   if (!cstream_) {
1471     cstream_.reset(ZSTD_createCStream());
1472     if (!cstream_) {
1473       throw std::bad_alloc{};
1474     }
1475   }
1476   // Advanced API usage works for all supported versions of zstd.
1477   // Required to set contentSizeFlag.
1478   auto params = ZSTD_getParams(level_, uncompressedLength().value_or(0), 0);
1479   params.fParams.contentSizeFlag = uncompressedLength().hasValue();
1480   zstdThrowIfError(ZSTD_initCStream_advanced(
1481       cstream_.get(), nullptr, 0, params, uncompressedLength().value_or(0)));
1482 }
1483
1484 bool ZSTDStreamCodec::doCompressStream(
1485     ByteRange& input,
1486     MutableByteRange& output,
1487     StreamCodec::FlushOp flushOp) {
1488   if (needReset_) {
1489     // If we are given all the input in one chunk try to use block compression
1490     if (flushOp == StreamCodec::FlushOp::END &&
1491         tryBlockCompress(input, output)) {
1492       return true;
1493     }
1494     resetCStream();
1495     needReset_ = false;
1496   }
1497   ZSTD_inBuffer in = {input.data(), input.size(), 0};
1498   ZSTD_outBuffer out = {output.data(), output.size(), 0};
1499   SCOPE_EXIT {
1500     input.uncheckedAdvance(in.pos);
1501     output.uncheckedAdvance(out.pos);
1502   };
1503   if (flushOp == StreamCodec::FlushOp::NONE || !input.empty()) {
1504     zstdThrowIfError(ZSTD_compressStream(cstream_.get(), &out, &in));
1505   }
1506   if (in.pos == in.size && flushOp != StreamCodec::FlushOp::NONE) {
1507     size_t rc;
1508     switch (flushOp) {
1509       case StreamCodec::FlushOp::FLUSH:
1510         rc = ZSTD_flushStream(cstream_.get(), &out);
1511         break;
1512       case StreamCodec::FlushOp::END:
1513         rc = ZSTD_endStream(cstream_.get(), &out);
1514         break;
1515       default:
1516         throw std::invalid_argument("ZSTD: invalid FlushOp");
1517     }
1518     zstdThrowIfError(rc);
1519     if (rc == 0) {
1520       return true;
1521     }
1522   }
1523   return false;
1524 }
1525
1526 bool ZSTDStreamCodec::tryBlockUncompress(
1527     ByteRange& input,
1528     MutableByteRange& output) const {
1529   DCHECK(needReset_);
1530 #if ZSTD_VERSION_NUMBER < 10104
1531   // We require ZSTD_findFrameCompressedSize() to perform this optimization.
1532   return false;
1533 #else
1534   // We need to know the uncompressed length and have enough output space.
1535   if (!uncompressedLength() || output.size() < *uncompressedLength()) {
1536     return false;
1537   }
1538   size_t const compressedLength =
1539       ZSTD_findFrameCompressedSize(input.data(), input.size());
1540   zstdThrowIfError(compressedLength);
1541   size_t const length = ZSTD_decompress(
1542       output.data(), *uncompressedLength(), input.data(), compressedLength);
1543   zstdThrowIfError(length);
1544   if (length != *uncompressedLength()) {
1545     throw std::runtime_error("ZSTDStreamCodec: Incorrect uncompressed length");
1546   }
1547   input.uncheckedAdvance(compressedLength);
1548   output.uncheckedAdvance(length);
1549   return true;
1550 #endif
1551 }
1552
1553 void ZSTDStreamCodec::resetDStream() {
1554   if (!dstream_) {
1555     dstream_.reset(ZSTD_createDStream());
1556     if (!dstream_) {
1557       throw std::bad_alloc{};
1558     }
1559   }
1560   zstdThrowIfError(ZSTD_initDStream(dstream_.get()));
1561 }
1562
1563 bool ZSTDStreamCodec::doUncompressStream(
1564     ByteRange& input,
1565     MutableByteRange& output,
1566     StreamCodec::FlushOp flushOp) {
1567   if (needReset_) {
1568     // If we are given all the input in one chunk try to use block uncompression
1569     if (flushOp == StreamCodec::FlushOp::END &&
1570         tryBlockUncompress(input, output)) {
1571       return true;
1572     }
1573     resetDStream();
1574     needReset_ = false;
1575   }
1576   ZSTD_inBuffer in = {input.data(), input.size(), 0};
1577   ZSTD_outBuffer out = {output.data(), output.size(), 0};
1578   SCOPE_EXIT {
1579     input.uncheckedAdvance(in.pos);
1580     output.uncheckedAdvance(out.pos);
1581   };
1582   size_t const rc = ZSTD_decompressStream(dstream_.get(), &out, &in);
1583   zstdThrowIfError(rc);
1584   return rc == 0;
1585 }
1586
1587 #endif // FOLLY_HAVE_LIBZSTD
1588
1589 #if FOLLY_HAVE_LIBBZ2
1590
1591 class Bzip2Codec final : public Codec {
1592  public:
1593   static std::unique_ptr<Codec> create(int level, CodecType type);
1594   explicit Bzip2Codec(int level, CodecType type);
1595
1596   std::vector<std::string> validPrefixes() const override;
1597   bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
1598       const override;
1599
1600  private:
1601   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1602   std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
1603   std::unique_ptr<IOBuf> doUncompress(
1604       IOBuf const* data,
1605       Optional<uint64_t> uncompressedLength) override;
1606
1607   int level_;
1608 };
1609
1610 /* static */ std::unique_ptr<Codec> Bzip2Codec::create(
1611     int level,
1612     CodecType type) {
1613   return std::make_unique<Bzip2Codec>(level, type);
1614 }
1615
1616 Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
1617   DCHECK(type == CodecType::BZIP2);
1618   switch (level) {
1619     case COMPRESSION_LEVEL_FASTEST:
1620       level = 1;
1621       break;
1622     case COMPRESSION_LEVEL_DEFAULT:
1623       level = 9;
1624       break;
1625     case COMPRESSION_LEVEL_BEST:
1626       level = 9;
1627       break;
1628   }
1629   if (level < 1 || level > 9) {
1630     throw std::invalid_argument(
1631         to<std::string>("Bzip2: invalid level: ", level));
1632   }
1633   level_ = level;
1634 }
1635
1636 static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1637 static uint64_t constexpr kBzip2MagicBytes = 3;
1638
1639 std::vector<std::string> Bzip2Codec::validPrefixes() const {
1640   return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
1641 }
1642
1643 bool Bzip2Codec::canUncompress(IOBuf const* data, Optional<uint64_t>) const {
1644   return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
1645 }
1646
1647 uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1648   // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
1649   //   To guarantee that the compressed data will fit in its buffer, allocate an
1650   //   output buffer of size 1% larger than the uncompressed data, plus six
1651   //   hundred extra bytes.
1652   return uncompressedLength + uncompressedLength / 100 + 600;
1653 }
1654
1655 static bz_stream createBzStream() {
1656   bz_stream stream;
1657   stream.bzalloc = nullptr;
1658   stream.bzfree = nullptr;
1659   stream.opaque = nullptr;
1660   stream.next_in = stream.next_out = nullptr;
1661   stream.avail_in = stream.avail_out = 0;
1662   return stream;
1663 }
1664
1665 // Throws on error condition, otherwise returns the code.
1666 static int bzCheck(int const rc) {
1667   switch (rc) {
1668     case BZ_OK:
1669     case BZ_RUN_OK:
1670     case BZ_FLUSH_OK:
1671     case BZ_FINISH_OK:
1672     case BZ_STREAM_END:
1673       return rc;
1674     default:
1675       throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
1676   }
1677 }
1678
1679 static std::unique_ptr<IOBuf> addOutputBuffer(
1680     bz_stream* stream,
1681     uint64_t const bufferLength) {
1682   DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
1683   DCHECK_EQ(stream->avail_out, 0);
1684
1685   auto buf = IOBuf::create(bufferLength);
1686   buf->append(buf->capacity());
1687
1688   stream->next_out = reinterpret_cast<char*>(buf->writableData());
1689   stream->avail_out = buf->length();
1690
1691   return buf;
1692 }
1693
1694 std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
1695   bz_stream stream = createBzStream();
1696   bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
1697   SCOPE_EXIT {
1698     bzCheck(BZ2_bzCompressEnd(&stream));
1699   };
1700
1701   uint64_t const uncompressedLength = data->computeChainDataLength();
1702   uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
1703   uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1704   uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
1705
1706   auto out = addOutputBuffer(
1707       &stream,
1708       maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
1709                                                : kDefaultBufferLength);
1710
1711   for (auto range : *data) {
1712     while (!range.empty()) {
1713       auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1714       stream.next_in =
1715           const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1716       stream.avail_in = inSize;
1717
1718       if (stream.avail_out == 0) {
1719         out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1720       }
1721
1722       bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
1723       range.uncheckedAdvance(inSize - stream.avail_in);
1724     }
1725   }
1726   do {
1727     if (stream.avail_out == 0) {
1728       out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1729     }
1730   } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
1731
1732   out->prev()->trimEnd(stream.avail_out);
1733
1734   return out;
1735 }
1736
1737 std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
1738     const IOBuf* data,
1739     Optional<uint64_t> uncompressedLength) {
1740   bz_stream stream = createBzStream();
1741   bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
1742   SCOPE_EXIT {
1743     bzCheck(BZ2_bzDecompressEnd(&stream));
1744   };
1745
1746   uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1747   uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
1748   uint64_t const kDefaultBufferLength =
1749       computeBufferLength(data->computeChainDataLength(), kBlockSize);
1750
1751   auto out = addOutputBuffer(
1752       &stream,
1753       ((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength)
1754            ? *uncompressedLength
1755            : kDefaultBufferLength));
1756
1757   int rc = BZ_OK;
1758   for (auto range : *data) {
1759     while (!range.empty()) {
1760       auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1761       stream.next_in =
1762           const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1763       stream.avail_in = inSize;
1764
1765       if (stream.avail_out == 0) {
1766         out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1767       }
1768
1769       rc = bzCheck(BZ2_bzDecompress(&stream));
1770       range.uncheckedAdvance(inSize - stream.avail_in);
1771     }
1772   }
1773   while (rc != BZ_STREAM_END) {
1774     if (stream.avail_out == 0) {
1775       out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1776     }
1777     size_t const outputSize = stream.avail_out;
1778     rc = bzCheck(BZ2_bzDecompress(&stream));
1779     if (outputSize == stream.avail_out) {
1780       throw std::runtime_error("Bzip2Codec: Truncated input");
1781     }
1782   }
1783
1784   out->prev()->trimEnd(stream.avail_out);
1785
1786   uint64_t const totalOut =
1787       (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
1788   if (uncompressedLength && uncompressedLength != totalOut) {
1789     throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
1790   }
1791
1792   return out;
1793 }
1794
1795 #endif // FOLLY_HAVE_LIBBZ2
1796
1797 #if FOLLY_HAVE_LIBZ
1798
1799 zlib::Options getZlibOptions(CodecType type) {
1800   DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB);
1801   return type == CodecType::GZIP ? zlib::defaultGzipOptions()
1802                                  : zlib::defaultZlibOptions();
1803 }
1804
1805 std::unique_ptr<Codec> getZlibCodec(int level, CodecType type) {
1806   return zlib::getCodec(getZlibOptions(type), level);
1807 }
1808
1809 std::unique_ptr<StreamCodec> getZlibStreamCodec(int level, CodecType type) {
1810   return zlib::getStreamCodec(getZlibOptions(type), level);
1811 }
1812
1813 #endif // FOLLY_HAVE_LIBZ
1814
1815 /**
1816  * Automatic decompression
1817  */
1818 class AutomaticCodec final : public Codec {
1819  public:
1820   static std::unique_ptr<Codec> create(
1821       std::vector<std::unique_ptr<Codec>> customCodecs,
1822       std::unique_ptr<Codec> terminalCodec);
1823   explicit AutomaticCodec(
1824       std::vector<std::unique_ptr<Codec>> customCodecs,
1825       std::unique_ptr<Codec> terminalCodec);
1826
1827   std::vector<std::string> validPrefixes() const override;
1828   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1829       const override;
1830
1831  private:
1832   bool doNeedsUncompressedLength() const override;
1833   uint64_t doMaxUncompressedLength() const override;
1834
1835   uint64_t doMaxCompressedLength(uint64_t) const override {
1836     throw std::runtime_error(
1837         "AutomaticCodec error: maxCompressedLength() not supported.");
1838   }
1839   std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1840     throw std::runtime_error("AutomaticCodec error: compress() not supported.");
1841   }
1842   std::unique_ptr<IOBuf> doUncompress(
1843       const IOBuf* data,
1844       Optional<uint64_t> uncompressedLength) override;
1845
1846   void addCodecIfSupported(CodecType type);
1847
1848   // Throws iff the codecs aren't compatible (very slow)
1849   void checkCompatibleCodecs() const;
1850
1851   std::vector<std::unique_ptr<Codec>> codecs_;
1852   std::unique_ptr<Codec> terminalCodec_;
1853   bool needsUncompressedLength_;
1854   uint64_t maxUncompressedLength_;
1855 };
1856
1857 std::vector<std::string> AutomaticCodec::validPrefixes() const {
1858   std::unordered_set<std::string> prefixes;
1859   for (const auto& codec : codecs_) {
1860     const auto codecPrefixes = codec->validPrefixes();
1861     prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
1862   }
1863   return std::vector<std::string>{prefixes.begin(), prefixes.end()};
1864 }
1865
1866 bool AutomaticCodec::canUncompress(
1867     const IOBuf* data,
1868     Optional<uint64_t> uncompressedLength) const {
1869   return std::any_of(
1870       codecs_.begin(),
1871       codecs_.end(),
1872       [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
1873         return codec->canUncompress(data, uncompressedLength);
1874       });
1875 }
1876
1877 void AutomaticCodec::addCodecIfSupported(CodecType type) {
1878   const bool present = std::any_of(
1879       codecs_.begin(),
1880       codecs_.end(),
1881       [&type](std::unique_ptr<Codec> const& codec) {
1882         return codec->type() == type;
1883       });
1884   bool const isTerminalType = terminalCodec_ && terminalCodec_->type() == type;
1885   if (hasCodec(type) && !present && !isTerminalType) {
1886     codecs_.push_back(getCodec(type));
1887   }
1888 }
1889
1890 /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
1891     std::vector<std::unique_ptr<Codec>> customCodecs,
1892     std::unique_ptr<Codec> terminalCodec) {
1893   return std::make_unique<AutomaticCodec>(
1894       std::move(customCodecs), std::move(terminalCodec));
1895 }
1896
1897 AutomaticCodec::AutomaticCodec(
1898     std::vector<std::unique_ptr<Codec>> customCodecs,
1899     std::unique_ptr<Codec> terminalCodec)
1900     : Codec(CodecType::USER_DEFINED),
1901       codecs_(std::move(customCodecs)),
1902       terminalCodec_(std::move(terminalCodec)) {
1903   // Fastest -> slowest
1904   std::array<CodecType, 6> defaultTypes{{
1905       CodecType::LZ4_FRAME,
1906       CodecType::ZSTD,
1907       CodecType::ZLIB,
1908       CodecType::GZIP,
1909       CodecType::LZMA2,
1910       CodecType::BZIP2,
1911   }};
1912
1913   for (auto type : defaultTypes) {
1914     addCodecIfSupported(type);
1915   }
1916
1917   if (kIsDebug) {
1918     checkCompatibleCodecs();
1919   }
1920
1921   // Check that none of the codecs are null
1922   DCHECK(std::none_of(
1923       codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1924         return codec == nullptr;
1925       }));
1926
1927   // Check that the terminal codec's type is not duplicated (with the exception
1928   // of USER_DEFINED).
1929   if (terminalCodec_) {
1930     DCHECK(std::none_of(
1931         codecs_.begin(),
1932         codecs_.end(),
1933         [&](std::unique_ptr<Codec> const& codec) {
1934           return codec->type() != CodecType::USER_DEFINED &&
1935               codec->type() == terminalCodec_->type();
1936         }));
1937   }
1938
1939   bool const terminalNeedsUncompressedLength =
1940       terminalCodec_ && terminalCodec_->needsUncompressedLength();
1941   needsUncompressedLength_ = std::any_of(
1942                                  codecs_.begin(),
1943                                  codecs_.end(),
1944                                  [](std::unique_ptr<Codec> const& codec) {
1945                                    return codec->needsUncompressedLength();
1946                                  }) ||
1947       terminalNeedsUncompressedLength;
1948
1949   const auto it = std::max_element(
1950       codecs_.begin(),
1951       codecs_.end(),
1952       [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
1953         return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
1954       });
1955   DCHECK(it != codecs_.end());
1956   auto const terminalMaxUncompressedLength =
1957       terminalCodec_ ? terminalCodec_->maxUncompressedLength() : 0;
1958   maxUncompressedLength_ =
1959       std::max((*it)->maxUncompressedLength(), terminalMaxUncompressedLength);
1960 }
1961
1962 void AutomaticCodec::checkCompatibleCodecs() const {
1963   // Keep track of all the possible headers.
1964   std::unordered_set<std::string> headers;
1965   // The empty header is not allowed.
1966   headers.insert("");
1967   // Step 1:
1968   // Construct a set of headers and check that none of the headers occur twice.
1969   // Eliminate edge cases.
1970   for (auto&& codec : codecs_) {
1971     const auto codecHeaders = codec->validPrefixes();
1972     // Codecs without any valid headers are not allowed.
1973     if (codecHeaders.empty()) {
1974       throw std::invalid_argument{
1975           "AutomaticCodec: validPrefixes() must not be empty."};
1976     }
1977     // Insert all the headers for the current codec.
1978     const size_t beforeSize = headers.size();
1979     headers.insert(codecHeaders.begin(), codecHeaders.end());
1980     // Codecs are not compatible if any header occurred twice.
1981     if (beforeSize + codecHeaders.size() != headers.size()) {
1982       throw std::invalid_argument{
1983           "AutomaticCodec: Two valid prefixes collide."};
1984     }
1985   }
1986   // Step 2:
1987   // Check if any strict non-empty prefix of any header is a header.
1988   for (const auto& header : headers) {
1989     for (size_t i = 1; i < header.size(); ++i) {
1990       if (headers.count(header.substr(0, i))) {
1991         throw std::invalid_argument{
1992             "AutomaticCodec: One valid prefix is a prefix of another valid "
1993             "prefix."};
1994       }
1995     }
1996   }
1997 }
1998
1999 bool AutomaticCodec::doNeedsUncompressedLength() const {
2000   return needsUncompressedLength_;
2001 }
2002
2003 uint64_t AutomaticCodec::doMaxUncompressedLength() const {
2004   return maxUncompressedLength_;
2005 }
2006
2007 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
2008     const IOBuf* data,
2009     Optional<uint64_t> uncompressedLength) {
2010   try {
2011     for (auto&& codec : codecs_) {
2012       if (codec->canUncompress(data, uncompressedLength)) {
2013         return codec->uncompress(data, uncompressedLength);
2014       }
2015     }
2016   } catch (std::exception const& e) {
2017     if (!terminalCodec_) {
2018       throw e;
2019     }
2020   }
2021
2022   // Try terminal codec
2023   if (terminalCodec_) {
2024     return terminalCodec_->uncompress(data, uncompressedLength);
2025   }
2026
2027   throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
2028 }
2029
2030 using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
2031 using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
2032 struct Factory {
2033   CodecFactory codec;
2034   StreamCodecFactory stream;
2035 };
2036
2037 constexpr Factory
2038     codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
2039         {}, // USER_DEFINED
2040         {NoCompressionCodec::create, nullptr},
2041
2042 #if FOLLY_HAVE_LIBLZ4
2043         {LZ4Codec::create, nullptr},
2044 #else
2045         {},
2046 #endif
2047
2048 #if FOLLY_HAVE_LIBSNAPPY
2049         {SnappyCodec::create, nullptr},
2050 #else
2051         {},
2052 #endif
2053
2054 #if FOLLY_HAVE_LIBZ
2055         {getZlibCodec, getZlibStreamCodec},
2056 #else
2057         {},
2058 #endif
2059
2060 #if FOLLY_HAVE_LIBLZ4
2061         {LZ4Codec::create, nullptr},
2062 #else
2063         {},
2064 #endif
2065
2066 #if FOLLY_HAVE_LIBLZMA
2067         {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2068         {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2069 #else
2070         {},
2071         {},
2072 #endif
2073
2074 #if FOLLY_HAVE_LIBZSTD
2075         {ZSTDStreamCodec::createCodec, ZSTDStreamCodec::createStream},
2076 #else
2077         {},
2078 #endif
2079
2080 #if FOLLY_HAVE_LIBZ
2081         {getZlibCodec, getZlibStreamCodec},
2082 #else
2083         {},
2084 #endif
2085
2086 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
2087         {LZ4FrameCodec::create, nullptr},
2088 #else
2089         {},
2090 #endif
2091
2092 #if FOLLY_HAVE_LIBBZ2
2093         {Bzip2Codec::create, nullptr},
2094 #else
2095         {},
2096 #endif
2097 };
2098
2099 Factory const& getFactory(CodecType type) {
2100   size_t const idx = static_cast<size_t>(type);
2101   if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2102     throw std::invalid_argument(
2103         to<std::string>("Compression type ", idx, " invalid"));
2104   }
2105   return codecFactories[idx];
2106 }
2107 } // namespace
2108
2109 bool hasCodec(CodecType type) {
2110   return getFactory(type).codec != nullptr;
2111 }
2112
2113 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2114   auto const factory = getFactory(type).codec;
2115   if (!factory) {
2116     throw std::invalid_argument(
2117         to<std::string>("Compression type ", type, " not supported"));
2118   }
2119   auto codec = (*factory)(level, type);
2120   DCHECK(codec->type() == type);
2121   return codec;
2122 }
2123
2124 bool hasStreamCodec(CodecType type) {
2125   return getFactory(type).stream != nullptr;
2126 }
2127
2128 std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
2129   auto const factory = getFactory(type).stream;
2130   if (!factory) {
2131     throw std::invalid_argument(
2132         to<std::string>("Compression type ", type, " not supported"));
2133   }
2134   auto codec = (*factory)(level, type);
2135   DCHECK(codec->type() == type);
2136   return codec;
2137 }
2138
2139 std::unique_ptr<Codec> getAutoUncompressionCodec(
2140     std::vector<std::unique_ptr<Codec>> customCodecs,
2141     std::unique_ptr<Codec> terminalCodec) {
2142   return AutomaticCodec::create(
2143       std::move(customCodecs), std::move(terminalCodec));
2144 }
2145 } // namespace io
2146 } // namespace folly