Enforce forward progress with StreamCodec
[folly.git] / folly / io / Compression.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/Compression.h>
18
19 #if FOLLY_HAVE_LIBLZ4
20 #include <lz4.h>
21 #include <lz4hc.h>
22 #if LZ4_VERSION_NUMBER >= 10301
23 #include <lz4frame.h>
24 #endif
25 #endif
26
27 #include <glog/logging.h>
28
29 #if FOLLY_HAVE_LIBSNAPPY
30 #include <snappy-sinksource.h>
31 #include <snappy.h>
32 #endif
33
34 #if FOLLY_HAVE_LIBZ
35 #include <folly/io/compression/Zlib.h>
36 #endif
37
38 #if FOLLY_HAVE_LIBLZMA
39 #include <lzma.h>
40 #endif
41
42 #if FOLLY_HAVE_LIBZSTD
43 #define ZSTD_STATIC_LINKING_ONLY
44 #include <zstd.h>
45 #endif
46
47 #if FOLLY_HAVE_LIBBZ2
48 #include <bzlib.h>
49 #endif
50
51 #include <folly/Bits.h>
52 #include <folly/Conv.h>
53 #include <folly/Memory.h>
54 #include <folly/Portability.h>
55 #include <folly/ScopeGuard.h>
56 #include <folly/Varint.h>
57 #include <folly/io/Cursor.h>
58 #include <folly/io/compression/Utils.h>
59 #include <algorithm>
60 #include <unordered_set>
61
62 using folly::io::compression::detail::dataStartsWithLE;
63 using folly::io::compression::detail::prefixToStringLE;
64
65 namespace zlib = folly::io::zlib;
66
67 namespace folly {
68 namespace io {
69
70 Codec::Codec(CodecType type) : type_(type) { }
71
72 // Ensure consistent behavior in the nullptr case
73 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
74   if (data == nullptr) {
75     throw std::invalid_argument("Codec: data must not be nullptr");
76   }
77   uint64_t len = data->computeChainDataLength();
78   if (len == 0) {
79     return IOBuf::create(0);
80   }
81   if (len > maxUncompressedLength()) {
82     throw std::runtime_error("Codec: uncompressed length too large");
83   }
84
85   return doCompress(data);
86 }
87
88 std::string Codec::compress(const StringPiece data) {
89   const uint64_t len = data.size();
90   if (len == 0) {
91     return "";
92   }
93   if (len > maxUncompressedLength()) {
94     throw std::runtime_error("Codec: uncompressed length too large");
95   }
96
97   return doCompressString(data);
98 }
99
100 std::unique_ptr<IOBuf> Codec::uncompress(
101     const IOBuf* data,
102     Optional<uint64_t> uncompressedLength) {
103   if (data == nullptr) {
104     throw std::invalid_argument("Codec: data must not be nullptr");
105   }
106   if (!uncompressedLength) {
107     if (needsUncompressedLength()) {
108       throw std::invalid_argument("Codec: uncompressed length required");
109     }
110   } else if (*uncompressedLength > maxUncompressedLength()) {
111     throw std::runtime_error("Codec: uncompressed length too large");
112   }
113
114   if (data->empty()) {
115     if (uncompressedLength.value_or(0) != 0) {
116       throw std::runtime_error("Codec: invalid uncompressed length");
117     }
118     return IOBuf::create(0);
119   }
120
121   return doUncompress(data, uncompressedLength);
122 }
123
124 std::string Codec::uncompress(
125     const StringPiece data,
126     Optional<uint64_t> uncompressedLength) {
127   if (!uncompressedLength) {
128     if (needsUncompressedLength()) {
129       throw std::invalid_argument("Codec: uncompressed length required");
130     }
131   } else if (*uncompressedLength > maxUncompressedLength()) {
132     throw std::runtime_error("Codec: uncompressed length too large");
133   }
134
135   if (data.empty()) {
136     if (uncompressedLength.value_or(0) != 0) {
137       throw std::runtime_error("Codec: invalid uncompressed length");
138     }
139     return "";
140   }
141
142   return doUncompressString(data, uncompressedLength);
143 }
144
145 bool Codec::needsUncompressedLength() const {
146   return doNeedsUncompressedLength();
147 }
148
149 uint64_t Codec::maxUncompressedLength() const {
150   return doMaxUncompressedLength();
151 }
152
153 bool Codec::doNeedsUncompressedLength() const {
154   return false;
155 }
156
157 uint64_t Codec::doMaxUncompressedLength() const {
158   return UNLIMITED_UNCOMPRESSED_LENGTH;
159 }
160
161 std::vector<std::string> Codec::validPrefixes() const {
162   return {};
163 }
164
165 bool Codec::canUncompress(const IOBuf*, Optional<uint64_t>) const {
166   return false;
167 }
168
169 std::string Codec::doCompressString(const StringPiece data) {
170   const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
171   auto outputBuffer = doCompress(&inputBuffer);
172   std::string output;
173   output.reserve(outputBuffer->computeChainDataLength());
174   for (auto range : *outputBuffer) {
175     output.append(reinterpret_cast<const char*>(range.data()), range.size());
176   }
177   return output;
178 }
179
180 std::string Codec::doUncompressString(
181     const StringPiece data,
182     Optional<uint64_t> uncompressedLength) {
183   const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
184   auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
185   std::string output;
186   output.reserve(outputBuffer->computeChainDataLength());
187   for (auto range : *outputBuffer) {
188     output.append(reinterpret_cast<const char*>(range.data()), range.size());
189   }
190   return output;
191 }
192
193 uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
194   if (uncompressedLength == 0) {
195     return 0;
196   }
197   return doMaxCompressedLength(uncompressedLength);
198 }
199
200 Optional<uint64_t> Codec::getUncompressedLength(
201     const folly::IOBuf* data,
202     Optional<uint64_t> uncompressedLength) const {
203   auto const compressedLength = data->computeChainDataLength();
204   if (uncompressedLength == uint64_t(0) || compressedLength == 0) {
205     if (uncompressedLength.value_or(0) != 0 || compressedLength != 0) {
206       throw std::runtime_error("Invalid uncompressed length");
207     }
208     return 0;
209   }
210   return doGetUncompressedLength(data, uncompressedLength);
211 }
212
213 Optional<uint64_t> Codec::doGetUncompressedLength(
214     const folly::IOBuf*,
215     Optional<uint64_t> uncompressedLength) const {
216   return uncompressedLength;
217 }
218
219 bool StreamCodec::needsDataLength() const {
220   return doNeedsDataLength();
221 }
222
223 bool StreamCodec::doNeedsDataLength() const {
224   return false;
225 }
226
227 void StreamCodec::assertStateIs(State expected) const {
228   if (state_ != expected) {
229     throw std::logic_error(folly::to<std::string>(
230         "Codec: state is ", state_, "; expected state ", expected));
231   }
232 }
233
234 void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
235   state_ = State::RESET;
236   uncompressedLength_ = uncompressedLength;
237   progressMade_ = true;
238   doResetStream();
239 }
240
241 bool StreamCodec::compressStream(
242     ByteRange& input,
243     MutableByteRange& output,
244     StreamCodec::FlushOp flushOp) {
245   if (state_ == State::RESET && input.empty()) {
246     if (flushOp == StreamCodec::FlushOp::NONE) {
247       return false;
248     }
249     if (flushOp == StreamCodec::FlushOp::END &&
250         uncompressedLength().value_or(0) != 0) {
251       throw std::runtime_error("Codec: invalid uncompressed length");
252     }
253     return true;
254   }
255   if (!uncompressedLength() && needsDataLength()) {
256     throw std::runtime_error("Codec: uncompressed length required");
257   }
258   if (state_ == State::RESET && !input.empty() &&
259       uncompressedLength() == uint64_t(0)) {
260     throw std::runtime_error("Codec: invalid uncompressed length");
261   }
262   // Handle input state transitions
263   switch (flushOp) {
264     case StreamCodec::FlushOp::NONE:
265       if (state_ == State::RESET) {
266         state_ = State::COMPRESS;
267       }
268       assertStateIs(State::COMPRESS);
269       break;
270     case StreamCodec::FlushOp::FLUSH:
271       if (state_ == State::RESET || state_ == State::COMPRESS) {
272         state_ = State::COMPRESS_FLUSH;
273       }
274       assertStateIs(State::COMPRESS_FLUSH);
275       break;
276     case StreamCodec::FlushOp::END:
277       if (state_ == State::RESET || state_ == State::COMPRESS) {
278         state_ = State::COMPRESS_END;
279       }
280       assertStateIs(State::COMPRESS_END);
281       break;
282   }
283   size_t const inputSize = input.size();
284   size_t const outputSize = output.size();
285   bool const done = doCompressStream(input, output, flushOp);
286   if (!done && inputSize == input.size() && outputSize == output.size()) {
287     if (!progressMade_) {
288       throw std::runtime_error("Codec: No forward progress made");
289     }
290     // Throw an exception if there is no progress again next time
291     progressMade_ = false;
292   } else {
293     progressMade_ = true;
294   }
295   // Handle output state transitions
296   if (done) {
297     if (state_ == State::COMPRESS_FLUSH) {
298       state_ = State::COMPRESS;
299     } else if (state_ == State::COMPRESS_END) {
300       state_ = State::END;
301     }
302     // Check internal invariants
303     DCHECK(input.empty());
304     DCHECK(flushOp != StreamCodec::FlushOp::NONE);
305   }
306   return done;
307 }
308
309 bool StreamCodec::uncompressStream(
310     ByteRange& input,
311     MutableByteRange& output,
312     StreamCodec::FlushOp flushOp) {
313   if (state_ == State::RESET && input.empty()) {
314     if (uncompressedLength().value_or(0) == 0) {
315       return true;
316     }
317     return false;
318   }
319   // Handle input state transitions
320   if (state_ == State::RESET) {
321     state_ = State::UNCOMPRESS;
322   }
323   assertStateIs(State::UNCOMPRESS);
324   size_t const inputSize = input.size();
325   size_t const outputSize = output.size();
326   bool const done = doUncompressStream(input, output, flushOp);
327   if (!done && inputSize == input.size() && outputSize == output.size()) {
328     if (!progressMade_) {
329       throw std::runtime_error("Codec: no forward progress made");
330     }
331     // Throw an exception if there is no progress again next time
332     progressMade_ = false;
333   } else {
334     progressMade_ = true;
335   }
336   // Handle output state transitions
337   if (done) {
338     state_ = State::END;
339   }
340   return done;
341 }
342
343 static std::unique_ptr<IOBuf> addOutputBuffer(
344     MutableByteRange& output,
345     uint64_t size) {
346   DCHECK(output.empty());
347   auto buffer = IOBuf::create(size);
348   buffer->append(buffer->capacity());
349   output = {buffer->writableData(), buffer->length()};
350   return buffer;
351 }
352
353 std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
354   uint64_t const uncompressedLength = data->computeChainDataLength();
355   resetStream(uncompressedLength);
356   uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
357
358   auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
359   auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
360
361   MutableByteRange output;
362   auto buffer = addOutputBuffer(
363       output,
364       maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
365                                                : kDefaultBufferLength);
366
367   // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
368   IOBuf const* current = data;
369   ByteRange input{current->data(), current->length()};
370   StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
371   bool done = false;
372   while (!done) {
373     while (input.empty() && current->next() != data) {
374       current = current->next();
375       input = {current->data(), current->length()};
376     }
377     if (current->next() == data) {
378       // This is the last input buffer so end the stream
379       flushOp = StreamCodec::FlushOp::END;
380     }
381     if (output.empty()) {
382       buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
383     }
384     done = compressStream(input, output, flushOp);
385     if (done) {
386       DCHECK(input.empty());
387       DCHECK(flushOp == StreamCodec::FlushOp::END);
388       DCHECK_EQ(current->next(), data);
389     }
390   }
391   buffer->prev()->trimEnd(output.size());
392   return buffer;
393 }
394
395 static uint64_t computeBufferLength(
396     uint64_t const compressedLength,
397     uint64_t const blockSize) {
398   uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
399   uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
400   return std::min(goodBufferSize, kMaxBufferLength);
401 }
402
403 std::unique_ptr<IOBuf> StreamCodec::doUncompress(
404     IOBuf const* data,
405     Optional<uint64_t> uncompressedLength) {
406   auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
407   auto constexpr kBlockSize = uint64_t(128) << 10;
408   auto const defaultBufferLength =
409       computeBufferLength(data->computeChainDataLength(), kBlockSize);
410
411   uncompressedLength = getUncompressedLength(data, uncompressedLength);
412   resetStream(uncompressedLength);
413
414   MutableByteRange output;
415   auto buffer = addOutputBuffer(
416       output,
417       (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
418            ? *uncompressedLength
419            : defaultBufferLength));
420
421   // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
422   IOBuf const* current = data;
423   ByteRange input{current->data(), current->length()};
424   StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
425   bool done = false;
426   while (!done) {
427     while (input.empty() && current->next() != data) {
428       current = current->next();
429       input = {current->data(), current->length()};
430     }
431     if (current->next() == data) {
432       // Tell the uncompressor there is no more input (it may optimize)
433       flushOp = StreamCodec::FlushOp::END;
434     }
435     if (output.empty()) {
436       buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
437     }
438     done = uncompressStream(input, output, flushOp);
439   }
440   if (!input.empty()) {
441     throw std::runtime_error("Codec: Junk after end of data");
442   }
443
444   buffer->prev()->trimEnd(output.size());
445   if (uncompressedLength &&
446       *uncompressedLength != buffer->computeChainDataLength()) {
447     throw std::runtime_error("Codec: invalid uncompressed length");
448   }
449
450   return buffer;
451 }
452
453 namespace {
454
455 /**
456  * No compression
457  */
458 class NoCompressionCodec final : public Codec {
459  public:
460   static std::unique_ptr<Codec> create(int level, CodecType type);
461   explicit NoCompressionCodec(int level, CodecType type);
462
463  private:
464   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
465   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
466   std::unique_ptr<IOBuf> doUncompress(
467       const IOBuf* data,
468       Optional<uint64_t> uncompressedLength) override;
469 };
470
471 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
472   return std::make_unique<NoCompressionCodec>(level, type);
473 }
474
475 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
476     : Codec(type) {
477   DCHECK(type == CodecType::NO_COMPRESSION);
478   switch (level) {
479     case COMPRESSION_LEVEL_DEFAULT:
480     case COMPRESSION_LEVEL_FASTEST:
481     case COMPRESSION_LEVEL_BEST:
482       level = 0;
483   }
484   if (level != 0) {
485     throw std::invalid_argument(to<std::string>(
486         "NoCompressionCodec: invalid level ", level));
487   }
488 }
489
490 uint64_t NoCompressionCodec::doMaxCompressedLength(
491     uint64_t uncompressedLength) const {
492   return uncompressedLength;
493 }
494
495 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
496     const IOBuf* data) {
497   return data->clone();
498 }
499
500 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
501     const IOBuf* data,
502     Optional<uint64_t> uncompressedLength) {
503   if (uncompressedLength &&
504       data->computeChainDataLength() != *uncompressedLength) {
505     throw std::runtime_error(
506         to<std::string>("NoCompressionCodec: invalid uncompressed length"));
507   }
508   return data->clone();
509 }
510
511 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
512
513 namespace {
514
515 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
516   DCHECK_GE(out->tailroom(), kMaxVarintLength64);
517   out->append(encodeVarint(val, out->writableTail()));
518 }
519
520 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
521   uint64_t val = 0;
522   int8_t b = 0;
523   for (int shift = 0; shift <= 63; shift += 7) {
524     b = cursor.read<int8_t>();
525     val |= static_cast<uint64_t>(b & 0x7f) << shift;
526     if (b >= 0) {
527       break;
528     }
529   }
530   if (b < 0) {
531     throw std::invalid_argument("Invalid varint value. Too big.");
532   }
533   return val;
534 }
535
536 } // namespace
537
538 #endif  // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
539
540 #if FOLLY_HAVE_LIBLZ4
541
542 /**
543  * LZ4 compression
544  */
545 class LZ4Codec final : public Codec {
546  public:
547   static std::unique_ptr<Codec> create(int level, CodecType type);
548   explicit LZ4Codec(int level, CodecType type);
549
550  private:
551   bool doNeedsUncompressedLength() const override;
552   uint64_t doMaxUncompressedLength() const override;
553   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
554
555   bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
556
557   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
558   std::unique_ptr<IOBuf> doUncompress(
559       const IOBuf* data,
560       Optional<uint64_t> uncompressedLength) override;
561
562   bool highCompression_;
563 };
564
565 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
566   return std::make_unique<LZ4Codec>(level, type);
567 }
568
569 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
570   DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
571
572   switch (level) {
573     case COMPRESSION_LEVEL_FASTEST:
574     case COMPRESSION_LEVEL_DEFAULT:
575       level = 1;
576       break;
577     case COMPRESSION_LEVEL_BEST:
578       level = 2;
579       break;
580   }
581   if (level < 1 || level > 2) {
582     throw std::invalid_argument(to<std::string>(
583         "LZ4Codec: invalid level: ", level));
584   }
585   highCompression_ = (level > 1);
586 }
587
588 bool LZ4Codec::doNeedsUncompressedLength() const {
589   return !encodeSize();
590 }
591
592 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
593 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
594 // here.
595 #ifndef LZ4_MAX_INPUT_SIZE
596 # define LZ4_MAX_INPUT_SIZE 0x7E000000
597 #endif
598
599 uint64_t LZ4Codec::doMaxUncompressedLength() const {
600   return LZ4_MAX_INPUT_SIZE;
601 }
602
603 uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
604   return LZ4_compressBound(uncompressedLength) +
605       (encodeSize() ? kMaxVarintLength64 : 0);
606 }
607
608 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
609   IOBuf clone;
610   if (data->isChained()) {
611     // LZ4 doesn't support streaming, so we have to coalesce
612     clone = data->cloneCoalescedAsValue();
613     data = &clone;
614   }
615
616   auto out = IOBuf::create(maxCompressedLength(data->length()));
617   if (encodeSize()) {
618     encodeVarintToIOBuf(data->length(), out.get());
619   }
620
621   int n;
622   auto input = reinterpret_cast<const char*>(data->data());
623   auto output = reinterpret_cast<char*>(out->writableTail());
624   const auto inputLength = data->length();
625 #if LZ4_VERSION_NUMBER >= 10700
626   if (highCompression_) {
627     n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
628   } else {
629     n = LZ4_compress_default(input, output, inputLength, out->tailroom());
630   }
631 #else
632   if (highCompression_) {
633     n = LZ4_compressHC(input, output, inputLength);
634   } else {
635     n = LZ4_compress(input, output, inputLength);
636   }
637 #endif
638
639   CHECK_GE(n, 0);
640   CHECK_LE(n, out->capacity());
641
642   out->append(n);
643   return out;
644 }
645
646 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
647     const IOBuf* data,
648     Optional<uint64_t> uncompressedLength) {
649   IOBuf clone;
650   if (data->isChained()) {
651     // LZ4 doesn't support streaming, so we have to coalesce
652     clone = data->cloneCoalescedAsValue();
653     data = &clone;
654   }
655
656   folly::io::Cursor cursor(data);
657   uint64_t actualUncompressedLength;
658   if (encodeSize()) {
659     actualUncompressedLength = decodeVarintFromCursor(cursor);
660     if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
661       throw std::runtime_error("LZ4Codec: invalid uncompressed length");
662     }
663   } else {
664     // Invariants
665     DCHECK(uncompressedLength.hasValue());
666     DCHECK(*uncompressedLength <= maxUncompressedLength());
667     actualUncompressedLength = *uncompressedLength;
668   }
669
670   auto sp = StringPiece{cursor.peekBytes()};
671   auto out = IOBuf::create(actualUncompressedLength);
672   int n = LZ4_decompress_safe(
673       sp.data(),
674       reinterpret_cast<char*>(out->writableTail()),
675       sp.size(),
676       actualUncompressedLength);
677
678   if (n < 0 || uint64_t(n) != actualUncompressedLength) {
679     throw std::runtime_error(to<std::string>(
680         "LZ4 decompression returned invalid value ", n));
681   }
682   out->append(actualUncompressedLength);
683   return out;
684 }
685
686 #if LZ4_VERSION_NUMBER >= 10301
687
688 class LZ4FrameCodec final : public Codec {
689  public:
690   static std::unique_ptr<Codec> create(int level, CodecType type);
691   explicit LZ4FrameCodec(int level, CodecType type);
692   ~LZ4FrameCodec() override;
693
694   std::vector<std::string> validPrefixes() const override;
695   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
696       const override;
697
698  private:
699   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
700
701   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
702   std::unique_ptr<IOBuf> doUncompress(
703       const IOBuf* data,
704       Optional<uint64_t> uncompressedLength) override;
705
706   // Reset the dctx_ if it is dirty or null.
707   void resetDCtx();
708
709   int level_;
710   LZ4F_decompressionContext_t dctx_{nullptr};
711   bool dirty_{false};
712 };
713
714 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
715     int level,
716     CodecType type) {
717   return std::make_unique<LZ4FrameCodec>(level, type);
718 }
719
720 static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
721
722 std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
723   return {prefixToStringLE(kLZ4FrameMagicLE)};
724 }
725
726 bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
727   return dataStartsWithLE(data, kLZ4FrameMagicLE);
728 }
729
730 uint64_t LZ4FrameCodec::doMaxCompressedLength(
731     uint64_t uncompressedLength) const {
732   LZ4F_preferences_t prefs{};
733   prefs.compressionLevel = level_;
734   prefs.frameInfo.contentSize = uncompressedLength;
735   return LZ4F_compressFrameBound(uncompressedLength, &prefs);
736 }
737
738 static size_t lz4FrameThrowOnError(size_t code) {
739   if (LZ4F_isError(code)) {
740     throw std::runtime_error(
741         to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
742   }
743   return code;
744 }
745
746 void LZ4FrameCodec::resetDCtx() {
747   if (dctx_ && !dirty_) {
748     return;
749   }
750   if (dctx_) {
751     LZ4F_freeDecompressionContext(dctx_);
752   }
753   lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
754   dirty_ = false;
755 }
756
757 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
758   DCHECK(type == CodecType::LZ4_FRAME);
759   switch (level) {
760     case COMPRESSION_LEVEL_FASTEST:
761     case COMPRESSION_LEVEL_DEFAULT:
762       level_ = 0;
763       break;
764     case COMPRESSION_LEVEL_BEST:
765       level_ = 16;
766       break;
767     default:
768       level_ = level;
769       break;
770   }
771 }
772
773 LZ4FrameCodec::~LZ4FrameCodec() {
774   if (dctx_) {
775     LZ4F_freeDecompressionContext(dctx_);
776   }
777 }
778
779 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
780   // LZ4 Frame compression doesn't support streaming so we have to coalesce
781   IOBuf clone;
782   if (data->isChained()) {
783     clone = data->cloneCoalescedAsValue();
784     data = &clone;
785   }
786   // Set preferences
787   const auto uncompressedLength = data->length();
788   LZ4F_preferences_t prefs{};
789   prefs.compressionLevel = level_;
790   prefs.frameInfo.contentSize = uncompressedLength;
791   // Compress
792   auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
793   const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
794       buf->writableTail(),
795       buf->tailroom(),
796       data->data(),
797       data->length(),
798       &prefs));
799   buf->append(written);
800   return buf;
801 }
802
803 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
804     const IOBuf* data,
805     Optional<uint64_t> uncompressedLength) {
806   // Reset the dctx if any errors have occurred
807   resetDCtx();
808   // Coalesce the data
809   ByteRange in = *data->begin();
810   IOBuf clone;
811   if (data->isChained()) {
812     clone = data->cloneCoalescedAsValue();
813     in = clone.coalesce();
814   }
815   data = nullptr;
816   // Select decompression options
817   LZ4F_decompressOptions_t options;
818   options.stableDst = 1;
819   // Select blockSize and growthSize for the IOBufQueue
820   IOBufQueue queue(IOBufQueue::cacheChainLength());
821   auto blockSize = uint64_t{64} << 10;
822   auto growthSize = uint64_t{4} << 20;
823   if (uncompressedLength) {
824     // Allocate uncompressedLength in one chunk (up to 64 MB)
825     const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20);
826     queue.preallocate(allocateSize, allocateSize);
827     blockSize = std::min(*uncompressedLength, blockSize);
828     growthSize = std::min(*uncompressedLength, growthSize);
829   } else {
830     // Reduce growthSize for small data
831     const auto guessUncompressedLen =
832         4 * std::max<uint64_t>(blockSize, in.size());
833     growthSize = std::min(guessUncompressedLen, growthSize);
834   }
835   // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
836   // returns 0
837   dirty_ = true;
838   // Decompress until the frame is over
839   size_t code = 0;
840   do {
841     // Allocate enough space to decompress at least a block
842     void* out;
843     size_t outSize;
844     std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
845     // Decompress
846     size_t inSize = in.size();
847     code = lz4FrameThrowOnError(
848         LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
849     if (in.empty() && outSize == 0 && code != 0) {
850       // We passed no input, no output was produced, and the frame isn't over
851       // No more forward progress is possible
852       throw std::runtime_error("LZ4Frame error: Incomplete frame");
853     }
854     in.uncheckedAdvance(inSize);
855     queue.postallocate(outSize);
856   } while (code != 0);
857   // At this point the decompression context can be reused
858   dirty_ = false;
859   if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
860     throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
861   }
862   return queue.move();
863 }
864
865 #endif // LZ4_VERSION_NUMBER >= 10301
866 #endif // FOLLY_HAVE_LIBLZ4
867
868 #if FOLLY_HAVE_LIBSNAPPY
869
870 /**
871  * Snappy compression
872  */
873
874 /**
875  * Implementation of snappy::Source that reads from a IOBuf chain.
876  */
877 class IOBufSnappySource final : public snappy::Source {
878  public:
879   explicit IOBufSnappySource(const IOBuf* data);
880   size_t Available() const override;
881   const char* Peek(size_t* len) override;
882   void Skip(size_t n) override;
883  private:
884   size_t available_;
885   io::Cursor cursor_;
886 };
887
888 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
889   : available_(data->computeChainDataLength()),
890     cursor_(data) {
891 }
892
893 size_t IOBufSnappySource::Available() const {
894   return available_;
895 }
896
897 const char* IOBufSnappySource::Peek(size_t* len) {
898   auto sp = StringPiece{cursor_.peekBytes()};
899   *len = sp.size();
900   return sp.data();
901 }
902
903 void IOBufSnappySource::Skip(size_t n) {
904   CHECK_LE(n, available_);
905   cursor_.skip(n);
906   available_ -= n;
907 }
908
909 class SnappyCodec final : public Codec {
910  public:
911   static std::unique_ptr<Codec> create(int level, CodecType type);
912   explicit SnappyCodec(int level, CodecType type);
913
914  private:
915   uint64_t doMaxUncompressedLength() const override;
916   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
917   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
918   std::unique_ptr<IOBuf> doUncompress(
919       const IOBuf* data,
920       Optional<uint64_t> uncompressedLength) override;
921 };
922
923 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
924   return std::make_unique<SnappyCodec>(level, type);
925 }
926
927 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
928   DCHECK(type == CodecType::SNAPPY);
929   switch (level) {
930     case COMPRESSION_LEVEL_FASTEST:
931     case COMPRESSION_LEVEL_DEFAULT:
932     case COMPRESSION_LEVEL_BEST:
933       level = 1;
934   }
935   if (level != 1) {
936     throw std::invalid_argument(to<std::string>(
937         "SnappyCodec: invalid level: ", level));
938   }
939 }
940
941 uint64_t SnappyCodec::doMaxUncompressedLength() const {
942   // snappy.h uses uint32_t for lengths, so there's that.
943   return std::numeric_limits<uint32_t>::max();
944 }
945
946 uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
947   return snappy::MaxCompressedLength(uncompressedLength);
948 }
949
950 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
951   IOBufSnappySource source(data);
952   auto out = IOBuf::create(maxCompressedLength(source.Available()));
953
954   snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
955       out->writableTail()));
956
957   size_t n = snappy::Compress(&source, &sink);
958
959   CHECK_LE(n, out->capacity());
960   out->append(n);
961   return out;
962 }
963
964 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
965     const IOBuf* data,
966     Optional<uint64_t> uncompressedLength) {
967   uint32_t actualUncompressedLength = 0;
968
969   {
970     IOBufSnappySource source(data);
971     if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
972       throw std::runtime_error("snappy::GetUncompressedLength failed");
973     }
974     if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
975       throw std::runtime_error("snappy: invalid uncompressed length");
976     }
977   }
978
979   auto out = IOBuf::create(actualUncompressedLength);
980
981   {
982     IOBufSnappySource source(data);
983     if (!snappy::RawUncompress(&source,
984                                reinterpret_cast<char*>(out->writableTail()))) {
985       throw std::runtime_error("snappy::RawUncompress failed");
986     }
987   }
988
989   out->append(actualUncompressedLength);
990   return out;
991 }
992
993 #endif  // FOLLY_HAVE_LIBSNAPPY
994
995 #if FOLLY_HAVE_LIBLZMA
996
997 /**
998  * LZMA2 compression
999  */
1000 class LZMA2StreamCodec final : public StreamCodec {
1001  public:
1002   static std::unique_ptr<Codec> createCodec(int level, CodecType type);
1003   static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
1004   explicit LZMA2StreamCodec(int level, CodecType type);
1005   ~LZMA2StreamCodec() override;
1006
1007   std::vector<std::string> validPrefixes() const override;
1008   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1009       const override;
1010
1011  private:
1012   bool doNeedsDataLength() const override;
1013   uint64_t doMaxUncompressedLength() const override;
1014   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1015
1016   bool encodeSize() const {
1017     return type() == CodecType::LZMA2_VARINT_SIZE;
1018   }
1019
1020   void doResetStream() override;
1021   bool doCompressStream(
1022       ByteRange& input,
1023       MutableByteRange& output,
1024       StreamCodec::FlushOp flushOp) override;
1025   bool doUncompressStream(
1026       ByteRange& input,
1027       MutableByteRange& output,
1028       StreamCodec::FlushOp flushOp) override;
1029
1030   void resetCStream();
1031   void resetDStream();
1032
1033   size_t decodeVarint(ByteRange& input);
1034   bool flushVarintBuffer(MutableByteRange& output);
1035   void resetVarintBuffer();
1036
1037   Optional<lzma_stream> cstream_{};
1038   Optional<lzma_stream> dstream_{};
1039
1040   std::array<uint8_t, kMaxVarintLength64> varintBuffer_;
1041   ByteRange varintToEncode_;
1042   size_t varintBufferPos_{0};
1043
1044   int level_;
1045   bool needReset_{true};
1046   bool needDecodeSize_{false};
1047 };
1048
1049 static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1050 static constexpr unsigned kLZMA2MagicBytes = 6;
1051
1052 std::vector<std::string> LZMA2StreamCodec::validPrefixes() const {
1053   if (type() == CodecType::LZMA2_VARINT_SIZE) {
1054     return {};
1055   }
1056   return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
1057 }
1058
1059 bool LZMA2StreamCodec::doNeedsDataLength() const {
1060   return encodeSize();
1061 }
1062
1063 bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1064     const {
1065   if (type() == CodecType::LZMA2_VARINT_SIZE) {
1066     return false;
1067   }
1068   // Returns false for all inputs less than 8 bytes.
1069   // This is okay, because no valid LZMA2 streams are less than 8 bytes.
1070   return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
1071 }
1072
1073 std::unique_ptr<Codec> LZMA2StreamCodec::createCodec(
1074     int level,
1075     CodecType type) {
1076   return make_unique<LZMA2StreamCodec>(level, type);
1077 }
1078
1079 std::unique_ptr<StreamCodec> LZMA2StreamCodec::createStream(
1080     int level,
1081     CodecType type) {
1082   return make_unique<LZMA2StreamCodec>(level, type);
1083 }
1084
1085 LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type)
1086     : StreamCodec(type) {
1087   DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
1088   switch (level) {
1089     case COMPRESSION_LEVEL_FASTEST:
1090       level = 0;
1091       break;
1092     case COMPRESSION_LEVEL_DEFAULT:
1093       level = LZMA_PRESET_DEFAULT;
1094       break;
1095     case COMPRESSION_LEVEL_BEST:
1096       level = 9;
1097       break;
1098   }
1099   if (level < 0 || level > 9) {
1100     throw std::invalid_argument(
1101         to<std::string>("LZMA2Codec: invalid level: ", level));
1102   }
1103   level_ = level;
1104 }
1105
1106 LZMA2StreamCodec::~LZMA2StreamCodec() {
1107   if (cstream_) {
1108     lzma_end(cstream_.get_pointer());
1109     cstream_.clear();
1110   }
1111   if (dstream_) {
1112     lzma_end(dstream_.get_pointer());
1113     dstream_.clear();
1114   }
1115 }
1116
1117 uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const {
1118   // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1119   return uint64_t(1) << 63;
1120 }
1121
1122 uint64_t LZMA2StreamCodec::doMaxCompressedLength(
1123     uint64_t uncompressedLength) const {
1124   return lzma_stream_buffer_bound(uncompressedLength) +
1125       (encodeSize() ? kMaxVarintLength64 : 0);
1126 }
1127
1128 void LZMA2StreamCodec::doResetStream() {
1129   needReset_ = true;
1130 }
1131
1132 void LZMA2StreamCodec::resetCStream() {
1133   if (!cstream_) {
1134     cstream_.assign(LZMA_STREAM_INIT);
1135   }
1136   lzma_ret const rc =
1137       lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE);
1138   if (rc != LZMA_OK) {
1139     throw std::runtime_error(folly::to<std::string>(
1140         "LZMA2StreamCodec: lzma_easy_encoder error: ", rc));
1141   }
1142 }
1143
1144 void LZMA2StreamCodec::resetDStream() {
1145   if (!dstream_) {
1146     dstream_.assign(LZMA_STREAM_INIT);
1147   }
1148   lzma_ret const rc = lzma_auto_decoder(
1149       dstream_.get_pointer(), std::numeric_limits<uint64_t>::max(), 0);
1150   if (rc != LZMA_OK) {
1151     throw std::runtime_error(folly::to<std::string>(
1152         "LZMA2StreamCodec: lzma_auto_decoder error: ", rc));
1153   }
1154 }
1155
1156 static lzma_ret lzmaThrowOnError(lzma_ret const rc) {
1157   switch (rc) {
1158     case LZMA_OK:
1159     case LZMA_STREAM_END:
1160     case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice
1161       return rc;
1162     default:
1163       throw std::runtime_error(
1164           to<std::string>("LZMA2StreamCodec: error: ", rc));
1165   }
1166 }
1167
1168 static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) {
1169   switch (flush) {
1170     case StreamCodec::FlushOp::NONE:
1171       return LZMA_RUN;
1172     case StreamCodec::FlushOp::FLUSH:
1173       return LZMA_SYNC_FLUSH;
1174     case StreamCodec::FlushOp::END:
1175       return LZMA_FINISH;
1176     default:
1177       throw std::invalid_argument("LZMA2StreamCodec: Invalid flush");
1178   }
1179 }
1180
1181 /**
1182  * Flushes the varint buffer.
1183  * Advances output by the number of bytes written.
1184  * Returns true when flushing is complete.
1185  */
1186 bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) {
1187   if (varintToEncode_.empty()) {
1188     return true;
1189   }
1190   const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size());
1191   if (numBytesToCopy > 0) {
1192     memcpy(output.data(), varintToEncode_.data(), numBytesToCopy);
1193   }
1194   varintToEncode_.advance(numBytesToCopy);
1195   output.advance(numBytesToCopy);
1196   return varintToEncode_.empty();
1197 }
1198
1199 bool LZMA2StreamCodec::doCompressStream(
1200     ByteRange& input,
1201     MutableByteRange& output,
1202     StreamCodec::FlushOp flushOp) {
1203   if (needReset_) {
1204     resetCStream();
1205     if (encodeSize()) {
1206       varintBufferPos_ = 0;
1207       size_t const varintSize =
1208           encodeVarint(*uncompressedLength(), varintBuffer_.data());
1209       varintToEncode_ = {varintBuffer_.data(), varintSize};
1210     }
1211     needReset_ = false;
1212   }
1213
1214   if (!flushVarintBuffer(output)) {
1215     return false;
1216   }
1217
1218   cstream_->next_in = const_cast<uint8_t*>(input.data());
1219   cstream_->avail_in = input.size();
1220   cstream_->next_out = output.data();
1221   cstream_->avail_out = output.size();
1222   SCOPE_EXIT {
1223     input.uncheckedAdvance(input.size() - cstream_->avail_in);
1224     output.uncheckedAdvance(output.size() - cstream_->avail_out);
1225   };
1226   lzma_ret const rc = lzmaThrowOnError(
1227       lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp)));
1228   switch (flushOp) {
1229     case StreamCodec::FlushOp::NONE:
1230       return false;
1231     case StreamCodec::FlushOp::FLUSH:
1232       return cstream_->avail_in == 0 && cstream_->avail_out != 0;
1233     case StreamCodec::FlushOp::END:
1234       return rc == LZMA_STREAM_END;
1235     default:
1236       throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp");
1237   }
1238 }
1239
1240 /**
1241  * Attempts to decode a varint from input.
1242  * The function advances input by the number of bytes read.
1243  *
1244  * If there are too many bytes and the varint is not valid, throw a
1245  * runtime_error.
1246  * Returns the decoded size or 0 if more bytes are needed.
1247  */
1248 size_t LZMA2StreamCodec::decodeVarint(ByteRange& input) {
1249   if (input.empty()) {
1250     return 0;
1251   }
1252   size_t const numBytesToCopy =
1253       std::min(kMaxVarintLength64 - varintBufferPos_, input.size());
1254   memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy);
1255
1256   size_t const rangeSize = varintBufferPos_ + numBytesToCopy;
1257   ByteRange range{varintBuffer_.data(), rangeSize};
1258   auto const ret = tryDecodeVarint(range);
1259
1260   if (ret.hasValue()) {
1261     size_t const varintSize = rangeSize - range.size();
1262     input.advance(varintSize - varintBufferPos_);
1263     return ret.value();
1264   } else if (ret.error() == DecodeVarintError::TooManyBytes) {
1265     throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1266   } else {
1267     // Too few bytes
1268     input.advance(numBytesToCopy);
1269     varintBufferPos_ += numBytesToCopy;
1270     return 0;
1271   }
1272 }
1273
1274 bool LZMA2StreamCodec::doUncompressStream(
1275     ByteRange& input,
1276     MutableByteRange& output,
1277     StreamCodec::FlushOp flushOp) {
1278   if (needReset_) {
1279     resetDStream();
1280     needReset_ = false;
1281     needDecodeSize_ = encodeSize();
1282     if (encodeSize()) {
1283       // Reset buffer
1284       varintBufferPos_ = 0;
1285     }
1286   }
1287
1288   if (needDecodeSize_) {
1289     // Try decoding the varint. If the input does not contain the entire varint,
1290     // buffer the input. If the varint can not be decoded, fail.
1291     size_t const size = decodeVarint(input);
1292     if (!size) {
1293       return false;
1294     }
1295     if (uncompressedLength() && *uncompressedLength() != size) {
1296       throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1297     }
1298     needDecodeSize_ = false;
1299   }
1300
1301   dstream_->next_in = const_cast<uint8_t*>(input.data());
1302   dstream_->avail_in = input.size();
1303   dstream_->next_out = output.data();
1304   dstream_->avail_out = output.size();
1305   SCOPE_EXIT {
1306     input.advance(input.size() - dstream_->avail_in);
1307     output.advance(output.size() - dstream_->avail_out);
1308   };
1309
1310   lzma_ret rc;
1311   switch (flushOp) {
1312     case StreamCodec::FlushOp::NONE:
1313     case StreamCodec::FlushOp::FLUSH:
1314       rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN));
1315       break;
1316     case StreamCodec::FlushOp::END:
1317       rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH));
1318       break;
1319     default:
1320       throw std::invalid_argument("LZMA2StreamCodec: invalid flush");
1321   }
1322   return rc == LZMA_STREAM_END;
1323 }
1324 #endif // FOLLY_HAVE_LIBLZMA
1325
1326 #ifdef FOLLY_HAVE_LIBZSTD
1327
1328 namespace {
1329 void zstdFreeCStream(ZSTD_CStream* zcs) {
1330   ZSTD_freeCStream(zcs);
1331 }
1332
1333 void zstdFreeDStream(ZSTD_DStream* zds) {
1334   ZSTD_freeDStream(zds);
1335 }
1336 }
1337
1338 /**
1339  * ZSTD compression
1340  */
1341 class ZSTDStreamCodec final : public StreamCodec {
1342  public:
1343   static std::unique_ptr<Codec> createCodec(int level, CodecType);
1344   static std::unique_ptr<StreamCodec> createStream(int level, CodecType);
1345   explicit ZSTDStreamCodec(int level, CodecType type);
1346
1347   std::vector<std::string> validPrefixes() const override;
1348   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1349       const override;
1350
1351  private:
1352   bool doNeedsUncompressedLength() const override;
1353   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1354   Optional<uint64_t> doGetUncompressedLength(
1355       IOBuf const* data,
1356       Optional<uint64_t> uncompressedLength) const override;
1357
1358   void doResetStream() override;
1359   bool doCompressStream(
1360       ByteRange& input,
1361       MutableByteRange& output,
1362       StreamCodec::FlushOp flushOp) override;
1363   bool doUncompressStream(
1364       ByteRange& input,
1365       MutableByteRange& output,
1366       StreamCodec::FlushOp flushOp) override;
1367
1368   void resetCStream();
1369   void resetDStream();
1370
1371   bool tryBlockCompress(ByteRange& input, MutableByteRange& output) const;
1372   bool tryBlockUncompress(ByteRange& input, MutableByteRange& output) const;
1373
1374   int level_;
1375   bool needReset_{true};
1376   std::unique_ptr<
1377       ZSTD_CStream,
1378       folly::static_function_deleter<ZSTD_CStream, &zstdFreeCStream>>
1379       cstream_{nullptr};
1380   std::unique_ptr<
1381       ZSTD_DStream,
1382       folly::static_function_deleter<ZSTD_DStream, &zstdFreeDStream>>
1383       dstream_{nullptr};
1384 };
1385
1386 static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
1387
1388 std::vector<std::string> ZSTDStreamCodec::validPrefixes() const {
1389   return {prefixToStringLE(kZSTDMagicLE)};
1390 }
1391
1392 bool ZSTDStreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1393     const {
1394   return dataStartsWithLE(data, kZSTDMagicLE);
1395 }
1396
1397 std::unique_ptr<Codec> ZSTDStreamCodec::createCodec(int level, CodecType type) {
1398   return make_unique<ZSTDStreamCodec>(level, type);
1399 }
1400
1401 std::unique_ptr<StreamCodec> ZSTDStreamCodec::createStream(
1402     int level,
1403     CodecType type) {
1404   return make_unique<ZSTDStreamCodec>(level, type);
1405 }
1406
1407 ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
1408     : StreamCodec(type) {
1409   DCHECK(type == CodecType::ZSTD);
1410   switch (level) {
1411     case COMPRESSION_LEVEL_FASTEST:
1412       level = 1;
1413       break;
1414     case COMPRESSION_LEVEL_DEFAULT:
1415       level = 1;
1416       break;
1417     case COMPRESSION_LEVEL_BEST:
1418       level = 19;
1419       break;
1420   }
1421   if (level < 1 || level > ZSTD_maxCLevel()) {
1422     throw std::invalid_argument(
1423         to<std::string>("ZSTD: invalid level: ", level));
1424   }
1425   level_ = level;
1426 }
1427
1428 bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
1429   return false;
1430 }
1431
1432 uint64_t ZSTDStreamCodec::doMaxCompressedLength(
1433     uint64_t uncompressedLength) const {
1434   return ZSTD_compressBound(uncompressedLength);
1435 }
1436
1437 void zstdThrowIfError(size_t rc) {
1438   if (!ZSTD_isError(rc)) {
1439     return;
1440   }
1441   throw std::runtime_error(
1442       to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
1443 }
1444
1445 Optional<uint64_t> ZSTDStreamCodec::doGetUncompressedLength(
1446     IOBuf const* data,
1447     Optional<uint64_t> uncompressedLength) const {
1448   // Read decompressed size from frame if available in first IOBuf.
1449   auto const decompressedSize =
1450       ZSTD_getDecompressedSize(data->data(), data->length());
1451   if (decompressedSize != 0) {
1452     if (uncompressedLength && *uncompressedLength != decompressedSize) {
1453       throw std::runtime_error("ZSTD: invalid uncompressed length");
1454     }
1455     uncompressedLength = decompressedSize;
1456   }
1457   return uncompressedLength;
1458 }
1459
1460 void ZSTDStreamCodec::doResetStream() {
1461   needReset_ = true;
1462 }
1463
1464 bool ZSTDStreamCodec::tryBlockCompress(
1465     ByteRange& input,
1466     MutableByteRange& output) const {
1467   DCHECK(needReset_);
1468   // We need to know that we have enough output space to use block compression
1469   if (output.size() < ZSTD_compressBound(input.size())) {
1470     return false;
1471   }
1472   size_t const length = ZSTD_compress(
1473       output.data(), output.size(), input.data(), input.size(), level_);
1474   zstdThrowIfError(length);
1475   input.uncheckedAdvance(input.size());
1476   output.uncheckedAdvance(length);
1477   return true;
1478 }
1479
1480 void ZSTDStreamCodec::resetCStream() {
1481   if (!cstream_) {
1482     cstream_.reset(ZSTD_createCStream());
1483     if (!cstream_) {
1484       throw std::bad_alloc{};
1485     }
1486   }
1487   // Advanced API usage works for all supported versions of zstd.
1488   // Required to set contentSizeFlag.
1489   auto params = ZSTD_getParams(level_, uncompressedLength().value_or(0), 0);
1490   params.fParams.contentSizeFlag = uncompressedLength().hasValue();
1491   zstdThrowIfError(ZSTD_initCStream_advanced(
1492       cstream_.get(), nullptr, 0, params, uncompressedLength().value_or(0)));
1493 }
1494
1495 bool ZSTDStreamCodec::doCompressStream(
1496     ByteRange& input,
1497     MutableByteRange& output,
1498     StreamCodec::FlushOp flushOp) {
1499   if (needReset_) {
1500     // If we are given all the input in one chunk try to use block compression
1501     if (flushOp == StreamCodec::FlushOp::END &&
1502         tryBlockCompress(input, output)) {
1503       return true;
1504     }
1505     resetCStream();
1506     needReset_ = false;
1507   }
1508   ZSTD_inBuffer in = {input.data(), input.size(), 0};
1509   ZSTD_outBuffer out = {output.data(), output.size(), 0};
1510   SCOPE_EXIT {
1511     input.uncheckedAdvance(in.pos);
1512     output.uncheckedAdvance(out.pos);
1513   };
1514   if (flushOp == StreamCodec::FlushOp::NONE || !input.empty()) {
1515     zstdThrowIfError(ZSTD_compressStream(cstream_.get(), &out, &in));
1516   }
1517   if (in.pos == in.size && flushOp != StreamCodec::FlushOp::NONE) {
1518     size_t rc;
1519     switch (flushOp) {
1520       case StreamCodec::FlushOp::FLUSH:
1521         rc = ZSTD_flushStream(cstream_.get(), &out);
1522         break;
1523       case StreamCodec::FlushOp::END:
1524         rc = ZSTD_endStream(cstream_.get(), &out);
1525         break;
1526       default:
1527         throw std::invalid_argument("ZSTD: invalid FlushOp");
1528     }
1529     zstdThrowIfError(rc);
1530     if (rc == 0) {
1531       return true;
1532     }
1533   }
1534   return false;
1535 }
1536
1537 bool ZSTDStreamCodec::tryBlockUncompress(
1538     ByteRange& input,
1539     MutableByteRange& output) const {
1540   DCHECK(needReset_);
1541 #if ZSTD_VERSION_NUMBER < 10104
1542   // We require ZSTD_findFrameCompressedSize() to perform this optimization.
1543   return false;
1544 #else
1545   // We need to know the uncompressed length and have enough output space.
1546   if (!uncompressedLength() || output.size() < *uncompressedLength()) {
1547     return false;
1548   }
1549   size_t const compressedLength =
1550       ZSTD_findFrameCompressedSize(input.data(), input.size());
1551   zstdThrowIfError(compressedLength);
1552   size_t const length = ZSTD_decompress(
1553       output.data(), *uncompressedLength(), input.data(), compressedLength);
1554   zstdThrowIfError(length);
1555   if (length != *uncompressedLength()) {
1556     throw std::runtime_error("ZSTDStreamCodec: Incorrect uncompressed length");
1557   }
1558   input.uncheckedAdvance(compressedLength);
1559   output.uncheckedAdvance(length);
1560   return true;
1561 #endif
1562 }
1563
1564 void ZSTDStreamCodec::resetDStream() {
1565   if (!dstream_) {
1566     dstream_.reset(ZSTD_createDStream());
1567     if (!dstream_) {
1568       throw std::bad_alloc{};
1569     }
1570   }
1571   zstdThrowIfError(ZSTD_initDStream(dstream_.get()));
1572 }
1573
1574 bool ZSTDStreamCodec::doUncompressStream(
1575     ByteRange& input,
1576     MutableByteRange& output,
1577     StreamCodec::FlushOp flushOp) {
1578   if (needReset_) {
1579     // If we are given all the input in one chunk try to use block uncompression
1580     if (flushOp == StreamCodec::FlushOp::END &&
1581         tryBlockUncompress(input, output)) {
1582       return true;
1583     }
1584     resetDStream();
1585     needReset_ = false;
1586   }
1587   ZSTD_inBuffer in = {input.data(), input.size(), 0};
1588   ZSTD_outBuffer out = {output.data(), output.size(), 0};
1589   SCOPE_EXIT {
1590     input.uncheckedAdvance(in.pos);
1591     output.uncheckedAdvance(out.pos);
1592   };
1593   size_t const rc = ZSTD_decompressStream(dstream_.get(), &out, &in);
1594   zstdThrowIfError(rc);
1595   return rc == 0;
1596 }
1597
1598 #endif // FOLLY_HAVE_LIBZSTD
1599
1600 #if FOLLY_HAVE_LIBBZ2
1601
1602 class Bzip2Codec final : public Codec {
1603  public:
1604   static std::unique_ptr<Codec> create(int level, CodecType type);
1605   explicit Bzip2Codec(int level, CodecType type);
1606
1607   std::vector<std::string> validPrefixes() const override;
1608   bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
1609       const override;
1610
1611  private:
1612   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1613   std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
1614   std::unique_ptr<IOBuf> doUncompress(
1615       IOBuf const* data,
1616       Optional<uint64_t> uncompressedLength) override;
1617
1618   int level_;
1619 };
1620
1621 /* static */ std::unique_ptr<Codec> Bzip2Codec::create(
1622     int level,
1623     CodecType type) {
1624   return std::make_unique<Bzip2Codec>(level, type);
1625 }
1626
1627 Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
1628   DCHECK(type == CodecType::BZIP2);
1629   switch (level) {
1630     case COMPRESSION_LEVEL_FASTEST:
1631       level = 1;
1632       break;
1633     case COMPRESSION_LEVEL_DEFAULT:
1634       level = 9;
1635       break;
1636     case COMPRESSION_LEVEL_BEST:
1637       level = 9;
1638       break;
1639   }
1640   if (level < 1 || level > 9) {
1641     throw std::invalid_argument(
1642         to<std::string>("Bzip2: invalid level: ", level));
1643   }
1644   level_ = level;
1645 }
1646
1647 static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1648 static uint64_t constexpr kBzip2MagicBytes = 3;
1649
1650 std::vector<std::string> Bzip2Codec::validPrefixes() const {
1651   return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
1652 }
1653
1654 bool Bzip2Codec::canUncompress(IOBuf const* data, Optional<uint64_t>) const {
1655   return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
1656 }
1657
1658 uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1659   // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
1660   //   To guarantee that the compressed data will fit in its buffer, allocate an
1661   //   output buffer of size 1% larger than the uncompressed data, plus six
1662   //   hundred extra bytes.
1663   return uncompressedLength + uncompressedLength / 100 + 600;
1664 }
1665
1666 static bz_stream createBzStream() {
1667   bz_stream stream;
1668   stream.bzalloc = nullptr;
1669   stream.bzfree = nullptr;
1670   stream.opaque = nullptr;
1671   stream.next_in = stream.next_out = nullptr;
1672   stream.avail_in = stream.avail_out = 0;
1673   return stream;
1674 }
1675
1676 // Throws on error condition, otherwise returns the code.
1677 static int bzCheck(int const rc) {
1678   switch (rc) {
1679     case BZ_OK:
1680     case BZ_RUN_OK:
1681     case BZ_FLUSH_OK:
1682     case BZ_FINISH_OK:
1683     case BZ_STREAM_END:
1684       return rc;
1685     default:
1686       throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
1687   }
1688 }
1689
1690 static std::unique_ptr<IOBuf> addOutputBuffer(
1691     bz_stream* stream,
1692     uint64_t const bufferLength) {
1693   DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
1694   DCHECK_EQ(stream->avail_out, 0);
1695
1696   auto buf = IOBuf::create(bufferLength);
1697   buf->append(buf->capacity());
1698
1699   stream->next_out = reinterpret_cast<char*>(buf->writableData());
1700   stream->avail_out = buf->length();
1701
1702   return buf;
1703 }
1704
1705 std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
1706   bz_stream stream = createBzStream();
1707   bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
1708   SCOPE_EXIT {
1709     bzCheck(BZ2_bzCompressEnd(&stream));
1710   };
1711
1712   uint64_t const uncompressedLength = data->computeChainDataLength();
1713   uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
1714   uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1715   uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
1716
1717   auto out = addOutputBuffer(
1718       &stream,
1719       maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
1720                                                : kDefaultBufferLength);
1721
1722   for (auto range : *data) {
1723     while (!range.empty()) {
1724       auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1725       stream.next_in =
1726           const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1727       stream.avail_in = inSize;
1728
1729       if (stream.avail_out == 0) {
1730         out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1731       }
1732
1733       bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
1734       range.uncheckedAdvance(inSize - stream.avail_in);
1735     }
1736   }
1737   do {
1738     if (stream.avail_out == 0) {
1739       out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1740     }
1741   } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
1742
1743   out->prev()->trimEnd(stream.avail_out);
1744
1745   return out;
1746 }
1747
1748 std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
1749     const IOBuf* data,
1750     Optional<uint64_t> uncompressedLength) {
1751   bz_stream stream = createBzStream();
1752   bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
1753   SCOPE_EXIT {
1754     bzCheck(BZ2_bzDecompressEnd(&stream));
1755   };
1756
1757   uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1758   uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
1759   uint64_t const kDefaultBufferLength =
1760       computeBufferLength(data->computeChainDataLength(), kBlockSize);
1761
1762   auto out = addOutputBuffer(
1763       &stream,
1764       ((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength)
1765            ? *uncompressedLength
1766            : kDefaultBufferLength));
1767
1768   int rc = BZ_OK;
1769   for (auto range : *data) {
1770     while (!range.empty()) {
1771       auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1772       stream.next_in =
1773           const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1774       stream.avail_in = inSize;
1775
1776       if (stream.avail_out == 0) {
1777         out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1778       }
1779
1780       rc = bzCheck(BZ2_bzDecompress(&stream));
1781       range.uncheckedAdvance(inSize - stream.avail_in);
1782     }
1783   }
1784   while (rc != BZ_STREAM_END) {
1785     if (stream.avail_out == 0) {
1786       out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1787     }
1788     size_t const outputSize = stream.avail_out;
1789     rc = bzCheck(BZ2_bzDecompress(&stream));
1790     if (outputSize == stream.avail_out) {
1791       throw std::runtime_error("Bzip2Codec: Truncated input");
1792     }
1793   }
1794
1795   out->prev()->trimEnd(stream.avail_out);
1796
1797   uint64_t const totalOut =
1798       (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
1799   if (uncompressedLength && uncompressedLength != totalOut) {
1800     throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
1801   }
1802
1803   return out;
1804 }
1805
1806 #endif // FOLLY_HAVE_LIBBZ2
1807
1808 #if FOLLY_HAVE_LIBZ
1809
1810 zlib::Options getZlibOptions(CodecType type) {
1811   DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB);
1812   return type == CodecType::GZIP ? zlib::defaultGzipOptions()
1813                                  : zlib::defaultZlibOptions();
1814 }
1815
1816 std::unique_ptr<Codec> getZlibCodec(int level, CodecType type) {
1817   return zlib::getCodec(getZlibOptions(type), level);
1818 }
1819
1820 std::unique_ptr<StreamCodec> getZlibStreamCodec(int level, CodecType type) {
1821   return zlib::getStreamCodec(getZlibOptions(type), level);
1822 }
1823
1824 #endif // FOLLY_HAVE_LIBZ
1825
1826 /**
1827  * Automatic decompression
1828  */
1829 class AutomaticCodec final : public Codec {
1830  public:
1831   static std::unique_ptr<Codec> create(
1832       std::vector<std::unique_ptr<Codec>> customCodecs);
1833   explicit AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs);
1834
1835   std::vector<std::string> validPrefixes() const override;
1836   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1837       const override;
1838
1839  private:
1840   bool doNeedsUncompressedLength() const override;
1841   uint64_t doMaxUncompressedLength() const override;
1842
1843   uint64_t doMaxCompressedLength(uint64_t) const override {
1844     throw std::runtime_error(
1845         "AutomaticCodec error: maxCompressedLength() not supported.");
1846   }
1847   std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1848     throw std::runtime_error("AutomaticCodec error: compress() not supported.");
1849   }
1850   std::unique_ptr<IOBuf> doUncompress(
1851       const IOBuf* data,
1852       Optional<uint64_t> uncompressedLength) override;
1853
1854   void addCodecIfSupported(CodecType type);
1855
1856   // Throws iff the codecs aren't compatible (very slow)
1857   void checkCompatibleCodecs() const;
1858
1859   std::vector<std::unique_ptr<Codec>> codecs_;
1860   bool needsUncompressedLength_;
1861   uint64_t maxUncompressedLength_;
1862 };
1863
1864 std::vector<std::string> AutomaticCodec::validPrefixes() const {
1865   std::unordered_set<std::string> prefixes;
1866   for (const auto& codec : codecs_) {
1867     const auto codecPrefixes = codec->validPrefixes();
1868     prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
1869   }
1870   return std::vector<std::string>{prefixes.begin(), prefixes.end()};
1871 }
1872
1873 bool AutomaticCodec::canUncompress(
1874     const IOBuf* data,
1875     Optional<uint64_t> uncompressedLength) const {
1876   return std::any_of(
1877       codecs_.begin(),
1878       codecs_.end(),
1879       [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
1880         return codec->canUncompress(data, uncompressedLength);
1881       });
1882 }
1883
1884 void AutomaticCodec::addCodecIfSupported(CodecType type) {
1885   const bool present = std::any_of(
1886       codecs_.begin(),
1887       codecs_.end(),
1888       [&type](std::unique_ptr<Codec> const& codec) {
1889         return codec->type() == type;
1890       });
1891   if (hasCodec(type) && !present) {
1892     codecs_.push_back(getCodec(type));
1893   }
1894 }
1895
1896 /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
1897     std::vector<std::unique_ptr<Codec>> customCodecs) {
1898   return std::make_unique<AutomaticCodec>(std::move(customCodecs));
1899 }
1900
1901 AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
1902     : Codec(CodecType::USER_DEFINED), codecs_(std::move(customCodecs)) {
1903   // Fastest -> slowest
1904   addCodecIfSupported(CodecType::LZ4_FRAME);
1905   addCodecIfSupported(CodecType::ZSTD);
1906   addCodecIfSupported(CodecType::ZLIB);
1907   addCodecIfSupported(CodecType::GZIP);
1908   addCodecIfSupported(CodecType::LZMA2);
1909   addCodecIfSupported(CodecType::BZIP2);
1910   if (kIsDebug) {
1911     checkCompatibleCodecs();
1912   }
1913   // Check that none of the codes are are null
1914   DCHECK(std::none_of(
1915       codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1916         return codec == nullptr;
1917       }));
1918
1919   needsUncompressedLength_ = std::any_of(
1920       codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1921         return codec->needsUncompressedLength();
1922       });
1923
1924   const auto it = std::max_element(
1925       codecs_.begin(),
1926       codecs_.end(),
1927       [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
1928         return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
1929       });
1930   DCHECK(it != codecs_.end());
1931   maxUncompressedLength_ = (*it)->maxUncompressedLength();
1932 }
1933
1934 void AutomaticCodec::checkCompatibleCodecs() const {
1935   // Keep track of all the possible headers.
1936   std::unordered_set<std::string> headers;
1937   // The empty header is not allowed.
1938   headers.insert("");
1939   // Step 1:
1940   // Construct a set of headers and check that none of the headers occur twice.
1941   // Eliminate edge cases.
1942   for (auto&& codec : codecs_) {
1943     const auto codecHeaders = codec->validPrefixes();
1944     // Codecs without any valid headers are not allowed.
1945     if (codecHeaders.empty()) {
1946       throw std::invalid_argument{
1947           "AutomaticCodec: validPrefixes() must not be empty."};
1948     }
1949     // Insert all the headers for the current codec.
1950     const size_t beforeSize = headers.size();
1951     headers.insert(codecHeaders.begin(), codecHeaders.end());
1952     // Codecs are not compatible if any header occurred twice.
1953     if (beforeSize + codecHeaders.size() != headers.size()) {
1954       throw std::invalid_argument{
1955           "AutomaticCodec: Two valid prefixes collide."};
1956     }
1957   }
1958   // Step 2:
1959   // Check if any strict non-empty prefix of any header is a header.
1960   for (const auto& header : headers) {
1961     for (size_t i = 1; i < header.size(); ++i) {
1962       if (headers.count(header.substr(0, i))) {
1963         throw std::invalid_argument{
1964             "AutomaticCodec: One valid prefix is a prefix of another valid "
1965             "prefix."};
1966       }
1967     }
1968   }
1969 }
1970
1971 bool AutomaticCodec::doNeedsUncompressedLength() const {
1972   return needsUncompressedLength_;
1973 }
1974
1975 uint64_t AutomaticCodec::doMaxUncompressedLength() const {
1976   return maxUncompressedLength_;
1977 }
1978
1979 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
1980     const IOBuf* data,
1981     Optional<uint64_t> uncompressedLength) {
1982   for (auto&& codec : codecs_) {
1983     if (codec->canUncompress(data, uncompressedLength)) {
1984       return codec->uncompress(data, uncompressedLength);
1985     }
1986   }
1987   throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
1988 }
1989
1990 using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
1991 using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
1992 struct Factory {
1993   CodecFactory codec;
1994   StreamCodecFactory stream;
1995 };
1996
1997 constexpr Factory
1998     codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
1999         {}, // USER_DEFINED
2000         {NoCompressionCodec::create, nullptr},
2001
2002 #if FOLLY_HAVE_LIBLZ4
2003         {LZ4Codec::create, nullptr},
2004 #else
2005         {},
2006 #endif
2007
2008 #if FOLLY_HAVE_LIBSNAPPY
2009         {SnappyCodec::create, nullptr},
2010 #else
2011         {},
2012 #endif
2013
2014 #if FOLLY_HAVE_LIBZ
2015         {getZlibCodec, getZlibStreamCodec},
2016 #else
2017         {},
2018 #endif
2019
2020 #if FOLLY_HAVE_LIBLZ4
2021         {LZ4Codec::create, nullptr},
2022 #else
2023         {},
2024 #endif
2025
2026 #if FOLLY_HAVE_LIBLZMA
2027         {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2028         {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2029 #else
2030         {},
2031         {},
2032 #endif
2033
2034 #if FOLLY_HAVE_LIBZSTD
2035         {ZSTDStreamCodec::createCodec, ZSTDStreamCodec::createStream},
2036 #else
2037         {},
2038 #endif
2039
2040 #if FOLLY_HAVE_LIBZ
2041         {getZlibCodec, getZlibStreamCodec},
2042 #else
2043         {},
2044 #endif
2045
2046 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
2047         {LZ4FrameCodec::create, nullptr},
2048 #else
2049         {},
2050 #endif
2051
2052 #if FOLLY_HAVE_LIBBZ2
2053         {Bzip2Codec::create, nullptr},
2054 #else
2055         {},
2056 #endif
2057 };
2058
2059 Factory const& getFactory(CodecType type) {
2060   size_t const idx = static_cast<size_t>(type);
2061   if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2062     throw std::invalid_argument(
2063         to<std::string>("Compression type ", idx, " invalid"));
2064   }
2065   return codecFactories[idx];
2066 }
2067 } // namespace
2068
2069 bool hasCodec(CodecType type) {
2070   return getFactory(type).codec != nullptr;
2071 }
2072
2073 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2074   auto const factory = getFactory(type).codec;
2075   if (!factory) {
2076     throw std::invalid_argument(
2077         to<std::string>("Compression type ", type, " not supported"));
2078   }
2079   auto codec = (*factory)(level, type);
2080   DCHECK(codec->type() == type);
2081   return codec;
2082 }
2083
2084 bool hasStreamCodec(CodecType type) {
2085   return getFactory(type).stream != nullptr;
2086 }
2087
2088 std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
2089   auto const factory = getFactory(type).stream;
2090   if (!factory) {
2091     throw std::invalid_argument(
2092         to<std::string>("Compression type ", type, " not supported"));
2093   }
2094   auto codec = (*factory)(level, type);
2095   DCHECK(codec->type() == type);
2096   return codec;
2097 }
2098
2099 std::unique_ptr<Codec> getAutoUncompressionCodec(
2100     std::vector<std::unique_ptr<Codec>> customCodecs) {
2101   return AutomaticCodec::create(std::move(customCodecs));
2102 }
2103 } // namespace io
2104 } // namespace folly