3bcae86193e3beff7717f19b78e155ac2185fc5d
[folly.git] / folly / io / Compression.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/Compression.h>
18
19 #if FOLLY_HAVE_LIBLZ4
20 #include <lz4.h>
21 #include <lz4hc.h>
22 #if LZ4_VERSION_NUMBER >= 10301
23 #include <lz4frame.h>
24 #endif
25 #endif
26
27 #include <glog/logging.h>
28
29 #if FOLLY_HAVE_LIBSNAPPY
30 #include <snappy.h>
31 #include <snappy-sinksource.h>
32 #endif
33
34 #if FOLLY_HAVE_LIBZ
35 #include <zlib.h>
36 #endif
37
38 #if FOLLY_HAVE_LIBLZMA
39 #include <lzma.h>
40 #endif
41
42 #if FOLLY_HAVE_LIBZSTD
43 #include <zstd.h>
44 #endif
45
46 #if FOLLY_HAVE_LIBBZ2
47 #include <bzlib.h>
48 #endif
49
50 #include <folly/Bits.h>
51 #include <folly/Conv.h>
52 #include <folly/Memory.h>
53 #include <folly/Portability.h>
54 #include <folly/ScopeGuard.h>
55 #include <folly/Varint.h>
56 #include <folly/io/Cursor.h>
57 #include <algorithm>
58 #include <unordered_set>
59
60 namespace folly { namespace io {
61
62 Codec::Codec(CodecType type) : type_(type) { }
63
64 // Ensure consistent behavior in the nullptr case
65 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
66   uint64_t len = data->computeChainDataLength();
67   if (len == 0) {
68     return IOBuf::create(0);
69   }
70   if (len > maxUncompressedLength()) {
71     throw std::runtime_error("Codec: uncompressed length too large");
72   }
73
74   return doCompress(data);
75 }
76
77 std::string Codec::compress(const StringPiece data) {
78   const uint64_t len = data.size();
79   if (len == 0) {
80     return "";
81   }
82   if (len > maxUncompressedLength()) {
83     throw std::runtime_error("Codec: uncompressed length too large");
84   }
85
86   return doCompressString(data);
87 }
88
89 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
90                                          uint64_t uncompressedLength) {
91   if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
92     if (needsUncompressedLength()) {
93       throw std::invalid_argument("Codec: uncompressed length required");
94     }
95   } else if (uncompressedLength > maxUncompressedLength()) {
96     throw std::runtime_error("Codec: uncompressed length too large");
97   }
98
99   if (data->empty()) {
100     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
101         uncompressedLength != 0) {
102       throw std::runtime_error("Codec: invalid uncompressed length");
103     }
104     return IOBuf::create(0);
105   }
106
107   return doUncompress(data, uncompressedLength);
108 }
109
110 std::string Codec::uncompress(
111     const StringPiece data,
112     uint64_t uncompressedLength) {
113   if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
114     if (needsUncompressedLength()) {
115       throw std::invalid_argument("Codec: uncompressed length required");
116     }
117   } else if (uncompressedLength > maxUncompressedLength()) {
118     throw std::runtime_error("Codec: uncompressed length too large");
119   }
120
121   if (data.empty()) {
122     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
123         uncompressedLength != 0) {
124       throw std::runtime_error("Codec: invalid uncompressed length");
125     }
126     return "";
127   }
128
129   return doUncompressString(data, uncompressedLength);
130 }
131
132 bool Codec::needsUncompressedLength() const {
133   return doNeedsUncompressedLength();
134 }
135
136 uint64_t Codec::maxUncompressedLength() const {
137   return doMaxUncompressedLength();
138 }
139
140 bool Codec::doNeedsUncompressedLength() const {
141   return false;
142 }
143
144 uint64_t Codec::doMaxUncompressedLength() const {
145   return UNLIMITED_UNCOMPRESSED_LENGTH;
146 }
147
148 std::vector<std::string> Codec::validPrefixes() const {
149   return {};
150 }
151
152 bool Codec::canUncompress(const IOBuf*, uint64_t) const {
153   return false;
154 }
155
156 std::string Codec::doCompressString(const StringPiece data) {
157   const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
158   auto outputBuffer = doCompress(&inputBuffer);
159   std::string output;
160   output.reserve(outputBuffer->computeChainDataLength());
161   for (auto range : *outputBuffer) {
162     output.append(reinterpret_cast<const char*>(range.data()), range.size());
163   }
164   return output;
165 }
166
167 std::string Codec::doUncompressString(
168     const StringPiece data,
169     uint64_t uncompressedLength) {
170   const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
171   auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
172   std::string output;
173   output.reserve(outputBuffer->computeChainDataLength());
174   for (auto range : *outputBuffer) {
175     output.append(reinterpret_cast<const char*>(range.data()), range.size());
176   }
177   return output;
178 }
179
180 namespace {
181
182 /**
183  * No compression
184  */
185 class NoCompressionCodec final : public Codec {
186  public:
187   static std::unique_ptr<Codec> create(int level, CodecType type);
188   explicit NoCompressionCodec(int level, CodecType type);
189
190  private:
191   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
192   std::unique_ptr<IOBuf> doUncompress(
193       const IOBuf* data,
194       uint64_t uncompressedLength) override;
195 };
196
197 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
198   return make_unique<NoCompressionCodec>(level, type);
199 }
200
201 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
202   : Codec(type) {
203   DCHECK(type == CodecType::NO_COMPRESSION);
204   switch (level) {
205   case COMPRESSION_LEVEL_DEFAULT:
206   case COMPRESSION_LEVEL_FASTEST:
207   case COMPRESSION_LEVEL_BEST:
208     level = 0;
209   }
210   if (level != 0) {
211     throw std::invalid_argument(to<std::string>(
212         "NoCompressionCodec: invalid level ", level));
213   }
214 }
215
216 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
217     const IOBuf* data) {
218   return data->clone();
219 }
220
221 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
222     const IOBuf* data,
223     uint64_t uncompressedLength) {
224   if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
225       data->computeChainDataLength() != uncompressedLength) {
226     throw std::runtime_error(to<std::string>(
227         "NoCompressionCodec: invalid uncompressed length"));
228   }
229   return data->clone();
230 }
231
232 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
233
234 namespace {
235
236 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
237   DCHECK_GE(out->tailroom(), kMaxVarintLength64);
238   out->append(encodeVarint(val, out->writableTail()));
239 }
240
241 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
242   uint64_t val = 0;
243   int8_t b = 0;
244   for (int shift = 0; shift <= 63; shift += 7) {
245     b = cursor.read<int8_t>();
246     val |= static_cast<uint64_t>(b & 0x7f) << shift;
247     if (b >= 0) {
248       break;
249     }
250   }
251   if (b < 0) {
252     throw std::invalid_argument("Invalid varint value. Too big.");
253   }
254   return val;
255 }
256
257 }  // namespace
258
259 #endif  // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
260
261 namespace {
262 /**
263  * Reads sizeof(T) bytes, and returns false if not enough bytes are available.
264  * Returns true if the first n bytes are equal to prefix when interpreted as
265  * a little endian T.
266  */
267 template <typename T>
268 typename std::enable_if<std::is_unsigned<T>::value, bool>::type
269 dataStartsWithLE(const IOBuf* data, T prefix, uint64_t n = sizeof(T)) {
270   DCHECK_GT(n, 0);
271   DCHECK_LE(n, sizeof(T));
272   T value;
273   Cursor cursor{data};
274   if (!cursor.tryReadLE(value)) {
275     return false;
276   }
277   const T mask = n == sizeof(T) ? T(-1) : (T(1) << (8 * n)) - 1;
278   return prefix == (value & mask);
279 }
280
281 template <typename T>
282 typename std::enable_if<std::is_arithmetic<T>::value, std::string>::type
283 prefixToStringLE(T prefix, uint64_t n = sizeof(T)) {
284   DCHECK_GT(n, 0);
285   DCHECK_LE(n, sizeof(T));
286   prefix = Endian::little(prefix);
287   std::string result;
288   result.resize(n);
289   memcpy(&result[0], &prefix, n);
290   return result;
291 }
292
293 static uint64_t computeBufferLength(
294     uint64_t const compressedLength,
295     uint64_t const blockSize) {
296   uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
297   uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
298   return std::min(goodBufferSize, kMaxBufferLength);
299 }
300 } // namespace
301
302 #if FOLLY_HAVE_LIBLZ4
303
304 /**
305  * LZ4 compression
306  */
307 class LZ4Codec final : public Codec {
308  public:
309   static std::unique_ptr<Codec> create(int level, CodecType type);
310   explicit LZ4Codec(int level, CodecType type);
311
312  private:
313   bool doNeedsUncompressedLength() const override;
314   uint64_t doMaxUncompressedLength() const override;
315
316   bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
317
318   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
319   std::unique_ptr<IOBuf> doUncompress(
320       const IOBuf* data,
321       uint64_t uncompressedLength) override;
322
323   bool highCompression_;
324 };
325
326 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
327   return make_unique<LZ4Codec>(level, type);
328 }
329
330 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
331   DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
332
333   switch (level) {
334   case COMPRESSION_LEVEL_FASTEST:
335   case COMPRESSION_LEVEL_DEFAULT:
336     level = 1;
337     break;
338   case COMPRESSION_LEVEL_BEST:
339     level = 2;
340     break;
341   }
342   if (level < 1 || level > 2) {
343     throw std::invalid_argument(to<std::string>(
344         "LZ4Codec: invalid level: ", level));
345   }
346   highCompression_ = (level > 1);
347 }
348
349 bool LZ4Codec::doNeedsUncompressedLength() const {
350   return !encodeSize();
351 }
352
353 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
354 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
355 // here.
356 #ifndef LZ4_MAX_INPUT_SIZE
357 # define LZ4_MAX_INPUT_SIZE 0x7E000000
358 #endif
359
360 uint64_t LZ4Codec::doMaxUncompressedLength() const {
361   return LZ4_MAX_INPUT_SIZE;
362 }
363
364 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
365   IOBuf clone;
366   if (data->isChained()) {
367     // LZ4 doesn't support streaming, so we have to coalesce
368     clone = data->cloneCoalescedAsValue();
369     data = &clone;
370   }
371
372   uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
373   auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
374   if (encodeSize()) {
375     encodeVarintToIOBuf(data->length(), out.get());
376   }
377
378   int n;
379   auto input = reinterpret_cast<const char*>(data->data());
380   auto output = reinterpret_cast<char*>(out->writableTail());
381   const auto inputLength = data->length();
382 #if LZ4_VERSION_NUMBER >= 10700
383   if (highCompression_) {
384     n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
385   } else {
386     n = LZ4_compress_default(input, output, inputLength, out->tailroom());
387   }
388 #else
389   if (highCompression_) {
390     n = LZ4_compressHC(input, output, inputLength);
391   } else {
392     n = LZ4_compress(input, output, inputLength);
393   }
394 #endif
395
396   CHECK_GE(n, 0);
397   CHECK_LE(n, out->capacity());
398
399   out->append(n);
400   return out;
401 }
402
403 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
404     const IOBuf* data,
405     uint64_t uncompressedLength) {
406   IOBuf clone;
407   if (data->isChained()) {
408     // LZ4 doesn't support streaming, so we have to coalesce
409     clone = data->cloneCoalescedAsValue();
410     data = &clone;
411   }
412
413   folly::io::Cursor cursor(data);
414   uint64_t actualUncompressedLength;
415   if (encodeSize()) {
416     actualUncompressedLength = decodeVarintFromCursor(cursor);
417     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
418         uncompressedLength != actualUncompressedLength) {
419       throw std::runtime_error("LZ4Codec: invalid uncompressed length");
420     }
421   } else {
422     actualUncompressedLength = uncompressedLength;
423     if (actualUncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH ||
424         actualUncompressedLength > maxUncompressedLength()) {
425       throw std::runtime_error("LZ4Codec: invalid uncompressed length");
426     }
427   }
428
429   auto sp = StringPiece{cursor.peekBytes()};
430   auto out = IOBuf::create(actualUncompressedLength);
431   int n = LZ4_decompress_safe(
432       sp.data(),
433       reinterpret_cast<char*>(out->writableTail()),
434       sp.size(),
435       actualUncompressedLength);
436
437   if (n < 0 || uint64_t(n) != actualUncompressedLength) {
438     throw std::runtime_error(to<std::string>(
439         "LZ4 decompression returned invalid value ", n));
440   }
441   out->append(actualUncompressedLength);
442   return out;
443 }
444
445 #if LZ4_VERSION_NUMBER >= 10301
446
447 class LZ4FrameCodec final : public Codec {
448  public:
449   static std::unique_ptr<Codec> create(int level, CodecType type);
450   explicit LZ4FrameCodec(int level, CodecType type);
451   ~LZ4FrameCodec();
452
453   std::vector<std::string> validPrefixes() const override;
454   bool canUncompress(const IOBuf* data, uint64_t uncompressedLength)
455       const override;
456
457  private:
458   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
459   std::unique_ptr<IOBuf> doUncompress(
460       const IOBuf* data,
461       uint64_t uncompressedLength) override;
462
463   // Reset the dctx_ if it is dirty or null.
464   void resetDCtx();
465
466   int level_;
467   LZ4F_decompressionContext_t dctx_{nullptr};
468   bool dirty_{false};
469 };
470
471 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
472     int level,
473     CodecType type) {
474   return make_unique<LZ4FrameCodec>(level, type);
475 }
476
477 static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
478
479 std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
480   return {prefixToStringLE(kLZ4FrameMagicLE)};
481 }
482
483 bool LZ4FrameCodec::canUncompress(const IOBuf* data, uint64_t) const {
484   return dataStartsWithLE(data, kLZ4FrameMagicLE);
485 }
486
487 static size_t lz4FrameThrowOnError(size_t code) {
488   if (LZ4F_isError(code)) {
489     throw std::runtime_error(
490         to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
491   }
492   return code;
493 }
494
495 void LZ4FrameCodec::resetDCtx() {
496   if (dctx_ && !dirty_) {
497     return;
498   }
499   if (dctx_) {
500     LZ4F_freeDecompressionContext(dctx_);
501   }
502   lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
503   dirty_ = false;
504 }
505
506 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
507   DCHECK(type == CodecType::LZ4_FRAME);
508   switch (level) {
509     case COMPRESSION_LEVEL_FASTEST:
510     case COMPRESSION_LEVEL_DEFAULT:
511       level_ = 0;
512       break;
513     case COMPRESSION_LEVEL_BEST:
514       level_ = 16;
515       break;
516     default:
517       level_ = level;
518       break;
519   }
520 }
521
522 LZ4FrameCodec::~LZ4FrameCodec() {
523   if (dctx_) {
524     LZ4F_freeDecompressionContext(dctx_);
525   }
526 }
527
528 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
529   // LZ4 Frame compression doesn't support streaming so we have to coalesce
530   IOBuf clone;
531   if (data->isChained()) {
532     clone = data->cloneCoalescedAsValue();
533     data = &clone;
534   }
535   // Set preferences
536   const auto uncompressedLength = data->length();
537   LZ4F_preferences_t prefs{};
538   prefs.compressionLevel = level_;
539   prefs.frameInfo.contentSize = uncompressedLength;
540   // Compress
541   auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs));
542   const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
543       buf->writableTail(),
544       buf->tailroom(),
545       data->data(),
546       data->length(),
547       &prefs));
548   buf->append(written);
549   return buf;
550 }
551
552 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
553     const IOBuf* data,
554     uint64_t uncompressedLength) {
555   // Reset the dctx if any errors have occurred
556   resetDCtx();
557   // Coalesce the data
558   ByteRange in = *data->begin();
559   IOBuf clone;
560   if (data->isChained()) {
561     clone = data->cloneCoalescedAsValue();
562     in = clone.coalesce();
563   }
564   data = nullptr;
565   // Select decompression options
566   LZ4F_decompressOptions_t options;
567   options.stableDst = 1;
568   // Select blockSize and growthSize for the IOBufQueue
569   IOBufQueue queue(IOBufQueue::cacheChainLength());
570   auto blockSize = uint64_t{64} << 10;
571   auto growthSize = uint64_t{4} << 20;
572   if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) {
573     // Allocate uncompressedLength in one chunk (up to 64 MB)
574     const auto allocateSize = std::min(uncompressedLength, uint64_t{64} << 20);
575     queue.preallocate(allocateSize, allocateSize);
576     blockSize = std::min(uncompressedLength, blockSize);
577     growthSize = std::min(uncompressedLength, growthSize);
578   } else {
579     // Reduce growthSize for small data
580     const auto guessUncompressedLen =
581         4 * std::max<uint64_t>(blockSize, in.size());
582     growthSize = std::min(guessUncompressedLen, growthSize);
583   }
584   // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
585   // returns 0
586   dirty_ = true;
587   // Decompress until the frame is over
588   size_t code = 0;
589   do {
590     // Allocate enough space to decompress at least a block
591     void* out;
592     size_t outSize;
593     std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
594     // Decompress
595     size_t inSize = in.size();
596     code = lz4FrameThrowOnError(
597         LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
598     if (in.empty() && outSize == 0 && code != 0) {
599       // We passed no input, no output was produced, and the frame isn't over
600       // No more forward progress is possible
601       throw std::runtime_error("LZ4Frame error: Incomplete frame");
602     }
603     in.uncheckedAdvance(inSize);
604     queue.postallocate(outSize);
605   } while (code != 0);
606   // At this point the decompression context can be reused
607   dirty_ = false;
608   if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
609       queue.chainLength() != uncompressedLength) {
610     throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
611   }
612   return queue.move();
613 }
614
615 #endif // LZ4_VERSION_NUMBER >= 10301
616 #endif // FOLLY_HAVE_LIBLZ4
617
618 #if FOLLY_HAVE_LIBSNAPPY
619
620 /**
621  * Snappy compression
622  */
623
624 /**
625  * Implementation of snappy::Source that reads from a IOBuf chain.
626  */
627 class IOBufSnappySource final : public snappy::Source {
628  public:
629   explicit IOBufSnappySource(const IOBuf* data);
630   size_t Available() const override;
631   const char* Peek(size_t* len) override;
632   void Skip(size_t n) override;
633  private:
634   size_t available_;
635   io::Cursor cursor_;
636 };
637
638 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
639   : available_(data->computeChainDataLength()),
640     cursor_(data) {
641 }
642
643 size_t IOBufSnappySource::Available() const {
644   return available_;
645 }
646
647 const char* IOBufSnappySource::Peek(size_t* len) {
648   auto sp = StringPiece{cursor_.peekBytes()};
649   *len = sp.size();
650   return sp.data();
651 }
652
653 void IOBufSnappySource::Skip(size_t n) {
654   CHECK_LE(n, available_);
655   cursor_.skip(n);
656   available_ -= n;
657 }
658
659 class SnappyCodec final : public Codec {
660  public:
661   static std::unique_ptr<Codec> create(int level, CodecType type);
662   explicit SnappyCodec(int level, CodecType type);
663
664  private:
665   uint64_t doMaxUncompressedLength() const override;
666   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
667   std::unique_ptr<IOBuf> doUncompress(
668       const IOBuf* data,
669       uint64_t uncompressedLength) override;
670 };
671
672 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
673   return make_unique<SnappyCodec>(level, type);
674 }
675
676 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
677   DCHECK(type == CodecType::SNAPPY);
678   switch (level) {
679   case COMPRESSION_LEVEL_FASTEST:
680   case COMPRESSION_LEVEL_DEFAULT:
681   case COMPRESSION_LEVEL_BEST:
682     level = 1;
683   }
684   if (level != 1) {
685     throw std::invalid_argument(to<std::string>(
686         "SnappyCodec: invalid level: ", level));
687   }
688 }
689
690 uint64_t SnappyCodec::doMaxUncompressedLength() const {
691   // snappy.h uses uint32_t for lengths, so there's that.
692   return std::numeric_limits<uint32_t>::max();
693 }
694
695 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
696   IOBufSnappySource source(data);
697   auto out =
698     IOBuf::create(snappy::MaxCompressedLength(source.Available()));
699
700   snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
701       out->writableTail()));
702
703   size_t n = snappy::Compress(&source, &sink);
704
705   CHECK_LE(n, out->capacity());
706   out->append(n);
707   return out;
708 }
709
710 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
711                                                  uint64_t uncompressedLength) {
712   uint32_t actualUncompressedLength = 0;
713
714   {
715     IOBufSnappySource source(data);
716     if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
717       throw std::runtime_error("snappy::GetUncompressedLength failed");
718     }
719     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
720         uncompressedLength != actualUncompressedLength) {
721       throw std::runtime_error("snappy: invalid uncompressed length");
722     }
723   }
724
725   auto out = IOBuf::create(actualUncompressedLength);
726
727   {
728     IOBufSnappySource source(data);
729     if (!snappy::RawUncompress(&source,
730                                reinterpret_cast<char*>(out->writableTail()))) {
731       throw std::runtime_error("snappy::RawUncompress failed");
732     }
733   }
734
735   out->append(actualUncompressedLength);
736   return out;
737 }
738
739 #endif  // FOLLY_HAVE_LIBSNAPPY
740
741 #if FOLLY_HAVE_LIBZ
742 /**
743  * Zlib codec
744  */
745 class ZlibCodec final : public Codec {
746  public:
747   static std::unique_ptr<Codec> create(int level, CodecType type);
748   explicit ZlibCodec(int level, CodecType type);
749
750   std::vector<std::string> validPrefixes() const override;
751   bool canUncompress(const IOBuf* data, uint64_t uncompressedLength)
752       const override;
753
754  private:
755   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
756   std::unique_ptr<IOBuf> doUncompress(
757       const IOBuf* data,
758       uint64_t uncompressedLength) override;
759
760   std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
761   bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
762
763   int level_;
764 };
765
766 static constexpr uint16_t kGZIPMagicLE = 0x8B1F;
767
768 std::vector<std::string> ZlibCodec::validPrefixes() const {
769   if (type() == CodecType::ZLIB) {
770     // Zlib streams start with a 2 byte header.
771     //
772     //   0   1
773     // +---+---+
774     // |CMF|FLG|
775     // +---+---+
776     //
777     // We won't restrict the values of any sub-fields except as described below.
778     //
779     // The lowest 4 bits of CMF is the compression method (CM).
780     // CM == 0x8 is the deflate compression method, which is currently the only
781     // supported compression method, so any valid prefix must have CM == 0x8.
782     //
783     // The lowest 5 bits of FLG is FCHECK.
784     // FCHECK must be such that the two header bytes are a multiple of 31 when
785     // interpreted as a big endian 16-bit number.
786     std::vector<std::string> result;
787     // 16 values for the first byte, 8 values for the second byte.
788     // There are also 4 combinations where both 0x00 and 0x1F work as FCHECK.
789     result.reserve(132);
790     // Select all values for the CMF byte that use the deflate algorithm 0x8.
791     for (uint32_t first = 0x0800; first <= 0xF800; first += 0x1000) {
792       // Select all values for the FLG, but leave FCHECK as 0 since it's fixed.
793       for (uint32_t second = 0x00; second <= 0xE0; second += 0x20) {
794         uint16_t prefix = first | second;
795         // Compute FCHECK.
796         prefix += 31 - (prefix % 31);
797         result.push_back(prefixToStringLE(Endian::big(prefix)));
798         // zlib won't produce this, but it is a valid prefix.
799         if ((prefix & 0x1F) == 31) {
800           prefix -= 31;
801           result.push_back(prefixToStringLE(Endian::big(prefix)));
802         }
803       }
804     }
805     return result;
806   } else {
807     // The gzip frame starts with 2 magic bytes.
808     return {prefixToStringLE(kGZIPMagicLE)};
809   }
810 }
811
812 bool ZlibCodec::canUncompress(const IOBuf* data, uint64_t) const {
813   if (type() == CodecType::ZLIB) {
814     uint16_t value;
815     Cursor cursor{data};
816     if (!cursor.tryReadBE(value)) {
817       return false;
818     }
819     // zlib compressed if using deflate and is a multiple of 31.
820     return (value & 0x0F00) == 0x0800 && value % 31 == 0;
821   } else {
822     return dataStartsWithLE(data, kGZIPMagicLE);
823   }
824 }
825
826 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
827   return make_unique<ZlibCodec>(level, type);
828 }
829
830 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
831   DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
832   switch (level) {
833   case COMPRESSION_LEVEL_FASTEST:
834     level = 1;
835     break;
836   case COMPRESSION_LEVEL_DEFAULT:
837     level = Z_DEFAULT_COMPRESSION;
838     break;
839   case COMPRESSION_LEVEL_BEST:
840     level = 9;
841     break;
842   }
843   if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
844     throw std::invalid_argument(to<std::string>(
845         "ZlibCodec: invalid level: ", level));
846   }
847   level_ = level;
848 }
849
850 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
851                                                   uint32_t length) {
852   CHECK_EQ(stream->avail_out, 0);
853
854   auto buf = IOBuf::create(length);
855   buf->append(buf->capacity());
856
857   stream->next_out = buf->writableData();
858   stream->avail_out = buf->length();
859
860   return buf;
861 }
862
863 bool ZlibCodec::doInflate(z_stream* stream,
864                           IOBuf* head,
865                           uint32_t bufferLength) {
866   if (stream->avail_out == 0) {
867     head->prependChain(addOutputBuffer(stream, bufferLength));
868   }
869
870   int rc = inflate(stream, Z_NO_FLUSH);
871
872   switch (rc) {
873   case Z_OK:
874     break;
875   case Z_STREAM_END:
876     return true;
877   case Z_BUF_ERROR:
878   case Z_NEED_DICT:
879   case Z_DATA_ERROR:
880   case Z_MEM_ERROR:
881     throw std::runtime_error(to<std::string>(
882         "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
883   default:
884     CHECK(false) << rc << ": " << stream->msg;
885   }
886
887   return false;
888 }
889
890 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
891   z_stream stream;
892   stream.zalloc = nullptr;
893   stream.zfree = nullptr;
894   stream.opaque = nullptr;
895
896   // Using deflateInit2() to support gzip.  "The windowBits parameter is the
897   // base two logarithm of the maximum window size (...) The default value is
898   // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
899   // around the compressed data instead of a zlib wrapper. The gzip header
900   // will have no file name, no extra data, no comment, no modification time
901   // (set to zero), no header crc, and the operating system will be set to 255
902   // (unknown)."
903   int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
904   // All other parameters (method, memLevel, strategy) get default values from
905   // the zlib manual.
906   int rc = deflateInit2(&stream,
907                         level_,
908                         Z_DEFLATED,
909                         windowBits,
910                         /* memLevel */ 8,
911                         Z_DEFAULT_STRATEGY);
912   if (rc != Z_OK) {
913     throw std::runtime_error(to<std::string>(
914         "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
915   }
916
917   stream.next_in = stream.next_out = nullptr;
918   stream.avail_in = stream.avail_out = 0;
919   stream.total_in = stream.total_out = 0;
920
921   bool success = false;
922
923   SCOPE_EXIT {
924     rc = deflateEnd(&stream);
925     // If we're here because of an exception, it's okay if some data
926     // got dropped.
927     CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
928       << rc << ": " << stream.msg;
929   };
930
931   uint64_t uncompressedLength = data->computeChainDataLength();
932   uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
933
934   // Max 64MiB in one go
935   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
936   constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
937
938   auto out = addOutputBuffer(
939       &stream,
940       (maxCompressedLength <= maxSingleStepLength ?
941        maxCompressedLength :
942        defaultBufferLength));
943
944   for (auto& range : *data) {
945     uint64_t remaining = range.size();
946     uint64_t written = 0;
947     while (remaining) {
948       uint32_t step = (remaining > maxSingleStepLength ?
949                        maxSingleStepLength : remaining);
950       stream.next_in = const_cast<uint8_t*>(range.data() + written);
951       stream.avail_in = step;
952       remaining -= step;
953       written += step;
954
955       while (stream.avail_in != 0) {
956         if (stream.avail_out == 0) {
957           out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
958         }
959
960         rc = deflate(&stream, Z_NO_FLUSH);
961
962         CHECK_EQ(rc, Z_OK) << stream.msg;
963       }
964     }
965   }
966
967   do {
968     if (stream.avail_out == 0) {
969       out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
970     }
971
972     rc = deflate(&stream, Z_FINISH);
973   } while (rc == Z_OK);
974
975   CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
976
977   out->prev()->trimEnd(stream.avail_out);
978
979   success = true;  // we survived
980
981   return out;
982 }
983
984 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
985                                                uint64_t uncompressedLength) {
986   z_stream stream;
987   stream.zalloc = nullptr;
988   stream.zfree = nullptr;
989   stream.opaque = nullptr;
990
991   // "The windowBits parameter is the base two logarithm of the maximum window
992   // size (...) The default value is 15 (...) add 16 to decode only the gzip
993   // format (the zlib format will return a Z_DATA_ERROR)."
994   int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
995   int rc = inflateInit2(&stream, windowBits);
996   if (rc != Z_OK) {
997     throw std::runtime_error(to<std::string>(
998         "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
999   }
1000
1001   stream.next_in = stream.next_out = nullptr;
1002   stream.avail_in = stream.avail_out = 0;
1003   stream.total_in = stream.total_out = 0;
1004
1005   bool success = false;
1006
1007   SCOPE_EXIT {
1008     rc = inflateEnd(&stream);
1009     // If we're here because of an exception, it's okay if some data
1010     // got dropped.
1011     CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
1012       << rc << ": " << stream.msg;
1013   };
1014
1015   // Max 64MiB in one go
1016   constexpr uint64_t maxSingleStepLength = uint64_t(64) << 20; // 64MiB
1017   constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
1018   const uint64_t defaultBufferLength =
1019       computeBufferLength(data->computeChainDataLength(), kBlockSize);
1020
1021   auto out = addOutputBuffer(
1022       &stream,
1023       ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1024         uncompressedLength <= maxSingleStepLength) ?
1025        uncompressedLength :
1026        defaultBufferLength));
1027
1028   bool streamEnd = false;
1029   for (auto& range : *data) {
1030     if (range.empty()) {
1031       continue;
1032     }
1033
1034     stream.next_in = const_cast<uint8_t*>(range.data());
1035     stream.avail_in = range.size();
1036
1037     while (stream.avail_in != 0) {
1038       if (streamEnd) {
1039         throw std::runtime_error(to<std::string>(
1040             "ZlibCodec: junk after end of data"));
1041       }
1042
1043       streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1044     }
1045   }
1046
1047   while (!streamEnd) {
1048     streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1049   }
1050
1051   out->prev()->trimEnd(stream.avail_out);
1052
1053   if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1054       uncompressedLength != stream.total_out) {
1055     throw std::runtime_error(to<std::string>(
1056         "ZlibCodec: invalid uncompressed length"));
1057   }
1058
1059   success = true;  // we survived
1060
1061   return out;
1062 }
1063
1064 #endif  // FOLLY_HAVE_LIBZ
1065
1066 #if FOLLY_HAVE_LIBLZMA
1067
1068 /**
1069  * LZMA2 compression
1070  */
1071 class LZMA2Codec final : public Codec {
1072  public:
1073   static std::unique_ptr<Codec> create(int level, CodecType type);
1074   explicit LZMA2Codec(int level, CodecType type);
1075
1076   std::vector<std::string> validPrefixes() const override;
1077   bool canUncompress(const IOBuf* data, uint64_t uncompressedLength)
1078       const override;
1079
1080  private:
1081   bool doNeedsUncompressedLength() const override;
1082   uint64_t doMaxUncompressedLength() const override;
1083
1084   bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
1085
1086   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1087   std::unique_ptr<IOBuf> doUncompress(
1088       const IOBuf* data,
1089       uint64_t uncompressedLength) override;
1090
1091   std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
1092   bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
1093
1094   int level_;
1095 };
1096
1097 static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1098 static constexpr unsigned kLZMA2MagicBytes = 6;
1099
1100 std::vector<std::string> LZMA2Codec::validPrefixes() const {
1101   if (type() == CodecType::LZMA2_VARINT_SIZE) {
1102     return {};
1103   }
1104   return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
1105 }
1106
1107 bool LZMA2Codec::canUncompress(const IOBuf* data, uint64_t) const {
1108   if (type() == CodecType::LZMA2_VARINT_SIZE) {
1109     return false;
1110   }
1111   // Returns false for all inputs less than 8 bytes.
1112   // This is okay, because no valid LZMA2 streams are less than 8 bytes.
1113   return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
1114 }
1115
1116 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
1117   return make_unique<LZMA2Codec>(level, type);
1118 }
1119
1120 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
1121   DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
1122   switch (level) {
1123   case COMPRESSION_LEVEL_FASTEST:
1124     level = 0;
1125     break;
1126   case COMPRESSION_LEVEL_DEFAULT:
1127     level = LZMA_PRESET_DEFAULT;
1128     break;
1129   case COMPRESSION_LEVEL_BEST:
1130     level = 9;
1131     break;
1132   }
1133   if (level < 0 || level > 9) {
1134     throw std::invalid_argument(to<std::string>(
1135         "LZMA2Codec: invalid level: ", level));
1136   }
1137   level_ = level;
1138 }
1139
1140 bool LZMA2Codec::doNeedsUncompressedLength() const {
1141   return false;
1142 }
1143
1144 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
1145   // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1146   return uint64_t(1) << 63;
1147 }
1148
1149 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
1150     lzma_stream* stream,
1151     size_t length) {
1152
1153   CHECK_EQ(stream->avail_out, 0);
1154
1155   auto buf = IOBuf::create(length);
1156   buf->append(buf->capacity());
1157
1158   stream->next_out = buf->writableData();
1159   stream->avail_out = buf->length();
1160
1161   return buf;
1162 }
1163
1164 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
1165   lzma_ret rc;
1166   lzma_stream stream = LZMA_STREAM_INIT;
1167
1168   rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
1169   if (rc != LZMA_OK) {
1170     throw std::runtime_error(folly::to<std::string>(
1171       "LZMA2Codec: lzma_easy_encoder error: ", rc));
1172   }
1173
1174   SCOPE_EXIT { lzma_end(&stream); };
1175
1176   uint64_t uncompressedLength = data->computeChainDataLength();
1177   uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
1178
1179   // Max 64MiB in one go
1180   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
1181   constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
1182
1183   auto out = addOutputBuffer(
1184     &stream,
1185     (maxCompressedLength <= maxSingleStepLength ?
1186      maxCompressedLength :
1187      defaultBufferLength));
1188
1189   if (encodeSize()) {
1190     auto size = IOBuf::createCombined(kMaxVarintLength64);
1191     encodeVarintToIOBuf(uncompressedLength, size.get());
1192     size->appendChain(std::move(out));
1193     out = std::move(size);
1194   }
1195
1196   for (auto& range : *data) {
1197     if (range.empty()) {
1198       continue;
1199     }
1200
1201     stream.next_in = const_cast<uint8_t*>(range.data());
1202     stream.avail_in = range.size();
1203
1204     while (stream.avail_in != 0) {
1205       if (stream.avail_out == 0) {
1206         out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1207       }
1208
1209       rc = lzma_code(&stream, LZMA_RUN);
1210
1211       if (rc != LZMA_OK) {
1212         throw std::runtime_error(folly::to<std::string>(
1213           "LZMA2Codec: lzma_code error: ", rc));
1214       }
1215     }
1216   }
1217
1218   do {
1219     if (stream.avail_out == 0) {
1220       out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1221     }
1222
1223     rc = lzma_code(&stream, LZMA_FINISH);
1224   } while (rc == LZMA_OK);
1225
1226   if (rc != LZMA_STREAM_END) {
1227     throw std::runtime_error(folly::to<std::string>(
1228       "LZMA2Codec: lzma_code ended with error: ", rc));
1229   }
1230
1231   out->prev()->trimEnd(stream.avail_out);
1232
1233   return out;
1234 }
1235
1236 bool LZMA2Codec::doInflate(lzma_stream* stream,
1237                           IOBuf* head,
1238                           size_t bufferLength) {
1239   if (stream->avail_out == 0) {
1240     head->prependChain(addOutputBuffer(stream, bufferLength));
1241   }
1242
1243   lzma_ret rc = lzma_code(stream, LZMA_RUN);
1244
1245   switch (rc) {
1246   case LZMA_OK:
1247     break;
1248   case LZMA_STREAM_END:
1249     return true;
1250   default:
1251     throw std::runtime_error(to<std::string>(
1252         "LZMA2Codec: lzma_code error: ", rc));
1253   }
1254
1255   return false;
1256 }
1257
1258 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(const IOBuf* data,
1259                                                uint64_t uncompressedLength) {
1260   lzma_ret rc;
1261   lzma_stream stream = LZMA_STREAM_INIT;
1262
1263   rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
1264   if (rc != LZMA_OK) {
1265     throw std::runtime_error(folly::to<std::string>(
1266       "LZMA2Codec: lzma_auto_decoder error: ", rc));
1267   }
1268
1269   SCOPE_EXIT { lzma_end(&stream); };
1270
1271   // Max 64MiB in one go
1272   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1273   constexpr uint32_t defaultBufferLength = uint32_t(256) << 10; // 256 KiB
1274
1275   folly::io::Cursor cursor(data);
1276   if (encodeSize()) {
1277     const uint64_t actualUncompressedLength = decodeVarintFromCursor(cursor);
1278     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1279         uncompressedLength != actualUncompressedLength) {
1280       throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
1281     }
1282     uncompressedLength = actualUncompressedLength;
1283   }
1284
1285   auto out = addOutputBuffer(
1286       &stream,
1287       ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1288         uncompressedLength <= maxSingleStepLength)
1289            ? uncompressedLength
1290            : defaultBufferLength));
1291
1292   bool streamEnd = false;
1293   auto buf = cursor.peekBytes();
1294   while (!buf.empty()) {
1295     stream.next_in = const_cast<uint8_t*>(buf.data());
1296     stream.avail_in = buf.size();
1297
1298     while (stream.avail_in != 0) {
1299       if (streamEnd) {
1300         throw std::runtime_error(to<std::string>(
1301             "LZMA2Codec: junk after end of data"));
1302       }
1303
1304       streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1305     }
1306
1307     cursor.skip(buf.size());
1308     buf = cursor.peekBytes();
1309   }
1310
1311   while (!streamEnd) {
1312     streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1313   }
1314
1315   out->prev()->trimEnd(stream.avail_out);
1316
1317   if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1318       uncompressedLength != stream.total_out) {
1319     throw std::runtime_error(
1320         to<std::string>("LZMA2Codec: invalid uncompressed length"));
1321   }
1322
1323   return out;
1324 }
1325
1326 #endif  // FOLLY_HAVE_LIBLZMA
1327
1328 #ifdef FOLLY_HAVE_LIBZSTD
1329
1330 /**
1331  * ZSTD compression
1332  */
1333 class ZSTDCodec final : public Codec {
1334  public:
1335   static std::unique_ptr<Codec> create(int level, CodecType);
1336   explicit ZSTDCodec(int level, CodecType type);
1337
1338   std::vector<std::string> validPrefixes() const override;
1339   bool canUncompress(const IOBuf* data, uint64_t uncompressedLength)
1340       const override;
1341
1342  private:
1343   bool doNeedsUncompressedLength() const override;
1344   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1345   std::unique_ptr<IOBuf> doUncompress(
1346       const IOBuf* data,
1347       uint64_t uncompressedLength) override;
1348
1349   int level_;
1350 };
1351
1352 static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
1353
1354 std::vector<std::string> ZSTDCodec::validPrefixes() const {
1355   return {prefixToStringLE(kZSTDMagicLE)};
1356 }
1357
1358 bool ZSTDCodec::canUncompress(const IOBuf* data, uint64_t) const {
1359   return dataStartsWithLE(data, kZSTDMagicLE);
1360 }
1361
1362 std::unique_ptr<Codec> ZSTDCodec::create(int level, CodecType type) {
1363   return make_unique<ZSTDCodec>(level, type);
1364 }
1365
1366 ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) {
1367   DCHECK(type == CodecType::ZSTD);
1368   switch (level) {
1369     case COMPRESSION_LEVEL_FASTEST:
1370       level = 1;
1371       break;
1372     case COMPRESSION_LEVEL_DEFAULT:
1373       level = 1;
1374       break;
1375     case COMPRESSION_LEVEL_BEST:
1376       level = 19;
1377       break;
1378   }
1379   if (level < 1 || level > ZSTD_maxCLevel()) {
1380     throw std::invalid_argument(
1381         to<std::string>("ZSTD: invalid level: ", level));
1382   }
1383   level_ = level;
1384 }
1385
1386 bool ZSTDCodec::doNeedsUncompressedLength() const {
1387   return false;
1388 }
1389
1390 void zstdThrowIfError(size_t rc) {
1391   if (!ZSTD_isError(rc)) {
1392     return;
1393   }
1394   throw std::runtime_error(
1395       to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
1396 }
1397
1398 std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
1399   // Support earlier versions of the codec (working with a single IOBuf,
1400   // and using ZSTD_decompress which requires ZSTD frame to contain size,
1401   // which isn't populated by streaming API).
1402   if (!data->isChained()) {
1403     auto out = IOBuf::createCombined(ZSTD_compressBound(data->length()));
1404     const auto rc = ZSTD_compress(
1405         out->writableData(),
1406         out->capacity(),
1407         data->data(),
1408         data->length(),
1409         level_);
1410     zstdThrowIfError(rc);
1411     out->append(rc);
1412     return out;
1413   }
1414
1415   auto zcs = ZSTD_createCStream();
1416   SCOPE_EXIT {
1417     ZSTD_freeCStream(zcs);
1418   };
1419
1420   auto rc = ZSTD_initCStream(zcs, level_);
1421   zstdThrowIfError(rc);
1422
1423   Cursor cursor(data);
1424   auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
1425
1426   ZSTD_outBuffer out;
1427   out.dst = result->writableTail();
1428   out.size = result->capacity();
1429   out.pos = 0;
1430
1431   for (auto buffer = cursor.peekBytes(); !buffer.empty();) {
1432     ZSTD_inBuffer in;
1433     in.src = buffer.data();
1434     in.size = buffer.size();
1435     for (in.pos = 0; in.pos != in.size;) {
1436       rc = ZSTD_compressStream(zcs, &out, &in);
1437       zstdThrowIfError(rc);
1438     }
1439     cursor.skip(in.size);
1440     buffer = cursor.peekBytes();
1441   }
1442
1443   rc = ZSTD_endStream(zcs, &out);
1444   zstdThrowIfError(rc);
1445   CHECK_EQ(rc, 0);
1446
1447   result->append(out.pos);
1448   return result;
1449 }
1450
1451 static std::unique_ptr<IOBuf> zstdUncompressBuffer(
1452     const IOBuf* data,
1453     uint64_t uncompressedLength) {
1454   // Check preconditions
1455   DCHECK(!data->isChained());
1456   DCHECK(uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH);
1457
1458   auto uncompressed = IOBuf::create(uncompressedLength);
1459   const auto decompressedSize = ZSTD_decompress(
1460       uncompressed->writableTail(),
1461       uncompressed->tailroom(),
1462       data->data(),
1463       data->length());
1464   zstdThrowIfError(decompressedSize);
1465   if (decompressedSize != uncompressedLength) {
1466     throw std::runtime_error("ZSTD: invalid uncompressed length");
1467   }
1468   uncompressed->append(decompressedSize);
1469   return uncompressed;
1470 }
1471
1472 static std::unique_ptr<IOBuf> zstdUncompressStream(
1473     const IOBuf* data,
1474     uint64_t uncompressedLength) {
1475   auto zds = ZSTD_createDStream();
1476   SCOPE_EXIT {
1477     ZSTD_freeDStream(zds);
1478   };
1479
1480   auto rc = ZSTD_initDStream(zds);
1481   zstdThrowIfError(rc);
1482
1483   ZSTD_outBuffer out{};
1484   ZSTD_inBuffer in{};
1485
1486   auto outputSize = ZSTD_DStreamOutSize();
1487   if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH) {
1488     outputSize = uncompressedLength;
1489   }
1490
1491   IOBufQueue queue(IOBufQueue::cacheChainLength());
1492
1493   Cursor cursor(data);
1494   for (rc = 0;;) {
1495     if (in.pos == in.size) {
1496       auto buffer = cursor.peekBytes();
1497       in.src = buffer.data();
1498       in.size = buffer.size();
1499       in.pos = 0;
1500       cursor.skip(in.size);
1501       if (rc > 1 && in.size == 0) {
1502         throw std::runtime_error(to<std::string>("ZSTD: incomplete input"));
1503       }
1504     }
1505     if (out.pos == out.size) {
1506       if (out.pos != 0) {
1507         queue.postallocate(out.pos);
1508       }
1509       auto buffer = queue.preallocate(outputSize, outputSize);
1510       out.dst = buffer.first;
1511       out.size = buffer.second;
1512       out.pos = 0;
1513       outputSize = ZSTD_DStreamOutSize();
1514     }
1515     rc = ZSTD_decompressStream(zds, &out, &in);
1516     zstdThrowIfError(rc);
1517     if (rc == 0) {
1518       break;
1519     }
1520   }
1521   if (out.pos != 0) {
1522     queue.postallocate(out.pos);
1523   }
1524   if (in.pos != in.size || !cursor.isAtEnd()) {
1525     throw std::runtime_error("ZSTD: junk after end of data");
1526   }
1527   if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH &&
1528       queue.chainLength() != uncompressedLength) {
1529     throw std::runtime_error("ZSTD: invalid uncompressed length");
1530   }
1531
1532   return queue.move();
1533 }
1534
1535 std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
1536     const IOBuf* data,
1537     uint64_t uncompressedLength) {
1538   {
1539     // Read decompressed size from frame if available in first IOBuf.
1540     const auto decompressedSize =
1541         ZSTD_getDecompressedSize(data->data(), data->length());
1542     if (decompressedSize != 0) {
1543       if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH &&
1544           uncompressedLength != decompressedSize) {
1545         throw std::runtime_error("ZSTD: invalid uncompressed length");
1546       }
1547       uncompressedLength = decompressedSize;
1548     }
1549   }
1550   // Faster to decompress using ZSTD_decompress() if we can.
1551   if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && !data->isChained()) {
1552     return zstdUncompressBuffer(data, uncompressedLength);
1553   }
1554   // Fall back to slower streaming decompression.
1555   return zstdUncompressStream(data, uncompressedLength);
1556 }
1557
1558 #endif  // FOLLY_HAVE_LIBZSTD
1559
1560 #if FOLLY_HAVE_LIBBZ2
1561
1562 class Bzip2Codec final : public Codec {
1563  public:
1564   static std::unique_ptr<Codec> create(int level, CodecType type);
1565   explicit Bzip2Codec(int level, CodecType type);
1566
1567   std::vector<std::string> validPrefixes() const override;
1568   bool canUncompress(IOBuf const* data, uint64_t uncompressedLength)
1569       const override;
1570
1571  private:
1572   std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
1573   std::unique_ptr<IOBuf> doUncompress(
1574       IOBuf const* data,
1575       uint64_t uncompressedLength) override;
1576
1577   int level_;
1578 };
1579
1580 /* static */ std::unique_ptr<Codec> Bzip2Codec::create(
1581     int level,
1582     CodecType type) {
1583   return make_unique<Bzip2Codec>(level, type);
1584 }
1585
1586 Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
1587   DCHECK(type == CodecType::BZIP2);
1588   switch (level) {
1589     case COMPRESSION_LEVEL_FASTEST:
1590       level = 1;
1591       break;
1592     case COMPRESSION_LEVEL_DEFAULT:
1593       level = 9;
1594       break;
1595     case COMPRESSION_LEVEL_BEST:
1596       level = 9;
1597       break;
1598   }
1599   if (level < 1 || level > 9) {
1600     throw std::invalid_argument(
1601         to<std::string>("Bzip2: invalid level: ", level));
1602   }
1603   level_ = level;
1604 }
1605
1606 static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1607 static uint64_t constexpr kBzip2MagicBytes = 3;
1608
1609 std::vector<std::string> Bzip2Codec::validPrefixes() const {
1610   return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
1611 }
1612
1613 bool Bzip2Codec::canUncompress(IOBuf const* data, uint64_t) const {
1614   return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
1615 }
1616
1617 static bz_stream createBzStream() {
1618   bz_stream stream;
1619   stream.bzalloc = nullptr;
1620   stream.bzfree = nullptr;
1621   stream.opaque = nullptr;
1622   stream.next_in = stream.next_out = nullptr;
1623   stream.avail_in = stream.avail_out = 0;
1624   return stream;
1625 }
1626
1627 // Throws on error condition, otherwise returns the code.
1628 static int bzCheck(int const rc) {
1629   switch (rc) {
1630     case BZ_OK:
1631     case BZ_RUN_OK:
1632     case BZ_FLUSH_OK:
1633     case BZ_FINISH_OK:
1634     case BZ_STREAM_END:
1635       return rc;
1636     default:
1637       throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
1638   }
1639 }
1640
1641 static uint64_t bzCompressBound(uint64_t const uncompressedLength) {
1642   // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
1643   //   To guarantee that the compressed data will fit in its buffer, allocate an
1644   //   output buffer of size 1% larger than the uncompressed data, plus six
1645   //   hundred extra bytes.
1646   return uncompressedLength + uncompressedLength / 100 + 600;
1647 }
1648
1649 static std::unique_ptr<IOBuf> addOutputBuffer(
1650     bz_stream* stream,
1651     uint64_t const bufferLength) {
1652   DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
1653   DCHECK_EQ(stream->avail_out, 0);
1654
1655   auto buf = IOBuf::create(bufferLength);
1656   buf->append(buf->capacity());
1657
1658   stream->next_out = reinterpret_cast<char*>(buf->writableData());
1659   stream->avail_out = buf->length();
1660
1661   return buf;
1662 }
1663
1664 std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
1665   bz_stream stream = createBzStream();
1666   bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
1667   SCOPE_EXIT {
1668     bzCheck(BZ2_bzCompressEnd(&stream));
1669   };
1670
1671   uint64_t const uncompressedLength = data->computeChainDataLength();
1672   uint64_t const maxCompressedLength = bzCompressBound(uncompressedLength);
1673   uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1674   uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
1675
1676   auto out = addOutputBuffer(
1677       &stream,
1678       maxCompressedLength <= kMaxSingleStepLength ? maxCompressedLength
1679                                                   : kDefaultBufferLength);
1680
1681   for (auto range : *data) {
1682     while (!range.empty()) {
1683       auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1684       stream.next_in =
1685           const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1686       stream.avail_in = inSize;
1687
1688       if (stream.avail_out == 0) {
1689         out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1690       }
1691
1692       bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
1693       range.uncheckedAdvance(inSize - stream.avail_in);
1694     }
1695   }
1696   do {
1697     if (stream.avail_out == 0) {
1698       out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1699     }
1700   } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
1701
1702   out->prev()->trimEnd(stream.avail_out);
1703
1704   return out;
1705 }
1706
1707 std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
1708     const IOBuf* data,
1709     uint64_t uncompressedLength) {
1710   bz_stream stream = createBzStream();
1711   bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
1712   SCOPE_EXIT {
1713     bzCheck(BZ2_bzDecompressEnd(&stream));
1714   };
1715
1716   uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1717   uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
1718   uint64_t const kDefaultBufferLength =
1719       computeBufferLength(data->computeChainDataLength(), kBlockSize);
1720
1721   auto out = addOutputBuffer(
1722       &stream,
1723       ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1724         uncompressedLength <= kMaxSingleStepLength)
1725            ? uncompressedLength
1726            : kDefaultBufferLength));
1727
1728   int rc = BZ_OK;
1729   for (auto range : *data) {
1730     while (!range.empty()) {
1731       auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1732       stream.next_in =
1733           const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1734       stream.avail_in = inSize;
1735
1736       if (stream.avail_out == 0) {
1737         out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1738       }
1739
1740       rc = bzCheck(BZ2_bzDecompress(&stream));
1741       range.uncheckedAdvance(inSize - stream.avail_in);
1742     }
1743   }
1744   while (rc != BZ_STREAM_END) {
1745     if (stream.avail_out == 0) {
1746       out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1747     }
1748
1749     rc = bzCheck(BZ2_bzDecompress(&stream));
1750   }
1751
1752   out->prev()->trimEnd(stream.avail_out);
1753
1754   uint64_t const totalOut =
1755       (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
1756   if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1757       uncompressedLength != totalOut) {
1758     throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
1759   }
1760
1761   return out;
1762 }
1763
1764 #endif // FOLLY_HAVE_LIBBZ2
1765
1766 /**
1767  * Automatic decompression
1768  */
1769 class AutomaticCodec final : public Codec {
1770  public:
1771   static std::unique_ptr<Codec> create(
1772       std::vector<std::unique_ptr<Codec>> customCodecs);
1773   explicit AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs);
1774
1775   std::vector<std::string> validPrefixes() const override;
1776   bool canUncompress(const IOBuf* data, uint64_t uncompressedLength)
1777       const override;
1778
1779  private:
1780   bool doNeedsUncompressedLength() const override;
1781   uint64_t doMaxUncompressedLength() const override;
1782
1783   std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1784     throw std::runtime_error("AutomaticCodec error: compress() not supported.");
1785   }
1786   std::unique_ptr<IOBuf> doUncompress(
1787       const IOBuf* data,
1788       uint64_t uncompressedLength) override;
1789
1790   void addCodecIfSupported(CodecType type);
1791
1792   // Throws iff the codecs aren't compatible (very slow)
1793   void checkCompatibleCodecs() const;
1794
1795   std::vector<std::unique_ptr<Codec>> codecs_;
1796   bool needsUncompressedLength_;
1797   uint64_t maxUncompressedLength_;
1798 };
1799
1800 std::vector<std::string> AutomaticCodec::validPrefixes() const {
1801   std::unordered_set<std::string> prefixes;
1802   for (const auto& codec : codecs_) {
1803     const auto codecPrefixes = codec->validPrefixes();
1804     prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
1805   }
1806   return std::vector<std::string>{prefixes.begin(), prefixes.end()};
1807 }
1808
1809 bool AutomaticCodec::canUncompress(
1810     const IOBuf* data,
1811     uint64_t uncompressedLength) const {
1812   return std::any_of(
1813       codecs_.begin(),
1814       codecs_.end(),
1815       [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
1816         return codec->canUncompress(data, uncompressedLength);
1817       });
1818 }
1819
1820 void AutomaticCodec::addCodecIfSupported(CodecType type) {
1821   const bool present = std::any_of(
1822       codecs_.begin(),
1823       codecs_.end(),
1824       [&type](std::unique_ptr<Codec> const& codec) {
1825         return codec->type() == type;
1826       });
1827   if (hasCodec(type) && !present) {
1828     codecs_.push_back(getCodec(type));
1829   }
1830 }
1831
1832 /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
1833     std::vector<std::unique_ptr<Codec>> customCodecs) {
1834   return make_unique<AutomaticCodec>(std::move(customCodecs));
1835 }
1836
1837 AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
1838     : Codec(CodecType::USER_DEFINED), codecs_(std::move(customCodecs)) {
1839   // Fastest -> slowest
1840   addCodecIfSupported(CodecType::LZ4_FRAME);
1841   addCodecIfSupported(CodecType::ZSTD);
1842   addCodecIfSupported(CodecType::ZLIB);
1843   addCodecIfSupported(CodecType::GZIP);
1844   addCodecIfSupported(CodecType::LZMA2);
1845   addCodecIfSupported(CodecType::BZIP2);
1846   if (kIsDebug) {
1847     checkCompatibleCodecs();
1848   }
1849   // Check that none of the codes are are null
1850   DCHECK(std::none_of(
1851       codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1852         return codec == nullptr;
1853       }));
1854
1855   needsUncompressedLength_ = std::any_of(
1856       codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1857         return codec->needsUncompressedLength();
1858       });
1859
1860   const auto it = std::max_element(
1861       codecs_.begin(),
1862       codecs_.end(),
1863       [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
1864         return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
1865       });
1866   DCHECK(it != codecs_.end());
1867   maxUncompressedLength_ = (*it)->maxUncompressedLength();
1868 }
1869
1870 void AutomaticCodec::checkCompatibleCodecs() const {
1871   // Keep track of all the possible headers.
1872   std::unordered_set<std::string> headers;
1873   // The empty header is not allowed.
1874   headers.insert("");
1875   // Step 1:
1876   // Construct a set of headers and check that none of the headers occur twice.
1877   // Eliminate edge cases.
1878   for (auto&& codec : codecs_) {
1879     const auto codecHeaders = codec->validPrefixes();
1880     // Codecs without any valid headers are not allowed.
1881     if (codecHeaders.empty()) {
1882       throw std::invalid_argument{
1883           "AutomaticCodec: validPrefixes() must not be empty."};
1884     }
1885     // Insert all the headers for the current codec.
1886     const size_t beforeSize = headers.size();
1887     headers.insert(codecHeaders.begin(), codecHeaders.end());
1888     // Codecs are not compatible if any header occurred twice.
1889     if (beforeSize + codecHeaders.size() != headers.size()) {
1890       throw std::invalid_argument{
1891           "AutomaticCodec: Two valid prefixes collide."};
1892     }
1893   }
1894   // Step 2:
1895   // Check if any strict non-empty prefix of any header is a header.
1896   for (const auto& header : headers) {
1897     for (size_t i = 1; i < header.size(); ++i) {
1898       if (headers.count(header.substr(0, i))) {
1899         throw std::invalid_argument{
1900             "AutomaticCodec: One valid prefix is a prefix of another valid "
1901             "prefix."};
1902       }
1903     }
1904   }
1905 }
1906
1907 bool AutomaticCodec::doNeedsUncompressedLength() const {
1908   return needsUncompressedLength_;
1909 }
1910
1911 uint64_t AutomaticCodec::doMaxUncompressedLength() const {
1912   return maxUncompressedLength_;
1913 }
1914
1915 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
1916     const IOBuf* data,
1917     uint64_t uncompressedLength) {
1918   for (auto&& codec : codecs_) {
1919     if (codec->canUncompress(data, uncompressedLength)) {
1920       return codec->uncompress(data, uncompressedLength);
1921     }
1922   }
1923   throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
1924 }
1925
1926 }  // namespace
1927
1928 typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
1929 static constexpr CodecFactory
1930     codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
1931         nullptr, // USER_DEFINED
1932         NoCompressionCodec::create,
1933
1934 #if FOLLY_HAVE_LIBLZ4
1935         LZ4Codec::create,
1936 #else
1937         nullptr,
1938 #endif
1939
1940 #if FOLLY_HAVE_LIBSNAPPY
1941         SnappyCodec::create,
1942 #else
1943         nullptr,
1944 #endif
1945
1946 #if FOLLY_HAVE_LIBZ
1947         ZlibCodec::create,
1948 #else
1949         nullptr,
1950 #endif
1951
1952 #if FOLLY_HAVE_LIBLZ4
1953         LZ4Codec::create,
1954 #else
1955         nullptr,
1956 #endif
1957
1958 #if FOLLY_HAVE_LIBLZMA
1959         LZMA2Codec::create,
1960         LZMA2Codec::create,
1961 #else
1962         nullptr,
1963         nullptr,
1964 #endif
1965
1966 #if FOLLY_HAVE_LIBZSTD
1967         ZSTDCodec::create,
1968 #else
1969         nullptr,
1970 #endif
1971
1972 #if FOLLY_HAVE_LIBZ
1973         ZlibCodec::create,
1974 #else
1975         nullptr,
1976 #endif
1977
1978 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
1979         LZ4FrameCodec::create,
1980 #else
1981         nullptr,
1982 #endif
1983
1984 #if FOLLY_HAVE_LIBBZ2
1985         Bzip2Codec::create,
1986 #else
1987         nullptr
1988 #endif
1989 };
1990
1991 bool hasCodec(CodecType type) {
1992   size_t idx = static_cast<size_t>(type);
1993   if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1994     throw std::invalid_argument(
1995         to<std::string>("Compression type ", idx, " invalid"));
1996   }
1997   return codecFactories[idx] != nullptr;
1998 }
1999
2000 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2001   size_t idx = static_cast<size_t>(type);
2002   if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2003     throw std::invalid_argument(
2004         to<std::string>("Compression type ", idx, " invalid"));
2005   }
2006   auto factory = codecFactories[idx];
2007   if (!factory) {
2008     throw std::invalid_argument(to<std::string>(
2009         "Compression type ", idx, " not supported"));
2010   }
2011   auto codec = (*factory)(level, type);
2012   DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
2013   return codec;
2014 }
2015
2016 std::unique_ptr<Codec> getAutoUncompressionCodec(
2017     std::vector<std::unique_ptr<Codec>> customCodecs) {
2018   return AutomaticCodec::create(std::move(customCodecs));
2019 }
2020 }}  // namespaces