Add LZMA streaming interface
[folly.git] / folly / io / Compression.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/Compression.h>
18
19 #if FOLLY_HAVE_LIBLZ4
20 #include <lz4.h>
21 #include <lz4hc.h>
22 #if LZ4_VERSION_NUMBER >= 10301
23 #include <lz4frame.h>
24 #endif
25 #endif
26
27 #include <glog/logging.h>
28
29 #if FOLLY_HAVE_LIBSNAPPY
30 #include <snappy-sinksource.h>
31 #include <snappy.h>
32 #endif
33
34 #if FOLLY_HAVE_LIBZ
35 #include <folly/io/compression/Zlib.h>
36 #endif
37
38 #if FOLLY_HAVE_LIBLZMA
39 #include <lzma.h>
40 #endif
41
42 #if FOLLY_HAVE_LIBZSTD
43 #define ZSTD_STATIC_LINKING_ONLY
44 #include <zstd.h>
45 #endif
46
47 #if FOLLY_HAVE_LIBBZ2
48 #include <bzlib.h>
49 #endif
50
51 #include <folly/Bits.h>
52 #include <folly/Conv.h>
53 #include <folly/Memory.h>
54 #include <folly/Portability.h>
55 #include <folly/ScopeGuard.h>
56 #include <folly/Varint.h>
57 #include <folly/io/Cursor.h>
58 #include <folly/io/compression/Utils.h>
59 #include <algorithm>
60 #include <unordered_set>
61
62 using folly::io::compression::detail::dataStartsWithLE;
63 using folly::io::compression::detail::prefixToStringLE;
64
65 namespace zlib = folly::io::zlib;
66
67 namespace folly {
68 namespace io {
69
70 Codec::Codec(CodecType type) : type_(type) { }
71
72 // Ensure consistent behavior in the nullptr case
73 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
74   if (data == nullptr) {
75     throw std::invalid_argument("Codec: data must not be nullptr");
76   }
77   uint64_t len = data->computeChainDataLength();
78   if (len == 0) {
79     return IOBuf::create(0);
80   }
81   if (len > maxUncompressedLength()) {
82     throw std::runtime_error("Codec: uncompressed length too large");
83   }
84
85   return doCompress(data);
86 }
87
88 std::string Codec::compress(const StringPiece data) {
89   const uint64_t len = data.size();
90   if (len == 0) {
91     return "";
92   }
93   if (len > maxUncompressedLength()) {
94     throw std::runtime_error("Codec: uncompressed length too large");
95   }
96
97   return doCompressString(data);
98 }
99
100 std::unique_ptr<IOBuf> Codec::uncompress(
101     const IOBuf* data,
102     Optional<uint64_t> uncompressedLength) {
103   if (data == nullptr) {
104     throw std::invalid_argument("Codec: data must not be nullptr");
105   }
106   if (!uncompressedLength) {
107     if (needsUncompressedLength()) {
108       throw std::invalid_argument("Codec: uncompressed length required");
109     }
110   } else if (*uncompressedLength > maxUncompressedLength()) {
111     throw std::runtime_error("Codec: uncompressed length too large");
112   }
113
114   if (data->empty()) {
115     if (uncompressedLength.value_or(0) != 0) {
116       throw std::runtime_error("Codec: invalid uncompressed length");
117     }
118     return IOBuf::create(0);
119   }
120
121   return doUncompress(data, uncompressedLength);
122 }
123
124 std::string Codec::uncompress(
125     const StringPiece data,
126     Optional<uint64_t> uncompressedLength) {
127   if (!uncompressedLength) {
128     if (needsUncompressedLength()) {
129       throw std::invalid_argument("Codec: uncompressed length required");
130     }
131   } else if (*uncompressedLength > maxUncompressedLength()) {
132     throw std::runtime_error("Codec: uncompressed length too large");
133   }
134
135   if (data.empty()) {
136     if (uncompressedLength.value_or(0) != 0) {
137       throw std::runtime_error("Codec: invalid uncompressed length");
138     }
139     return "";
140   }
141
142   return doUncompressString(data, uncompressedLength);
143 }
144
145 bool Codec::needsUncompressedLength() const {
146   return doNeedsUncompressedLength();
147 }
148
149 uint64_t Codec::maxUncompressedLength() const {
150   return doMaxUncompressedLength();
151 }
152
153 bool Codec::doNeedsUncompressedLength() const {
154   return false;
155 }
156
157 uint64_t Codec::doMaxUncompressedLength() const {
158   return UNLIMITED_UNCOMPRESSED_LENGTH;
159 }
160
161 std::vector<std::string> Codec::validPrefixes() const {
162   return {};
163 }
164
165 bool Codec::canUncompress(const IOBuf*, Optional<uint64_t>) const {
166   return false;
167 }
168
169 std::string Codec::doCompressString(const StringPiece data) {
170   const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
171   auto outputBuffer = doCompress(&inputBuffer);
172   std::string output;
173   output.reserve(outputBuffer->computeChainDataLength());
174   for (auto range : *outputBuffer) {
175     output.append(reinterpret_cast<const char*>(range.data()), range.size());
176   }
177   return output;
178 }
179
180 std::string Codec::doUncompressString(
181     const StringPiece data,
182     Optional<uint64_t> uncompressedLength) {
183   const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
184   auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
185   std::string output;
186   output.reserve(outputBuffer->computeChainDataLength());
187   for (auto range : *outputBuffer) {
188     output.append(reinterpret_cast<const char*>(range.data()), range.size());
189   }
190   return output;
191 }
192
193 uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
194   if (uncompressedLength == 0) {
195     return 0;
196   }
197   return doMaxCompressedLength(uncompressedLength);
198 }
199
200 Optional<uint64_t> Codec::getUncompressedLength(
201     const folly::IOBuf* data,
202     Optional<uint64_t> uncompressedLength) const {
203   auto const compressedLength = data->computeChainDataLength();
204   if (uncompressedLength == uint64_t(0) || compressedLength == 0) {
205     if (uncompressedLength.value_or(0) != 0 || compressedLength != 0) {
206       throw std::runtime_error("Invalid uncompressed length");
207     }
208     return 0;
209   }
210   return doGetUncompressedLength(data, uncompressedLength);
211 }
212
213 Optional<uint64_t> Codec::doGetUncompressedLength(
214     const folly::IOBuf*,
215     Optional<uint64_t> uncompressedLength) const {
216   return uncompressedLength;
217 }
218
219 bool StreamCodec::needsDataLength() const {
220   return doNeedsDataLength();
221 }
222
223 bool StreamCodec::doNeedsDataLength() const {
224   return false;
225 }
226
227 void StreamCodec::assertStateIs(State expected) const {
228   if (state_ != expected) {
229     throw std::logic_error(folly::to<std::string>(
230         "Codec: state is ", state_, "; expected state ", expected));
231   }
232 }
233
234 void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
235   state_ = State::RESET;
236   uncompressedLength_ = uncompressedLength;
237   doResetStream();
238 }
239
240 bool StreamCodec::compressStream(
241     ByteRange& input,
242     MutableByteRange& output,
243     StreamCodec::FlushOp flushOp) {
244   if (state_ == State::RESET && input.empty()) {
245     if (flushOp == StreamCodec::FlushOp::NONE) {
246       return false;
247     }
248     if (flushOp == StreamCodec::FlushOp::END &&
249         uncompressedLength().value_or(0) != 0) {
250       throw std::runtime_error("Codec: invalid uncompressed length");
251     }
252     return true;
253   }
254   if (!uncompressedLength() && needsDataLength()) {
255     throw std::runtime_error("Codec: uncompressed length required");
256   }
257   if (state_ == State::RESET && !input.empty() &&
258       uncompressedLength() == uint64_t(0)) {
259     throw std::runtime_error("Codec: invalid uncompressed length");
260   }
261   // Handle input state transitions
262   switch (flushOp) {
263     case StreamCodec::FlushOp::NONE:
264       if (state_ == State::RESET) {
265         state_ = State::COMPRESS;
266       }
267       assertStateIs(State::COMPRESS);
268       break;
269     case StreamCodec::FlushOp::FLUSH:
270       if (state_ == State::RESET || state_ == State::COMPRESS) {
271         state_ = State::COMPRESS_FLUSH;
272       }
273       assertStateIs(State::COMPRESS_FLUSH);
274       break;
275     case StreamCodec::FlushOp::END:
276       if (state_ == State::RESET || state_ == State::COMPRESS) {
277         state_ = State::COMPRESS_END;
278       }
279       assertStateIs(State::COMPRESS_END);
280       break;
281   }
282   bool const done = doCompressStream(input, output, flushOp);
283   // Handle output state transitions
284   if (done) {
285     if (state_ == State::COMPRESS_FLUSH) {
286       state_ = State::COMPRESS;
287     } else if (state_ == State::COMPRESS_END) {
288       state_ = State::END;
289     }
290     // Check internal invariants
291     DCHECK(input.empty());
292     DCHECK(flushOp != StreamCodec::FlushOp::NONE);
293   }
294   return done;
295 }
296
297 bool StreamCodec::uncompressStream(
298     ByteRange& input,
299     MutableByteRange& output,
300     StreamCodec::FlushOp flushOp) {
301   if (state_ == State::RESET && input.empty()) {
302     if (uncompressedLength().value_or(0) == 0) {
303       return true;
304     }
305     return false;
306   }
307   // Handle input state transitions
308   if (state_ == State::RESET) {
309     state_ = State::UNCOMPRESS;
310   }
311   assertStateIs(State::UNCOMPRESS);
312   bool const done = doUncompressStream(input, output, flushOp);
313   // Handle output state transitions
314   if (done) {
315     state_ = State::END;
316   }
317   return done;
318 }
319
320 static std::unique_ptr<IOBuf> addOutputBuffer(
321     MutableByteRange& output,
322     uint64_t size) {
323   DCHECK(output.empty());
324   auto buffer = IOBuf::create(size);
325   buffer->append(buffer->capacity());
326   output = {buffer->writableData(), buffer->length()};
327   return buffer;
328 }
329
330 std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
331   uint64_t const uncompressedLength = data->computeChainDataLength();
332   resetStream(uncompressedLength);
333   uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
334
335   auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
336   auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
337
338   MutableByteRange output;
339   auto buffer = addOutputBuffer(
340       output,
341       maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
342                                                : kDefaultBufferLength);
343
344   // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
345   IOBuf const* current = data;
346   ByteRange input{current->data(), current->length()};
347   StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
348   for (;;) {
349     while (input.empty() && current->next() != data) {
350       current = current->next();
351       input = {current->data(), current->length()};
352     }
353     if (current->next() == data) {
354       // This is the last input buffer so end the stream
355       flushOp = StreamCodec::FlushOp::END;
356     }
357     if (output.empty()) {
358       buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
359     }
360     size_t const inputSize = input.size();
361     size_t const outputSize = output.size();
362     bool const done = compressStream(input, output, flushOp);
363     if (done) {
364       DCHECK(input.empty());
365       DCHECK(flushOp == StreamCodec::FlushOp::END);
366       DCHECK_EQ(current->next(), data);
367       break;
368     }
369     if (inputSize == input.size() && outputSize == output.size()) {
370       throw std::runtime_error("Codec: No forward progress made");
371     }
372   }
373   buffer->prev()->trimEnd(output.size());
374   return buffer;
375 }
376
377 static uint64_t computeBufferLength(
378     uint64_t const compressedLength,
379     uint64_t const blockSize) {
380   uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
381   uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
382   return std::min(goodBufferSize, kMaxBufferLength);
383 }
384
385 std::unique_ptr<IOBuf> StreamCodec::doUncompress(
386     IOBuf const* data,
387     Optional<uint64_t> uncompressedLength) {
388   auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
389   auto constexpr kBlockSize = uint64_t(128) << 10;
390   auto const defaultBufferLength =
391       computeBufferLength(data->computeChainDataLength(), kBlockSize);
392
393   uncompressedLength = getUncompressedLength(data, uncompressedLength);
394   resetStream(uncompressedLength);
395
396   MutableByteRange output;
397   auto buffer = addOutputBuffer(
398       output,
399       (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
400            ? *uncompressedLength
401            : defaultBufferLength));
402
403   // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
404   IOBuf const* current = data;
405   ByteRange input{current->data(), current->length()};
406   StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
407   for (;;) {
408     while (input.empty() && current->next() != data) {
409       current = current->next();
410       input = {current->data(), current->length()};
411     }
412     if (current->next() == data) {
413       // Tell the uncompressor there is no more input (it may optimize)
414       flushOp = StreamCodec::FlushOp::END;
415     }
416     if (output.empty()) {
417       buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
418     }
419     size_t const inputSize = input.size();
420     size_t const outputSize = output.size();
421     bool const done = uncompressStream(input, output, flushOp);
422     if (done) {
423       break;
424     }
425     if (inputSize == input.size() && outputSize == output.size()) {
426       throw std::runtime_error("Codec: Truncated data");
427     }
428   }
429   if (!input.empty()) {
430     throw std::runtime_error("Codec: Junk after end of data");
431   }
432
433   buffer->prev()->trimEnd(output.size());
434   if (uncompressedLength &&
435       *uncompressedLength != buffer->computeChainDataLength()) {
436     throw std::runtime_error("Codec: invalid uncompressed length");
437   }
438
439   return buffer;
440 }
441
442 namespace {
443
444 /**
445  * No compression
446  */
447 class NoCompressionCodec final : public Codec {
448  public:
449   static std::unique_ptr<Codec> create(int level, CodecType type);
450   explicit NoCompressionCodec(int level, CodecType type);
451
452  private:
453   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
454   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
455   std::unique_ptr<IOBuf> doUncompress(
456       const IOBuf* data,
457       Optional<uint64_t> uncompressedLength) override;
458 };
459
460 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
461   return std::make_unique<NoCompressionCodec>(level, type);
462 }
463
464 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
465     : Codec(type) {
466   DCHECK(type == CodecType::NO_COMPRESSION);
467   switch (level) {
468     case COMPRESSION_LEVEL_DEFAULT:
469     case COMPRESSION_LEVEL_FASTEST:
470     case COMPRESSION_LEVEL_BEST:
471       level = 0;
472   }
473   if (level != 0) {
474     throw std::invalid_argument(to<std::string>(
475         "NoCompressionCodec: invalid level ", level));
476   }
477 }
478
479 uint64_t NoCompressionCodec::doMaxCompressedLength(
480     uint64_t uncompressedLength) const {
481   return uncompressedLength;
482 }
483
484 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
485     const IOBuf* data) {
486   return data->clone();
487 }
488
489 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
490     const IOBuf* data,
491     Optional<uint64_t> uncompressedLength) {
492   if (uncompressedLength &&
493       data->computeChainDataLength() != *uncompressedLength) {
494     throw std::runtime_error(
495         to<std::string>("NoCompressionCodec: invalid uncompressed length"));
496   }
497   return data->clone();
498 }
499
500 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
501
502 namespace {
503
504 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
505   DCHECK_GE(out->tailroom(), kMaxVarintLength64);
506   out->append(encodeVarint(val, out->writableTail()));
507 }
508
509 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
510   uint64_t val = 0;
511   int8_t b = 0;
512   for (int shift = 0; shift <= 63; shift += 7) {
513     b = cursor.read<int8_t>();
514     val |= static_cast<uint64_t>(b & 0x7f) << shift;
515     if (b >= 0) {
516       break;
517     }
518   }
519   if (b < 0) {
520     throw std::invalid_argument("Invalid varint value. Too big.");
521   }
522   return val;
523 }
524
525 } // namespace
526
527 #endif  // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
528
529 #if FOLLY_HAVE_LIBLZ4
530
531 /**
532  * LZ4 compression
533  */
534 class LZ4Codec final : public Codec {
535  public:
536   static std::unique_ptr<Codec> create(int level, CodecType type);
537   explicit LZ4Codec(int level, CodecType type);
538
539  private:
540   bool doNeedsUncompressedLength() const override;
541   uint64_t doMaxUncompressedLength() const override;
542   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
543
544   bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
545
546   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
547   std::unique_ptr<IOBuf> doUncompress(
548       const IOBuf* data,
549       Optional<uint64_t> uncompressedLength) override;
550
551   bool highCompression_;
552 };
553
554 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
555   return std::make_unique<LZ4Codec>(level, type);
556 }
557
558 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
559   DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
560
561   switch (level) {
562     case COMPRESSION_LEVEL_FASTEST:
563     case COMPRESSION_LEVEL_DEFAULT:
564       level = 1;
565       break;
566     case COMPRESSION_LEVEL_BEST:
567       level = 2;
568       break;
569   }
570   if (level < 1 || level > 2) {
571     throw std::invalid_argument(to<std::string>(
572         "LZ4Codec: invalid level: ", level));
573   }
574   highCompression_ = (level > 1);
575 }
576
577 bool LZ4Codec::doNeedsUncompressedLength() const {
578   return !encodeSize();
579 }
580
581 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
582 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
583 // here.
584 #ifndef LZ4_MAX_INPUT_SIZE
585 # define LZ4_MAX_INPUT_SIZE 0x7E000000
586 #endif
587
588 uint64_t LZ4Codec::doMaxUncompressedLength() const {
589   return LZ4_MAX_INPUT_SIZE;
590 }
591
592 uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
593   return LZ4_compressBound(uncompressedLength) +
594       (encodeSize() ? kMaxVarintLength64 : 0);
595 }
596
597 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
598   IOBuf clone;
599   if (data->isChained()) {
600     // LZ4 doesn't support streaming, so we have to coalesce
601     clone = data->cloneCoalescedAsValue();
602     data = &clone;
603   }
604
605   auto out = IOBuf::create(maxCompressedLength(data->length()));
606   if (encodeSize()) {
607     encodeVarintToIOBuf(data->length(), out.get());
608   }
609
610   int n;
611   auto input = reinterpret_cast<const char*>(data->data());
612   auto output = reinterpret_cast<char*>(out->writableTail());
613   const auto inputLength = data->length();
614 #if LZ4_VERSION_NUMBER >= 10700
615   if (highCompression_) {
616     n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
617   } else {
618     n = LZ4_compress_default(input, output, inputLength, out->tailroom());
619   }
620 #else
621   if (highCompression_) {
622     n = LZ4_compressHC(input, output, inputLength);
623   } else {
624     n = LZ4_compress(input, output, inputLength);
625   }
626 #endif
627
628   CHECK_GE(n, 0);
629   CHECK_LE(n, out->capacity());
630
631   out->append(n);
632   return out;
633 }
634
635 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
636     const IOBuf* data,
637     Optional<uint64_t> uncompressedLength) {
638   IOBuf clone;
639   if (data->isChained()) {
640     // LZ4 doesn't support streaming, so we have to coalesce
641     clone = data->cloneCoalescedAsValue();
642     data = &clone;
643   }
644
645   folly::io::Cursor cursor(data);
646   uint64_t actualUncompressedLength;
647   if (encodeSize()) {
648     actualUncompressedLength = decodeVarintFromCursor(cursor);
649     if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
650       throw std::runtime_error("LZ4Codec: invalid uncompressed length");
651     }
652   } else {
653     // Invariants
654     DCHECK(uncompressedLength.hasValue());
655     DCHECK(*uncompressedLength <= maxUncompressedLength());
656     actualUncompressedLength = *uncompressedLength;
657   }
658
659   auto sp = StringPiece{cursor.peekBytes()};
660   auto out = IOBuf::create(actualUncompressedLength);
661   int n = LZ4_decompress_safe(
662       sp.data(),
663       reinterpret_cast<char*>(out->writableTail()),
664       sp.size(),
665       actualUncompressedLength);
666
667   if (n < 0 || uint64_t(n) != actualUncompressedLength) {
668     throw std::runtime_error(to<std::string>(
669         "LZ4 decompression returned invalid value ", n));
670   }
671   out->append(actualUncompressedLength);
672   return out;
673 }
674
675 #if LZ4_VERSION_NUMBER >= 10301
676
677 class LZ4FrameCodec final : public Codec {
678  public:
679   static std::unique_ptr<Codec> create(int level, CodecType type);
680   explicit LZ4FrameCodec(int level, CodecType type);
681   ~LZ4FrameCodec() override;
682
683   std::vector<std::string> validPrefixes() const override;
684   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
685       const override;
686
687  private:
688   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
689
690   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
691   std::unique_ptr<IOBuf> doUncompress(
692       const IOBuf* data,
693       Optional<uint64_t> uncompressedLength) override;
694
695   // Reset the dctx_ if it is dirty or null.
696   void resetDCtx();
697
698   int level_;
699   LZ4F_decompressionContext_t dctx_{nullptr};
700   bool dirty_{false};
701 };
702
703 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
704     int level,
705     CodecType type) {
706   return std::make_unique<LZ4FrameCodec>(level, type);
707 }
708
709 static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
710
711 std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
712   return {prefixToStringLE(kLZ4FrameMagicLE)};
713 }
714
715 bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
716   return dataStartsWithLE(data, kLZ4FrameMagicLE);
717 }
718
719 uint64_t LZ4FrameCodec::doMaxCompressedLength(
720     uint64_t uncompressedLength) const {
721   LZ4F_preferences_t prefs{};
722   prefs.compressionLevel = level_;
723   prefs.frameInfo.contentSize = uncompressedLength;
724   return LZ4F_compressFrameBound(uncompressedLength, &prefs);
725 }
726
727 static size_t lz4FrameThrowOnError(size_t code) {
728   if (LZ4F_isError(code)) {
729     throw std::runtime_error(
730         to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
731   }
732   return code;
733 }
734
735 void LZ4FrameCodec::resetDCtx() {
736   if (dctx_ && !dirty_) {
737     return;
738   }
739   if (dctx_) {
740     LZ4F_freeDecompressionContext(dctx_);
741   }
742   lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
743   dirty_ = false;
744 }
745
746 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
747   DCHECK(type == CodecType::LZ4_FRAME);
748   switch (level) {
749     case COMPRESSION_LEVEL_FASTEST:
750     case COMPRESSION_LEVEL_DEFAULT:
751       level_ = 0;
752       break;
753     case COMPRESSION_LEVEL_BEST:
754       level_ = 16;
755       break;
756     default:
757       level_ = level;
758       break;
759   }
760 }
761
762 LZ4FrameCodec::~LZ4FrameCodec() {
763   if (dctx_) {
764     LZ4F_freeDecompressionContext(dctx_);
765   }
766 }
767
768 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
769   // LZ4 Frame compression doesn't support streaming so we have to coalesce
770   IOBuf clone;
771   if (data->isChained()) {
772     clone = data->cloneCoalescedAsValue();
773     data = &clone;
774   }
775   // Set preferences
776   const auto uncompressedLength = data->length();
777   LZ4F_preferences_t prefs{};
778   prefs.compressionLevel = level_;
779   prefs.frameInfo.contentSize = uncompressedLength;
780   // Compress
781   auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
782   const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
783       buf->writableTail(),
784       buf->tailroom(),
785       data->data(),
786       data->length(),
787       &prefs));
788   buf->append(written);
789   return buf;
790 }
791
792 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
793     const IOBuf* data,
794     Optional<uint64_t> uncompressedLength) {
795   // Reset the dctx if any errors have occurred
796   resetDCtx();
797   // Coalesce the data
798   ByteRange in = *data->begin();
799   IOBuf clone;
800   if (data->isChained()) {
801     clone = data->cloneCoalescedAsValue();
802     in = clone.coalesce();
803   }
804   data = nullptr;
805   // Select decompression options
806   LZ4F_decompressOptions_t options;
807   options.stableDst = 1;
808   // Select blockSize and growthSize for the IOBufQueue
809   IOBufQueue queue(IOBufQueue::cacheChainLength());
810   auto blockSize = uint64_t{64} << 10;
811   auto growthSize = uint64_t{4} << 20;
812   if (uncompressedLength) {
813     // Allocate uncompressedLength in one chunk (up to 64 MB)
814     const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20);
815     queue.preallocate(allocateSize, allocateSize);
816     blockSize = std::min(*uncompressedLength, blockSize);
817     growthSize = std::min(*uncompressedLength, growthSize);
818   } else {
819     // Reduce growthSize for small data
820     const auto guessUncompressedLen =
821         4 * std::max<uint64_t>(blockSize, in.size());
822     growthSize = std::min(guessUncompressedLen, growthSize);
823   }
824   // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
825   // returns 0
826   dirty_ = true;
827   // Decompress until the frame is over
828   size_t code = 0;
829   do {
830     // Allocate enough space to decompress at least a block
831     void* out;
832     size_t outSize;
833     std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
834     // Decompress
835     size_t inSize = in.size();
836     code = lz4FrameThrowOnError(
837         LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
838     if (in.empty() && outSize == 0 && code != 0) {
839       // We passed no input, no output was produced, and the frame isn't over
840       // No more forward progress is possible
841       throw std::runtime_error("LZ4Frame error: Incomplete frame");
842     }
843     in.uncheckedAdvance(inSize);
844     queue.postallocate(outSize);
845   } while (code != 0);
846   // At this point the decompression context can be reused
847   dirty_ = false;
848   if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
849     throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
850   }
851   return queue.move();
852 }
853
854 #endif // LZ4_VERSION_NUMBER >= 10301
855 #endif // FOLLY_HAVE_LIBLZ4
856
857 #if FOLLY_HAVE_LIBSNAPPY
858
859 /**
860  * Snappy compression
861  */
862
863 /**
864  * Implementation of snappy::Source that reads from a IOBuf chain.
865  */
866 class IOBufSnappySource final : public snappy::Source {
867  public:
868   explicit IOBufSnappySource(const IOBuf* data);
869   size_t Available() const override;
870   const char* Peek(size_t* len) override;
871   void Skip(size_t n) override;
872  private:
873   size_t available_;
874   io::Cursor cursor_;
875 };
876
877 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
878   : available_(data->computeChainDataLength()),
879     cursor_(data) {
880 }
881
882 size_t IOBufSnappySource::Available() const {
883   return available_;
884 }
885
886 const char* IOBufSnappySource::Peek(size_t* len) {
887   auto sp = StringPiece{cursor_.peekBytes()};
888   *len = sp.size();
889   return sp.data();
890 }
891
892 void IOBufSnappySource::Skip(size_t n) {
893   CHECK_LE(n, available_);
894   cursor_.skip(n);
895   available_ -= n;
896 }
897
898 class SnappyCodec final : public Codec {
899  public:
900   static std::unique_ptr<Codec> create(int level, CodecType type);
901   explicit SnappyCodec(int level, CodecType type);
902
903  private:
904   uint64_t doMaxUncompressedLength() const override;
905   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
906   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
907   std::unique_ptr<IOBuf> doUncompress(
908       const IOBuf* data,
909       Optional<uint64_t> uncompressedLength) override;
910 };
911
912 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
913   return std::make_unique<SnappyCodec>(level, type);
914 }
915
916 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
917   DCHECK(type == CodecType::SNAPPY);
918   switch (level) {
919     case COMPRESSION_LEVEL_FASTEST:
920     case COMPRESSION_LEVEL_DEFAULT:
921     case COMPRESSION_LEVEL_BEST:
922       level = 1;
923   }
924   if (level != 1) {
925     throw std::invalid_argument(to<std::string>(
926         "SnappyCodec: invalid level: ", level));
927   }
928 }
929
930 uint64_t SnappyCodec::doMaxUncompressedLength() const {
931   // snappy.h uses uint32_t for lengths, so there's that.
932   return std::numeric_limits<uint32_t>::max();
933 }
934
935 uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
936   return snappy::MaxCompressedLength(uncompressedLength);
937 }
938
939 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
940   IOBufSnappySource source(data);
941   auto out = IOBuf::create(maxCompressedLength(source.Available()));
942
943   snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
944       out->writableTail()));
945
946   size_t n = snappy::Compress(&source, &sink);
947
948   CHECK_LE(n, out->capacity());
949   out->append(n);
950   return out;
951 }
952
953 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
954     const IOBuf* data,
955     Optional<uint64_t> uncompressedLength) {
956   uint32_t actualUncompressedLength = 0;
957
958   {
959     IOBufSnappySource source(data);
960     if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
961       throw std::runtime_error("snappy::GetUncompressedLength failed");
962     }
963     if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
964       throw std::runtime_error("snappy: invalid uncompressed length");
965     }
966   }
967
968   auto out = IOBuf::create(actualUncompressedLength);
969
970   {
971     IOBufSnappySource source(data);
972     if (!snappy::RawUncompress(&source,
973                                reinterpret_cast<char*>(out->writableTail()))) {
974       throw std::runtime_error("snappy::RawUncompress failed");
975     }
976   }
977
978   out->append(actualUncompressedLength);
979   return out;
980 }
981
982 #endif  // FOLLY_HAVE_LIBSNAPPY
983
984 #if FOLLY_HAVE_LIBLZMA
985
986 /**
987  * LZMA2 compression
988  */
989 class LZMA2StreamCodec final : public StreamCodec {
990  public:
991   static std::unique_ptr<Codec> createCodec(int level, CodecType type);
992   static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
993   explicit LZMA2StreamCodec(int level, CodecType type);
994   ~LZMA2StreamCodec() override;
995
996   std::vector<std::string> validPrefixes() const override;
997   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
998       const override;
999
1000  private:
1001   bool doNeedsDataLength() const override;
1002   uint64_t doMaxUncompressedLength() const override;
1003   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1004
1005   bool encodeSize() const {
1006     return type() == CodecType::LZMA2_VARINT_SIZE;
1007   }
1008
1009   void doResetStream() override;
1010   bool doCompressStream(
1011       ByteRange& input,
1012       MutableByteRange& output,
1013       StreamCodec::FlushOp flushOp) override;
1014   bool doUncompressStream(
1015       ByteRange& input,
1016       MutableByteRange& output,
1017       StreamCodec::FlushOp flushOp) override;
1018
1019   void resetCStream();
1020   void resetDStream();
1021
1022   size_t decodeVarint(ByteRange& input);
1023   bool flushVarintBuffer(MutableByteRange& output);
1024   void resetVarintBuffer();
1025
1026   Optional<lzma_stream> cstream_{};
1027   Optional<lzma_stream> dstream_{};
1028
1029   std::array<uint8_t, kMaxVarintLength64> varintBuffer_;
1030   ByteRange varintToEncode_;
1031   size_t varintBufferPos_{0};
1032
1033   int level_;
1034   bool needReset_{true};
1035   bool needDecodeSize_{false};
1036 };
1037
1038 static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1039 static constexpr unsigned kLZMA2MagicBytes = 6;
1040
1041 std::vector<std::string> LZMA2StreamCodec::validPrefixes() const {
1042   if (type() == CodecType::LZMA2_VARINT_SIZE) {
1043     return {};
1044   }
1045   return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
1046 }
1047
1048 bool LZMA2StreamCodec::doNeedsDataLength() const {
1049   return encodeSize();
1050 }
1051
1052 bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1053     const {
1054   if (type() == CodecType::LZMA2_VARINT_SIZE) {
1055     return false;
1056   }
1057   // Returns false for all inputs less than 8 bytes.
1058   // This is okay, because no valid LZMA2 streams are less than 8 bytes.
1059   return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
1060 }
1061
1062 std::unique_ptr<Codec> LZMA2StreamCodec::createCodec(
1063     int level,
1064     CodecType type) {
1065   return make_unique<LZMA2StreamCodec>(level, type);
1066 }
1067
1068 std::unique_ptr<StreamCodec> LZMA2StreamCodec::createStream(
1069     int level,
1070     CodecType type) {
1071   return make_unique<LZMA2StreamCodec>(level, type);
1072 }
1073
1074 LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type)
1075     : StreamCodec(type) {
1076   DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
1077   switch (level) {
1078     case COMPRESSION_LEVEL_FASTEST:
1079       level = 0;
1080       break;
1081     case COMPRESSION_LEVEL_DEFAULT:
1082       level = LZMA_PRESET_DEFAULT;
1083       break;
1084     case COMPRESSION_LEVEL_BEST:
1085       level = 9;
1086       break;
1087   }
1088   if (level < 0 || level > 9) {
1089     throw std::invalid_argument(
1090         to<std::string>("LZMA2Codec: invalid level: ", level));
1091   }
1092   level_ = level;
1093 }
1094
1095 LZMA2StreamCodec::~LZMA2StreamCodec() {
1096   if (cstream_) {
1097     lzma_end(cstream_.get_pointer());
1098     cstream_.clear();
1099   }
1100   if (dstream_) {
1101     lzma_end(dstream_.get_pointer());
1102     dstream_.clear();
1103   }
1104 }
1105
1106 uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const {
1107   // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1108   return uint64_t(1) << 63;
1109 }
1110
1111 uint64_t LZMA2StreamCodec::doMaxCompressedLength(
1112     uint64_t uncompressedLength) const {
1113   return lzma_stream_buffer_bound(uncompressedLength) +
1114       (encodeSize() ? kMaxVarintLength64 : 0);
1115 }
1116
1117 void LZMA2StreamCodec::doResetStream() {
1118   needReset_ = true;
1119 }
1120
1121 void LZMA2StreamCodec::resetCStream() {
1122   if (!cstream_) {
1123     cstream_.assign(LZMA_STREAM_INIT);
1124   }
1125   lzma_ret const rc =
1126       lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE);
1127   if (rc != LZMA_OK) {
1128     throw std::runtime_error(folly::to<std::string>(
1129         "LZMA2StreamCodec: lzma_easy_encoder error: ", rc));
1130   }
1131 }
1132
1133 void LZMA2StreamCodec::resetDStream() {
1134   if (!dstream_) {
1135     dstream_.assign(LZMA_STREAM_INIT);
1136   }
1137   lzma_ret const rc = lzma_auto_decoder(
1138       dstream_.get_pointer(), std::numeric_limits<uint64_t>::max(), 0);
1139   if (rc != LZMA_OK) {
1140     throw std::runtime_error(folly::to<std::string>(
1141         "LZMA2StreamCodec: lzma_auto_decoder error: ", rc));
1142   }
1143 }
1144
1145 static lzma_ret lzmaThrowOnError(lzma_ret const rc) {
1146   switch (rc) {
1147     case LZMA_OK:
1148     case LZMA_STREAM_END:
1149     case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice
1150       return rc;
1151     default:
1152       throw std::runtime_error(
1153           to<std::string>("LZMA2StreamCodec: error: ", rc));
1154   }
1155 }
1156
1157 static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) {
1158   switch (flush) {
1159     case StreamCodec::FlushOp::NONE:
1160       return LZMA_RUN;
1161     case StreamCodec::FlushOp::FLUSH:
1162       return LZMA_SYNC_FLUSH;
1163     case StreamCodec::FlushOp::END:
1164       return LZMA_FINISH;
1165     default:
1166       throw std::invalid_argument("LZMA2StreamCodec: Invalid flush");
1167   }
1168 }
1169
1170 /**
1171  * Flushes the varint buffer.
1172  * Advances output by the number of bytes written.
1173  * Returns true when flushing is complete.
1174  */
1175 bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) {
1176   if (varintToEncode_.empty()) {
1177     return true;
1178   }
1179   const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size());
1180   if (numBytesToCopy > 0) {
1181     memcpy(output.data(), varintToEncode_.data(), numBytesToCopy);
1182   }
1183   varintToEncode_.advance(numBytesToCopy);
1184   output.advance(numBytesToCopy);
1185   return varintToEncode_.empty();
1186 }
1187
1188 bool LZMA2StreamCodec::doCompressStream(
1189     ByteRange& input,
1190     MutableByteRange& output,
1191     StreamCodec::FlushOp flushOp) {
1192   if (needReset_) {
1193     resetCStream();
1194     if (encodeSize()) {
1195       varintBufferPos_ = 0;
1196       size_t const varintSize =
1197           encodeVarint(*uncompressedLength(), varintBuffer_.data());
1198       varintToEncode_ = {varintBuffer_.data(), varintSize};
1199     }
1200     needReset_ = false;
1201   }
1202
1203   if (!flushVarintBuffer(output)) {
1204     return false;
1205   }
1206
1207   cstream_->next_in = const_cast<uint8_t*>(input.data());
1208   cstream_->avail_in = input.size();
1209   cstream_->next_out = output.data();
1210   cstream_->avail_out = output.size();
1211   SCOPE_EXIT {
1212     input.uncheckedAdvance(input.size() - cstream_->avail_in);
1213     output.uncheckedAdvance(output.size() - cstream_->avail_out);
1214   };
1215   lzma_ret const rc = lzmaThrowOnError(
1216       lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp)));
1217   switch (flushOp) {
1218     case StreamCodec::FlushOp::NONE:
1219       return false;
1220     case StreamCodec::FlushOp::FLUSH:
1221       return cstream_->avail_in == 0 && cstream_->avail_out != 0;
1222     case StreamCodec::FlushOp::END:
1223       return rc == LZMA_STREAM_END;
1224     default:
1225       throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp");
1226   }
1227 }
1228
1229 /**
1230  * Attempts to decode a varint from input.
1231  * The function advances input by the number of bytes read.
1232  *
1233  * If there are too many bytes and the varint is not valid, throw a
1234  * runtime_error.
1235  * Returns the decoded size or 0 if more bytes are needed.
1236  */
1237 size_t LZMA2StreamCodec::decodeVarint(ByteRange& input) {
1238   if (input.empty()) {
1239     return 0;
1240   }
1241   size_t const numBytesToCopy =
1242       std::min(kMaxVarintLength64 - varintBufferPos_, input.size());
1243   memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy);
1244
1245   size_t const rangeSize = varintBufferPos_ + numBytesToCopy;
1246   ByteRange range{varintBuffer_.data(), rangeSize};
1247   auto const ret = tryDecodeVarint(range);
1248
1249   if (ret.hasValue()) {
1250     size_t const varintSize = rangeSize - range.size();
1251     input.advance(varintSize - varintBufferPos_);
1252     return ret.value();
1253   } else if (ret.error() == DecodeVarintError::TooManyBytes) {
1254     throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1255   } else {
1256     // Too few bytes
1257     input.advance(numBytesToCopy);
1258     varintBufferPos_ += numBytesToCopy;
1259     return 0;
1260   }
1261 }
1262
1263 bool LZMA2StreamCodec::doUncompressStream(
1264     ByteRange& input,
1265     MutableByteRange& output,
1266     StreamCodec::FlushOp flushOp) {
1267   if (needReset_) {
1268     resetDStream();
1269     needReset_ = false;
1270     needDecodeSize_ = encodeSize();
1271     if (encodeSize()) {
1272       // Reset buffer
1273       varintBufferPos_ = 0;
1274     }
1275   }
1276
1277   if (needDecodeSize_) {
1278     // Try decoding the varint. If the input does not contain the entire varint,
1279     // buffer the input. If the varint can not be decoded, fail.
1280     size_t const size = decodeVarint(input);
1281     if (!size) {
1282       return false;
1283     }
1284     if (uncompressedLength() && *uncompressedLength() != size) {
1285       throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1286     }
1287     needDecodeSize_ = false;
1288   }
1289
1290   dstream_->next_in = const_cast<uint8_t*>(input.data());
1291   dstream_->avail_in = input.size();
1292   dstream_->next_out = output.data();
1293   dstream_->avail_out = output.size();
1294   SCOPE_EXIT {
1295     input.advance(input.size() - dstream_->avail_in);
1296     output.advance(output.size() - dstream_->avail_out);
1297   };
1298
1299   lzma_ret rc;
1300   switch (flushOp) {
1301     case StreamCodec::FlushOp::NONE:
1302     case StreamCodec::FlushOp::FLUSH:
1303       rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN));
1304       break;
1305     case StreamCodec::FlushOp::END:
1306       rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH));
1307       break;
1308     default:
1309       throw std::invalid_argument("LZMA2StreamCodec: invalid flush");
1310   }
1311   return rc == LZMA_STREAM_END;
1312 }
1313 #endif // FOLLY_HAVE_LIBLZMA
1314
1315 #ifdef FOLLY_HAVE_LIBZSTD
1316
1317 namespace {
1318 void zstdFreeCStream(ZSTD_CStream* zcs) {
1319   ZSTD_freeCStream(zcs);
1320 }
1321
1322 void zstdFreeDStream(ZSTD_DStream* zds) {
1323   ZSTD_freeDStream(zds);
1324 }
1325 }
1326
1327 /**
1328  * ZSTD compression
1329  */
1330 class ZSTDStreamCodec final : public StreamCodec {
1331  public:
1332   static std::unique_ptr<Codec> createCodec(int level, CodecType);
1333   static std::unique_ptr<StreamCodec> createStream(int level, CodecType);
1334   explicit ZSTDStreamCodec(int level, CodecType type);
1335
1336   std::vector<std::string> validPrefixes() const override;
1337   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1338       const override;
1339
1340  private:
1341   bool doNeedsUncompressedLength() const override;
1342   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1343   Optional<uint64_t> doGetUncompressedLength(
1344       IOBuf const* data,
1345       Optional<uint64_t> uncompressedLength) const override;
1346
1347   void doResetStream() override;
1348   bool doCompressStream(
1349       ByteRange& input,
1350       MutableByteRange& output,
1351       StreamCodec::FlushOp flushOp) override;
1352   bool doUncompressStream(
1353       ByteRange& input,
1354       MutableByteRange& output,
1355       StreamCodec::FlushOp flushOp) override;
1356
1357   void resetCStream();
1358   void resetDStream();
1359
1360   bool tryBlockCompress(ByteRange& input, MutableByteRange& output) const;
1361   bool tryBlockUncompress(ByteRange& input, MutableByteRange& output) const;
1362
1363   int level_;
1364   bool needReset_{true};
1365   std::unique_ptr<
1366       ZSTD_CStream,
1367       folly::static_function_deleter<ZSTD_CStream, &zstdFreeCStream>>
1368       cstream_{nullptr};
1369   std::unique_ptr<
1370       ZSTD_DStream,
1371       folly::static_function_deleter<ZSTD_DStream, &zstdFreeDStream>>
1372       dstream_{nullptr};
1373 };
1374
1375 static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
1376
1377 std::vector<std::string> ZSTDStreamCodec::validPrefixes() const {
1378   return {prefixToStringLE(kZSTDMagicLE)};
1379 }
1380
1381 bool ZSTDStreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1382     const {
1383   return dataStartsWithLE(data, kZSTDMagicLE);
1384 }
1385
1386 std::unique_ptr<Codec> ZSTDStreamCodec::createCodec(int level, CodecType type) {
1387   return make_unique<ZSTDStreamCodec>(level, type);
1388 }
1389
1390 std::unique_ptr<StreamCodec> ZSTDStreamCodec::createStream(
1391     int level,
1392     CodecType type) {
1393   return make_unique<ZSTDStreamCodec>(level, type);
1394 }
1395
1396 ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
1397     : StreamCodec(type) {
1398   DCHECK(type == CodecType::ZSTD);
1399   switch (level) {
1400     case COMPRESSION_LEVEL_FASTEST:
1401       level = 1;
1402       break;
1403     case COMPRESSION_LEVEL_DEFAULT:
1404       level = 1;
1405       break;
1406     case COMPRESSION_LEVEL_BEST:
1407       level = 19;
1408       break;
1409   }
1410   if (level < 1 || level > ZSTD_maxCLevel()) {
1411     throw std::invalid_argument(
1412         to<std::string>("ZSTD: invalid level: ", level));
1413   }
1414   level_ = level;
1415 }
1416
1417 bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
1418   return false;
1419 }
1420
1421 uint64_t ZSTDStreamCodec::doMaxCompressedLength(
1422     uint64_t uncompressedLength) const {
1423   return ZSTD_compressBound(uncompressedLength);
1424 }
1425
1426 void zstdThrowIfError(size_t rc) {
1427   if (!ZSTD_isError(rc)) {
1428     return;
1429   }
1430   throw std::runtime_error(
1431       to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
1432 }
1433
1434 Optional<uint64_t> ZSTDStreamCodec::doGetUncompressedLength(
1435     IOBuf const* data,
1436     Optional<uint64_t> uncompressedLength) const {
1437   // Read decompressed size from frame if available in first IOBuf.
1438   auto const decompressedSize =
1439       ZSTD_getDecompressedSize(data->data(), data->length());
1440   if (decompressedSize != 0) {
1441     if (uncompressedLength && *uncompressedLength != decompressedSize) {
1442       throw std::runtime_error("ZSTD: invalid uncompressed length");
1443     }
1444     uncompressedLength = decompressedSize;
1445   }
1446   return uncompressedLength;
1447 }
1448
1449 void ZSTDStreamCodec::doResetStream() {
1450   needReset_ = true;
1451 }
1452
1453 bool ZSTDStreamCodec::tryBlockCompress(
1454     ByteRange& input,
1455     MutableByteRange& output) const {
1456   DCHECK(needReset_);
1457   // We need to know that we have enough output space to use block compression
1458   if (output.size() < ZSTD_compressBound(input.size())) {
1459     return false;
1460   }
1461   size_t const length = ZSTD_compress(
1462       output.data(), output.size(), input.data(), input.size(), level_);
1463   zstdThrowIfError(length);
1464   input.uncheckedAdvance(input.size());
1465   output.uncheckedAdvance(length);
1466   return true;
1467 }
1468
1469 void ZSTDStreamCodec::resetCStream() {
1470   if (!cstream_) {
1471     cstream_.reset(ZSTD_createCStream());
1472     if (!cstream_) {
1473       throw std::bad_alloc{};
1474     }
1475   }
1476   // Advanced API usage works for all supported versions of zstd.
1477   // Required to set contentSizeFlag.
1478   auto params = ZSTD_getParams(level_, uncompressedLength().value_or(0), 0);
1479   params.fParams.contentSizeFlag = uncompressedLength().hasValue();
1480   zstdThrowIfError(ZSTD_initCStream_advanced(
1481       cstream_.get(), nullptr, 0, params, uncompressedLength().value_or(0)));
1482 }
1483
1484 bool ZSTDStreamCodec::doCompressStream(
1485     ByteRange& input,
1486     MutableByteRange& output,
1487     StreamCodec::FlushOp flushOp) {
1488   if (needReset_) {
1489     // If we are given all the input in one chunk try to use block compression
1490     if (flushOp == StreamCodec::FlushOp::END &&
1491         tryBlockCompress(input, output)) {
1492       return true;
1493     }
1494     resetCStream();
1495     needReset_ = false;
1496   }
1497   ZSTD_inBuffer in = {input.data(), input.size(), 0};
1498   ZSTD_outBuffer out = {output.data(), output.size(), 0};
1499   SCOPE_EXIT {
1500     input.uncheckedAdvance(in.pos);
1501     output.uncheckedAdvance(out.pos);
1502   };
1503   if (flushOp == StreamCodec::FlushOp::NONE || !input.empty()) {
1504     zstdThrowIfError(ZSTD_compressStream(cstream_.get(), &out, &in));
1505   }
1506   if (in.pos == in.size && flushOp != StreamCodec::FlushOp::NONE) {
1507     size_t rc;
1508     switch (flushOp) {
1509       case StreamCodec::FlushOp::FLUSH:
1510         rc = ZSTD_flushStream(cstream_.get(), &out);
1511         break;
1512       case StreamCodec::FlushOp::END:
1513         rc = ZSTD_endStream(cstream_.get(), &out);
1514         break;
1515       default:
1516         throw std::invalid_argument("ZSTD: invalid FlushOp");
1517     }
1518     zstdThrowIfError(rc);
1519     if (rc == 0) {
1520       return true;
1521     }
1522   }
1523   return false;
1524 }
1525
1526 bool ZSTDStreamCodec::tryBlockUncompress(
1527     ByteRange& input,
1528     MutableByteRange& output) const {
1529   DCHECK(needReset_);
1530 #if ZSTD_VERSION_NUMBER < 10104
1531   // We require ZSTD_findFrameCompressedSize() to perform this optimization.
1532   return false;
1533 #else
1534   // We need to know the uncompressed length and have enough output space.
1535   if (!uncompressedLength() || output.size() < *uncompressedLength()) {
1536     return false;
1537   }
1538   size_t const compressedLength =
1539       ZSTD_findFrameCompressedSize(input.data(), input.size());
1540   zstdThrowIfError(compressedLength);
1541   size_t const length = ZSTD_decompress(
1542       output.data(), *uncompressedLength(), input.data(), compressedLength);
1543   zstdThrowIfError(length);
1544   if (length != *uncompressedLength()) {
1545     throw std::runtime_error("ZSTDStreamCodec: Incorrect uncompressed length");
1546   }
1547   input.uncheckedAdvance(compressedLength);
1548   output.uncheckedAdvance(length);
1549   return true;
1550 #endif
1551 }
1552
1553 void ZSTDStreamCodec::resetDStream() {
1554   if (!dstream_) {
1555     dstream_.reset(ZSTD_createDStream());
1556     if (!dstream_) {
1557       throw std::bad_alloc{};
1558     }
1559   }
1560   zstdThrowIfError(ZSTD_initDStream(dstream_.get()));
1561 }
1562
1563 bool ZSTDStreamCodec::doUncompressStream(
1564     ByteRange& input,
1565     MutableByteRange& output,
1566     StreamCodec::FlushOp flushOp) {
1567   if (needReset_) {
1568     // If we are given all the input in one chunk try to use block uncompression
1569     if (flushOp == StreamCodec::FlushOp::END &&
1570         tryBlockUncompress(input, output)) {
1571       return true;
1572     }
1573     resetDStream();
1574     needReset_ = false;
1575   }
1576   ZSTD_inBuffer in = {input.data(), input.size(), 0};
1577   ZSTD_outBuffer out = {output.data(), output.size(), 0};
1578   SCOPE_EXIT {
1579     input.uncheckedAdvance(in.pos);
1580     output.uncheckedAdvance(out.pos);
1581   };
1582   size_t const rc = ZSTD_decompressStream(dstream_.get(), &out, &in);
1583   zstdThrowIfError(rc);
1584   return rc == 0;
1585 }
1586
1587 #endif // FOLLY_HAVE_LIBZSTD
1588
1589 #if FOLLY_HAVE_LIBBZ2
1590
1591 class Bzip2Codec final : public Codec {
1592  public:
1593   static std::unique_ptr<Codec> create(int level, CodecType type);
1594   explicit Bzip2Codec(int level, CodecType type);
1595
1596   std::vector<std::string> validPrefixes() const override;
1597   bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
1598       const override;
1599
1600  private:
1601   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1602   std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
1603   std::unique_ptr<IOBuf> doUncompress(
1604       IOBuf const* data,
1605       Optional<uint64_t> uncompressedLength) override;
1606
1607   int level_;
1608 };
1609
1610 /* static */ std::unique_ptr<Codec> Bzip2Codec::create(
1611     int level,
1612     CodecType type) {
1613   return std::make_unique<Bzip2Codec>(level, type);
1614 }
1615
1616 Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
1617   DCHECK(type == CodecType::BZIP2);
1618   switch (level) {
1619     case COMPRESSION_LEVEL_FASTEST:
1620       level = 1;
1621       break;
1622     case COMPRESSION_LEVEL_DEFAULT:
1623       level = 9;
1624       break;
1625     case COMPRESSION_LEVEL_BEST:
1626       level = 9;
1627       break;
1628   }
1629   if (level < 1 || level > 9) {
1630     throw std::invalid_argument(
1631         to<std::string>("Bzip2: invalid level: ", level));
1632   }
1633   level_ = level;
1634 }
1635
1636 static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1637 static uint64_t constexpr kBzip2MagicBytes = 3;
1638
1639 std::vector<std::string> Bzip2Codec::validPrefixes() const {
1640   return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
1641 }
1642
1643 bool Bzip2Codec::canUncompress(IOBuf const* data, Optional<uint64_t>) const {
1644   return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
1645 }
1646
1647 uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1648   // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
1649   //   To guarantee that the compressed data will fit in its buffer, allocate an
1650   //   output buffer of size 1% larger than the uncompressed data, plus six
1651   //   hundred extra bytes.
1652   return uncompressedLength + uncompressedLength / 100 + 600;
1653 }
1654
1655 static bz_stream createBzStream() {
1656   bz_stream stream;
1657   stream.bzalloc = nullptr;
1658   stream.bzfree = nullptr;
1659   stream.opaque = nullptr;
1660   stream.next_in = stream.next_out = nullptr;
1661   stream.avail_in = stream.avail_out = 0;
1662   return stream;
1663 }
1664
1665 // Throws on error condition, otherwise returns the code.
1666 static int bzCheck(int const rc) {
1667   switch (rc) {
1668     case BZ_OK:
1669     case BZ_RUN_OK:
1670     case BZ_FLUSH_OK:
1671     case BZ_FINISH_OK:
1672     case BZ_STREAM_END:
1673       return rc;
1674     default:
1675       throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
1676   }
1677 }
1678
1679 static std::unique_ptr<IOBuf> addOutputBuffer(
1680     bz_stream* stream,
1681     uint64_t const bufferLength) {
1682   DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
1683   DCHECK_EQ(stream->avail_out, 0);
1684
1685   auto buf = IOBuf::create(bufferLength);
1686   buf->append(buf->capacity());
1687
1688   stream->next_out = reinterpret_cast<char*>(buf->writableData());
1689   stream->avail_out = buf->length();
1690
1691   return buf;
1692 }
1693
1694 std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
1695   bz_stream stream = createBzStream();
1696   bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
1697   SCOPE_EXIT {
1698     bzCheck(BZ2_bzCompressEnd(&stream));
1699   };
1700
1701   uint64_t const uncompressedLength = data->computeChainDataLength();
1702   uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
1703   uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1704   uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
1705
1706   auto out = addOutputBuffer(
1707       &stream,
1708       maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
1709                                                : kDefaultBufferLength);
1710
1711   for (auto range : *data) {
1712     while (!range.empty()) {
1713       auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1714       stream.next_in =
1715           const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1716       stream.avail_in = inSize;
1717
1718       if (stream.avail_out == 0) {
1719         out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1720       }
1721
1722       bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
1723       range.uncheckedAdvance(inSize - stream.avail_in);
1724     }
1725   }
1726   do {
1727     if (stream.avail_out == 0) {
1728       out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1729     }
1730   } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
1731
1732   out->prev()->trimEnd(stream.avail_out);
1733
1734   return out;
1735 }
1736
1737 std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
1738     const IOBuf* data,
1739     Optional<uint64_t> uncompressedLength) {
1740   bz_stream stream = createBzStream();
1741   bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
1742   SCOPE_EXIT {
1743     bzCheck(BZ2_bzDecompressEnd(&stream));
1744   };
1745
1746   uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1747   uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
1748   uint64_t const kDefaultBufferLength =
1749       computeBufferLength(data->computeChainDataLength(), kBlockSize);
1750
1751   auto out = addOutputBuffer(
1752       &stream,
1753       ((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength)
1754            ? *uncompressedLength
1755            : kDefaultBufferLength));
1756
1757   int rc = BZ_OK;
1758   for (auto range : *data) {
1759     while (!range.empty()) {
1760       auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1761       stream.next_in =
1762           const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1763       stream.avail_in = inSize;
1764
1765       if (stream.avail_out == 0) {
1766         out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1767       }
1768
1769       rc = bzCheck(BZ2_bzDecompress(&stream));
1770       range.uncheckedAdvance(inSize - stream.avail_in);
1771     }
1772   }
1773   while (rc != BZ_STREAM_END) {
1774     if (stream.avail_out == 0) {
1775       out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1776     }
1777     size_t const outputSize = stream.avail_out;
1778     rc = bzCheck(BZ2_bzDecompress(&stream));
1779     if (outputSize == stream.avail_out) {
1780       throw std::runtime_error("Bzip2Codec: Truncated input");
1781     }
1782   }
1783
1784   out->prev()->trimEnd(stream.avail_out);
1785
1786   uint64_t const totalOut =
1787       (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
1788   if (uncompressedLength && uncompressedLength != totalOut) {
1789     throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
1790   }
1791
1792   return out;
1793 }
1794
1795 #endif // FOLLY_HAVE_LIBBZ2
1796
1797 #if FOLLY_HAVE_LIBZ
1798
1799 zlib::Options getZlibOptions(CodecType type) {
1800   DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB);
1801   return type == CodecType::GZIP ? zlib::defaultGzipOptions()
1802                                  : zlib::defaultZlibOptions();
1803 }
1804
1805 std::unique_ptr<Codec> getZlibCodec(int level, CodecType type) {
1806   return zlib::getCodec(getZlibOptions(type), level);
1807 }
1808
1809 std::unique_ptr<StreamCodec> getZlibStreamCodec(int level, CodecType type) {
1810   return zlib::getStreamCodec(getZlibOptions(type), level);
1811 }
1812
1813 #endif // FOLLY_HAVE_LIBZ
1814
1815 /**
1816  * Automatic decompression
1817  */
1818 class AutomaticCodec final : public Codec {
1819  public:
1820   static std::unique_ptr<Codec> create(
1821       std::vector<std::unique_ptr<Codec>> customCodecs);
1822   explicit AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs);
1823
1824   std::vector<std::string> validPrefixes() const override;
1825   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1826       const override;
1827
1828  private:
1829   bool doNeedsUncompressedLength() const override;
1830   uint64_t doMaxUncompressedLength() const override;
1831
1832   uint64_t doMaxCompressedLength(uint64_t) const override {
1833     throw std::runtime_error(
1834         "AutomaticCodec error: maxCompressedLength() not supported.");
1835   }
1836   std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1837     throw std::runtime_error("AutomaticCodec error: compress() not supported.");
1838   }
1839   std::unique_ptr<IOBuf> doUncompress(
1840       const IOBuf* data,
1841       Optional<uint64_t> uncompressedLength) override;
1842
1843   void addCodecIfSupported(CodecType type);
1844
1845   // Throws iff the codecs aren't compatible (very slow)
1846   void checkCompatibleCodecs() const;
1847
1848   std::vector<std::unique_ptr<Codec>> codecs_;
1849   bool needsUncompressedLength_;
1850   uint64_t maxUncompressedLength_;
1851 };
1852
1853 std::vector<std::string> AutomaticCodec::validPrefixes() const {
1854   std::unordered_set<std::string> prefixes;
1855   for (const auto& codec : codecs_) {
1856     const auto codecPrefixes = codec->validPrefixes();
1857     prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
1858   }
1859   return std::vector<std::string>{prefixes.begin(), prefixes.end()};
1860 }
1861
1862 bool AutomaticCodec::canUncompress(
1863     const IOBuf* data,
1864     Optional<uint64_t> uncompressedLength) const {
1865   return std::any_of(
1866       codecs_.begin(),
1867       codecs_.end(),
1868       [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
1869         return codec->canUncompress(data, uncompressedLength);
1870       });
1871 }
1872
1873 void AutomaticCodec::addCodecIfSupported(CodecType type) {
1874   const bool present = std::any_of(
1875       codecs_.begin(),
1876       codecs_.end(),
1877       [&type](std::unique_ptr<Codec> const& codec) {
1878         return codec->type() == type;
1879       });
1880   if (hasCodec(type) && !present) {
1881     codecs_.push_back(getCodec(type));
1882   }
1883 }
1884
1885 /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
1886     std::vector<std::unique_ptr<Codec>> customCodecs) {
1887   return std::make_unique<AutomaticCodec>(std::move(customCodecs));
1888 }
1889
1890 AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
1891     : Codec(CodecType::USER_DEFINED), codecs_(std::move(customCodecs)) {
1892   // Fastest -> slowest
1893   addCodecIfSupported(CodecType::LZ4_FRAME);
1894   addCodecIfSupported(CodecType::ZSTD);
1895   addCodecIfSupported(CodecType::ZLIB);
1896   addCodecIfSupported(CodecType::GZIP);
1897   addCodecIfSupported(CodecType::LZMA2);
1898   addCodecIfSupported(CodecType::BZIP2);
1899   if (kIsDebug) {
1900     checkCompatibleCodecs();
1901   }
1902   // Check that none of the codes are are null
1903   DCHECK(std::none_of(
1904       codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1905         return codec == nullptr;
1906       }));
1907
1908   needsUncompressedLength_ = std::any_of(
1909       codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1910         return codec->needsUncompressedLength();
1911       });
1912
1913   const auto it = std::max_element(
1914       codecs_.begin(),
1915       codecs_.end(),
1916       [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
1917         return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
1918       });
1919   DCHECK(it != codecs_.end());
1920   maxUncompressedLength_ = (*it)->maxUncompressedLength();
1921 }
1922
1923 void AutomaticCodec::checkCompatibleCodecs() const {
1924   // Keep track of all the possible headers.
1925   std::unordered_set<std::string> headers;
1926   // The empty header is not allowed.
1927   headers.insert("");
1928   // Step 1:
1929   // Construct a set of headers and check that none of the headers occur twice.
1930   // Eliminate edge cases.
1931   for (auto&& codec : codecs_) {
1932     const auto codecHeaders = codec->validPrefixes();
1933     // Codecs without any valid headers are not allowed.
1934     if (codecHeaders.empty()) {
1935       throw std::invalid_argument{
1936           "AutomaticCodec: validPrefixes() must not be empty."};
1937     }
1938     // Insert all the headers for the current codec.
1939     const size_t beforeSize = headers.size();
1940     headers.insert(codecHeaders.begin(), codecHeaders.end());
1941     // Codecs are not compatible if any header occurred twice.
1942     if (beforeSize + codecHeaders.size() != headers.size()) {
1943       throw std::invalid_argument{
1944           "AutomaticCodec: Two valid prefixes collide."};
1945     }
1946   }
1947   // Step 2:
1948   // Check if any strict non-empty prefix of any header is a header.
1949   for (const auto& header : headers) {
1950     for (size_t i = 1; i < header.size(); ++i) {
1951       if (headers.count(header.substr(0, i))) {
1952         throw std::invalid_argument{
1953             "AutomaticCodec: One valid prefix is a prefix of another valid "
1954             "prefix."};
1955       }
1956     }
1957   }
1958 }
1959
1960 bool AutomaticCodec::doNeedsUncompressedLength() const {
1961   return needsUncompressedLength_;
1962 }
1963
1964 uint64_t AutomaticCodec::doMaxUncompressedLength() const {
1965   return maxUncompressedLength_;
1966 }
1967
1968 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
1969     const IOBuf* data,
1970     Optional<uint64_t> uncompressedLength) {
1971   for (auto&& codec : codecs_) {
1972     if (codec->canUncompress(data, uncompressedLength)) {
1973       return codec->uncompress(data, uncompressedLength);
1974     }
1975   }
1976   throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
1977 }
1978
1979 using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
1980 using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
1981 struct Factory {
1982   CodecFactory codec;
1983   StreamCodecFactory stream;
1984 };
1985
1986 constexpr Factory
1987     codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
1988         {}, // USER_DEFINED
1989         {NoCompressionCodec::create, nullptr},
1990
1991 #if FOLLY_HAVE_LIBLZ4
1992         {LZ4Codec::create, nullptr},
1993 #else
1994         {},
1995 #endif
1996
1997 #if FOLLY_HAVE_LIBSNAPPY
1998         {SnappyCodec::create, nullptr},
1999 #else
2000         {},
2001 #endif
2002
2003 #if FOLLY_HAVE_LIBZ
2004         {getZlibCodec, getZlibStreamCodec},
2005 #else
2006         {},
2007 #endif
2008
2009 #if FOLLY_HAVE_LIBLZ4
2010         {LZ4Codec::create, nullptr},
2011 #else
2012         {},
2013 #endif
2014
2015 #if FOLLY_HAVE_LIBLZMA
2016         {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2017         {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2018 #else
2019         {},
2020         {},
2021 #endif
2022
2023 #if FOLLY_HAVE_LIBZSTD
2024         {ZSTDStreamCodec::createCodec, ZSTDStreamCodec::createStream},
2025 #else
2026         {},
2027 #endif
2028
2029 #if FOLLY_HAVE_LIBZ
2030         {getZlibCodec, getZlibStreamCodec},
2031 #else
2032         {},
2033 #endif
2034
2035 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
2036         {LZ4FrameCodec::create, nullptr},
2037 #else
2038         {},
2039 #endif
2040
2041 #if FOLLY_HAVE_LIBBZ2
2042         {Bzip2Codec::create, nullptr},
2043 #else
2044         {},
2045 #endif
2046 };
2047
2048 Factory const& getFactory(CodecType type) {
2049   size_t const idx = static_cast<size_t>(type);
2050   if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2051     throw std::invalid_argument(
2052         to<std::string>("Compression type ", idx, " invalid"));
2053   }
2054   return codecFactories[idx];
2055 }
2056 } // namespace
2057
2058 bool hasCodec(CodecType type) {
2059   return getFactory(type).codec != nullptr;
2060 }
2061
2062 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2063   auto const factory = getFactory(type).codec;
2064   if (!factory) {
2065     throw std::invalid_argument(
2066         to<std::string>("Compression type ", type, " not supported"));
2067   }
2068   auto codec = (*factory)(level, type);
2069   DCHECK(codec->type() == type);
2070   return codec;
2071 }
2072
2073 bool hasStreamCodec(CodecType type) {
2074   return getFactory(type).stream != nullptr;
2075 }
2076
2077 std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
2078   auto const factory = getFactory(type).stream;
2079   if (!factory) {
2080     throw std::invalid_argument(
2081         to<std::string>("Compression type ", type, " not supported"));
2082   }
2083   auto codec = (*factory)(level, type);
2084   DCHECK(codec->type() == type);
2085   return codec;
2086 }
2087
2088 std::unique_ptr<Codec> getAutoUncompressionCodec(
2089     std::vector<std::unique_ptr<Codec>> customCodecs) {
2090   return AutomaticCodec::create(std::move(customCodecs));
2091 }
2092 } // namespace io
2093 } // namespace folly