29796a25df0de4478293e78dcee31e627759a298
[folly.git] / folly / io / Compression.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/io/Compression.h"
18
19 #include <lz4.h>
20 #include <lz4hc.h>
21 #include <glog/logging.h>
22 #include <snappy.h>
23 #include <snappy-sinksource.h>
24 #include <zlib.h>
25 #include <lzma.h>
26
27 #include "folly/Conv.h"
28 #include "folly/Memory.h"
29 #include "folly/Portability.h"
30 #include "folly/ScopeGuard.h"
31 #include "folly/Varint.h"
32 #include "folly/io/Cursor.h"
33
34 namespace folly { namespace io {
35
36 Codec::Codec(CodecType type) : type_(type) { }
37
38 // Ensure consistent behavior in the nullptr case
39 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
40   return !data->empty() ? doCompress(data) : IOBuf::create(0);
41 }
42
43 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
44                                          uint64_t uncompressedLength) {
45   if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
46     if (needsUncompressedLength()) {
47       throw std::invalid_argument("Codec: uncompressed length required");
48     }
49   } else if (uncompressedLength > maxUncompressedLength()) {
50     throw std::runtime_error("Codec: uncompressed length too large");
51   }
52
53   if (data->empty()) {
54     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
55         uncompressedLength != 0) {
56       throw std::runtime_error("Codec: invalid uncompressed length");
57     }
58     return IOBuf::create(0);
59   }
60
61   return doUncompress(data, uncompressedLength);
62 }
63
64 bool Codec::needsUncompressedLength() const {
65   return doNeedsUncompressedLength();
66 }
67
68 uint64_t Codec::maxUncompressedLength() const {
69   return doMaxUncompressedLength();
70 }
71
72 bool Codec::doNeedsUncompressedLength() const {
73   return false;
74 }
75
76 uint64_t Codec::doMaxUncompressedLength() const {
77   return std::numeric_limits<uint64_t>::max() - 1;
78 }
79
80 namespace {
81
82 /**
83  * No compression
84  */
85 class NoCompressionCodec FOLLY_FINAL : public Codec {
86  public:
87   static std::unique_ptr<Codec> create(int level, CodecType type);
88   explicit NoCompressionCodec(int level, CodecType type);
89
90  private:
91   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
92   std::unique_ptr<IOBuf> doUncompress(
93       const IOBuf* data,
94       uint64_t uncompressedLength) FOLLY_OVERRIDE;
95 };
96
97 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
98   return make_unique<NoCompressionCodec>(level, type);
99 }
100
101 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
102   : Codec(type) {
103   DCHECK(type == CodecType::NO_COMPRESSION);
104   switch (level) {
105   case COMPRESSION_LEVEL_DEFAULT:
106   case COMPRESSION_LEVEL_FASTEST:
107   case COMPRESSION_LEVEL_BEST:
108     level = 0;
109   }
110   if (level != 0) {
111     throw std::invalid_argument(to<std::string>(
112         "NoCompressionCodec: invalid level ", level));
113   }
114 }
115
116 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
117     const IOBuf* data) {
118   return data->clone();
119 }
120
121 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
122     const IOBuf* data,
123     uint64_t uncompressedLength) {
124   if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
125       data->computeChainDataLength() != uncompressedLength) {
126     throw std::runtime_error(to<std::string>(
127         "NoCompressionCodec: invalid uncompressed length"));
128   }
129   return data->clone();
130 }
131
132 /**
133  * LZ4 compression
134  */
135 class LZ4Codec FOLLY_FINAL : public Codec {
136  public:
137   static std::unique_ptr<Codec> create(int level, CodecType type);
138   explicit LZ4Codec(int level, CodecType type);
139
140  private:
141   bool doNeedsUncompressedLength() const FOLLY_OVERRIDE;
142   uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
143
144   bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
145
146   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
147   std::unique_ptr<IOBuf> doUncompress(
148       const IOBuf* data,
149       uint64_t uncompressedLength) FOLLY_OVERRIDE;
150
151   bool highCompression_;
152 };
153
154 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
155   return make_unique<LZ4Codec>(level, type);
156 }
157
158 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
159   DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
160
161   switch (level) {
162   case COMPRESSION_LEVEL_FASTEST:
163   case COMPRESSION_LEVEL_DEFAULT:
164     level = 1;
165     break;
166   case COMPRESSION_LEVEL_BEST:
167     level = 2;
168     break;
169   }
170   if (level < 1 || level > 2) {
171     throw std::invalid_argument(to<std::string>(
172         "LZ4Codec: invalid level: ", level));
173   }
174   highCompression_ = (level > 1);
175 }
176
177 bool LZ4Codec::doNeedsUncompressedLength() const {
178   return !encodeSize();
179 }
180
181 uint64_t LZ4Codec::doMaxUncompressedLength() const {
182   // From lz4.h: "Max supported value is ~1.9GB"; I wish we had something
183   // more accurate.
184   return 1.8 * (uint64_t(1) << 30);
185 }
186
187 namespace {
188
189 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
190   DCHECK_GE(out->tailroom(), kMaxVarintLength64);
191   out->append(encodeVarint(val, out->writableTail()));
192 }
193
194 uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
195   // Must have enough room in *this* buffer.
196   auto p = cursor.peek();
197   folly::ByteRange range(p.first, p.second);
198   uint64_t val = decodeVarint(range);
199   cursor.skip(range.data() - p.first);
200   return val;
201 }
202
203 }  // namespace
204
205 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
206   std::unique_ptr<IOBuf> clone;
207   if (data->isChained()) {
208     // LZ4 doesn't support streaming, so we have to coalesce
209     clone = data->clone();
210     clone->coalesce();
211     data = clone.get();
212   }
213
214   uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
215   auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
216   if (encodeSize()) {
217     encodeVarintToIOBuf(data->length(), out.get());
218   }
219
220   int n;
221   if (highCompression_) {
222     n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()),
223                        reinterpret_cast<char*>(out->writableTail()),
224                        data->length());
225   } else {
226     n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
227                      reinterpret_cast<char*>(out->writableTail()),
228                      data->length());
229   }
230
231   CHECK_GE(n, 0);
232   CHECK_LE(n, out->capacity());
233
234   out->append(n);
235   return out;
236 }
237
238 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
239     const IOBuf* data,
240     uint64_t uncompressedLength) {
241   std::unique_ptr<IOBuf> clone;
242   if (data->isChained()) {
243     // LZ4 doesn't support streaming, so we have to coalesce
244     clone = data->clone();
245     clone->coalesce();
246     data = clone.get();
247   }
248
249   folly::io::Cursor cursor(data);
250   uint64_t actualUncompressedLength;
251   if (encodeSize()) {
252     actualUncompressedLength = decodeVarintFromCursor(cursor);
253     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
254         uncompressedLength != actualUncompressedLength) {
255       throw std::runtime_error("LZ4Codec: invalid uncompressed length");
256     }
257   } else {
258     actualUncompressedLength = uncompressedLength;
259     DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH);
260   }
261
262   auto out = IOBuf::create(actualUncompressedLength);
263   auto p = cursor.peek();
264   int n = LZ4_uncompress(reinterpret_cast<const char*>(p.first),
265                          reinterpret_cast<char*>(out->writableTail()),
266                          actualUncompressedLength);
267   if (n != p.second) {
268     throw std::runtime_error(to<std::string>(
269         "LZ4 decompression returned invalid value ", n));
270   }
271   out->append(actualUncompressedLength);
272   return out;
273 }
274
275 /**
276  * Snappy compression
277  */
278
279 /**
280  * Implementation of snappy::Source that reads from a IOBuf chain.
281  */
282 class IOBufSnappySource FOLLY_FINAL : public snappy::Source {
283  public:
284   explicit IOBufSnappySource(const IOBuf* data);
285   size_t Available() const FOLLY_OVERRIDE;
286   const char* Peek(size_t* len) FOLLY_OVERRIDE;
287   void Skip(size_t n) FOLLY_OVERRIDE;
288  private:
289   size_t available_;
290   io::Cursor cursor_;
291 };
292
293 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
294   : available_(data->computeChainDataLength()),
295     cursor_(data) {
296 }
297
298 size_t IOBufSnappySource::Available() const {
299   return available_;
300 }
301
302 const char* IOBufSnappySource::Peek(size_t* len) {
303   auto p = cursor_.peek();
304   *len = p.second;
305   return reinterpret_cast<const char*>(p.first);
306 }
307
308 void IOBufSnappySource::Skip(size_t n) {
309   CHECK_LE(n, available_);
310   cursor_.skip(n);
311   available_ -= n;
312 }
313
314 class SnappyCodec FOLLY_FINAL : public Codec {
315  public:
316   static std::unique_ptr<Codec> create(int level, CodecType type);
317   explicit SnappyCodec(int level, CodecType type);
318
319  private:
320   uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
321   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
322   std::unique_ptr<IOBuf> doUncompress(
323       const IOBuf* data,
324       uint64_t uncompressedLength) FOLLY_OVERRIDE;
325 };
326
327 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
328   return make_unique<SnappyCodec>(level, type);
329 }
330
331 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
332   DCHECK(type == CodecType::SNAPPY);
333   switch (level) {
334   case COMPRESSION_LEVEL_FASTEST:
335   case COMPRESSION_LEVEL_DEFAULT:
336   case COMPRESSION_LEVEL_BEST:
337     level = 1;
338   }
339   if (level != 1) {
340     throw std::invalid_argument(to<std::string>(
341         "SnappyCodec: invalid level: ", level));
342   }
343 }
344
345 uint64_t SnappyCodec::doMaxUncompressedLength() const {
346   // snappy.h uses uint32_t for lengths, so there's that.
347   return std::numeric_limits<uint32_t>::max();
348 }
349
350 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
351   IOBufSnappySource source(data);
352   auto out =
353     IOBuf::create(snappy::MaxCompressedLength(source.Available()));
354
355   snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
356       out->writableTail()));
357
358   size_t n = snappy::Compress(&source, &sink);
359
360   CHECK_LE(n, out->capacity());
361   out->append(n);
362   return out;
363 }
364
365 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
366                                                  uint64_t uncompressedLength) {
367   uint32_t actualUncompressedLength = 0;
368
369   {
370     IOBufSnappySource source(data);
371     if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
372       throw std::runtime_error("snappy::GetUncompressedLength failed");
373     }
374     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
375         uncompressedLength != actualUncompressedLength) {
376       throw std::runtime_error("snappy: invalid uncompressed length");
377     }
378   }
379
380   auto out = IOBuf::create(actualUncompressedLength);
381
382   {
383     IOBufSnappySource source(data);
384     if (!snappy::RawUncompress(&source,
385                                reinterpret_cast<char*>(out->writableTail()))) {
386       throw std::runtime_error("snappy::RawUncompress failed");
387     }
388   }
389
390   out->append(actualUncompressedLength);
391   return out;
392 }
393
394 /**
395  * Zlib codec
396  */
397 class ZlibCodec FOLLY_FINAL : public Codec {
398  public:
399   static std::unique_ptr<Codec> create(int level, CodecType type);
400   explicit ZlibCodec(int level, CodecType type);
401
402  private:
403   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
404   std::unique_ptr<IOBuf> doUncompress(
405       const IOBuf* data,
406       uint64_t uncompressedLength) FOLLY_OVERRIDE;
407
408   std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
409   bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
410
411   int level_;
412 };
413
414 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
415   return make_unique<ZlibCodec>(level, type);
416 }
417
418 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
419   DCHECK(type == CodecType::ZLIB);
420   switch (level) {
421   case COMPRESSION_LEVEL_FASTEST:
422     level = 1;
423     break;
424   case COMPRESSION_LEVEL_DEFAULT:
425     level = Z_DEFAULT_COMPRESSION;
426     break;
427   case COMPRESSION_LEVEL_BEST:
428     level = 9;
429     break;
430   }
431   if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
432     throw std::invalid_argument(to<std::string>(
433         "ZlibCodec: invalid level: ", level));
434   }
435   level_ = level;
436 }
437
438 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
439                                                   uint32_t length) {
440   CHECK_EQ(stream->avail_out, 0);
441
442   auto buf = IOBuf::create(length);
443   buf->append(length);
444
445   stream->next_out = buf->writableData();
446   stream->avail_out = buf->length();
447
448   return buf;
449 }
450
451 bool ZlibCodec::doInflate(z_stream* stream,
452                           IOBuf* head,
453                           uint32_t bufferLength) {
454   if (stream->avail_out == 0) {
455     head->prependChain(addOutputBuffer(stream, bufferLength));
456   }
457
458   int rc = inflate(stream, Z_NO_FLUSH);
459
460   switch (rc) {
461   case Z_OK:
462     break;
463   case Z_STREAM_END:
464     return true;
465   case Z_BUF_ERROR:
466   case Z_NEED_DICT:
467   case Z_DATA_ERROR:
468   case Z_MEM_ERROR:
469     throw std::runtime_error(to<std::string>(
470         "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
471   default:
472     CHECK(false) << rc << ": " << stream->msg;
473   }
474
475   return false;
476 }
477
478
479 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
480   z_stream stream;
481   stream.zalloc = nullptr;
482   stream.zfree = nullptr;
483   stream.opaque = nullptr;
484
485   int rc = deflateInit(&stream, level_);
486   if (rc != Z_OK) {
487     throw std::runtime_error(to<std::string>(
488         "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
489   }
490
491   stream.next_in = stream.next_out = nullptr;
492   stream.avail_in = stream.avail_out = 0;
493   stream.total_in = stream.total_out = 0;
494
495   bool success = false;
496
497   SCOPE_EXIT {
498     int rc = deflateEnd(&stream);
499     // If we're here because of an exception, it's okay if some data
500     // got dropped.
501     CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
502       << rc << ": " << stream.msg;
503   };
504
505   uint64_t uncompressedLength = data->computeChainDataLength();
506   uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
507
508   // Max 64MiB in one go
509   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
510   constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
511
512   auto out = addOutputBuffer(
513       &stream,
514       (maxCompressedLength <= maxSingleStepLength ?
515        maxCompressedLength :
516        defaultBufferLength));
517
518   for (auto& range : *data) {
519     if (range.empty()) {
520       continue;
521     }
522
523     stream.next_in = const_cast<uint8_t*>(range.data());
524     stream.avail_in = range.size();
525
526     while (stream.avail_in != 0) {
527       if (stream.avail_out == 0) {
528         out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
529       }
530
531       rc = deflate(&stream, Z_NO_FLUSH);
532
533       CHECK_EQ(rc, Z_OK) << stream.msg;
534     }
535   }
536
537   do {
538     if (stream.avail_out == 0) {
539       out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
540     }
541
542     rc = deflate(&stream, Z_FINISH);
543   } while (rc == Z_OK);
544
545   CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
546
547   out->prev()->trimEnd(stream.avail_out);
548
549   success = true;  // we survived
550
551   return out;
552 }
553
554 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
555                                                uint64_t uncompressedLength) {
556   z_stream stream;
557   stream.zalloc = nullptr;
558   stream.zfree = nullptr;
559   stream.opaque = nullptr;
560
561   int rc = inflateInit(&stream);
562   if (rc != Z_OK) {
563     throw std::runtime_error(to<std::string>(
564         "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
565   }
566
567   stream.next_in = stream.next_out = nullptr;
568   stream.avail_in = stream.avail_out = 0;
569   stream.total_in = stream.total_out = 0;
570
571   bool success = false;
572
573   SCOPE_EXIT {
574     int rc = inflateEnd(&stream);
575     // If we're here because of an exception, it's okay if some data
576     // got dropped.
577     CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
578       << rc << ": " << stream.msg;
579   };
580
581   // Max 64MiB in one go
582   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
583   constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
584
585   auto out = addOutputBuffer(
586       &stream,
587       ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
588         uncompressedLength <= maxSingleStepLength) ?
589        uncompressedLength :
590        defaultBufferLength));
591
592   bool streamEnd = false;
593   for (auto& range : *data) {
594     if (range.empty()) {
595       continue;
596     }
597
598     stream.next_in = const_cast<uint8_t*>(range.data());
599     stream.avail_in = range.size();
600
601     while (stream.avail_in != 0) {
602       if (streamEnd) {
603         throw std::runtime_error(to<std::string>(
604             "ZlibCodec: junk after end of data"));
605       }
606
607       streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
608     }
609   }
610
611   while (!streamEnd) {
612     streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
613   }
614
615   out->prev()->trimEnd(stream.avail_out);
616
617   if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
618       uncompressedLength != stream.total_out) {
619     throw std::runtime_error(to<std::string>(
620         "ZlibCodec: invalid uncompressed length"));
621   }
622
623   success = true;  // we survived
624
625   return out;
626 }
627
628 /**
629  * LZMA2 compression
630  */
631 class LZMA2Codec FOLLY_FINAL : public Codec {
632  public:
633   static std::unique_ptr<Codec> create(int level, CodecType type);
634   explicit LZMA2Codec(int level, CodecType type);
635
636  private:
637   bool doNeedsUncompressedLength() const FOLLY_OVERRIDE;
638   uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
639
640   bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
641
642   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
643   std::unique_ptr<IOBuf> doUncompress(
644       const IOBuf* data,
645       uint64_t uncompressedLength) FOLLY_OVERRIDE;
646
647   std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
648   bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
649
650   int level_;
651 };
652
653 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
654   return make_unique<LZMA2Codec>(level, type);
655 }
656
657 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
658   DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
659   switch (level) {
660   case COMPRESSION_LEVEL_FASTEST:
661     level = 0;
662     break;
663   case COMPRESSION_LEVEL_DEFAULT:
664     level = LZMA_PRESET_DEFAULT;
665     break;
666   case COMPRESSION_LEVEL_BEST:
667     level = 9;
668     break;
669   }
670   if (level < 0 || level > 9) {
671     throw std::invalid_argument(to<std::string>(
672         "LZMA2Codec: invalid level: ", level));
673   }
674   level_ = level;
675 }
676
677 bool LZMA2Codec::doNeedsUncompressedLength() const {
678   return !encodeSize();
679 }
680
681 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
682   // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
683   return uint64_t(1) << 63;
684 }
685
686 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
687     lzma_stream* stream,
688     size_t length) {
689
690   CHECK_EQ(stream->avail_out, 0);
691
692   auto buf = IOBuf::create(length);
693   buf->append(length);
694
695   stream->next_out = buf->writableData();
696   stream->avail_out = buf->length();
697
698   return buf;
699 }
700
701 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
702   lzma_ret rc;
703   lzma_stream stream = LZMA_STREAM_INIT;
704
705   rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
706   if (rc != LZMA_OK) {
707     throw std::runtime_error(folly::to<std::string>(
708       "LZMA2Codec: lzma_easy_encoder error: ", rc));
709   }
710
711   SCOPE_EXIT { lzma_end(&stream); };
712
713   uint64_t uncompressedLength = data->computeChainDataLength();
714   uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
715
716   // Max 64MiB in one go
717   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
718   constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
719
720   auto out = addOutputBuffer(
721     &stream,
722     (maxCompressedLength <= maxSingleStepLength ?
723      maxCompressedLength :
724      defaultBufferLength));
725
726   if (encodeSize()) {
727     auto size = IOBuf::createCombined(kMaxVarintLength64);
728     encodeVarintToIOBuf(uncompressedLength, size.get());
729     size->appendChain(std::move(out));
730     out = std::move(size);
731   }
732
733   for (auto& range : *data) {
734     if (range.empty()) {
735       continue;
736     }
737
738     stream.next_in = const_cast<uint8_t*>(range.data());
739     stream.avail_in = range.size();
740
741     while (stream.avail_in != 0) {
742       if (stream.avail_out == 0) {
743         out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
744       }
745
746       rc = lzma_code(&stream, LZMA_RUN);
747
748       if (rc != LZMA_OK) {
749         throw std::runtime_error(folly::to<std::string>(
750           "LZMA2Codec: lzma_code error: ", rc));
751       }
752     }
753   }
754
755   do {
756     if (stream.avail_out == 0) {
757       out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
758     }
759
760     rc = lzma_code(&stream, LZMA_FINISH);
761   } while (rc == LZMA_OK);
762
763   if (rc != LZMA_STREAM_END) {
764     throw std::runtime_error(folly::to<std::string>(
765       "LZMA2Codec: lzma_code ended with error: ", rc));
766   }
767
768   out->prev()->trimEnd(stream.avail_out);
769
770   return out;
771 }
772
773 bool LZMA2Codec::doInflate(lzma_stream* stream,
774                           IOBuf* head,
775                           size_t bufferLength) {
776   if (stream->avail_out == 0) {
777     head->prependChain(addOutputBuffer(stream, bufferLength));
778   }
779
780   lzma_ret rc = lzma_code(stream, LZMA_RUN);
781
782   switch (rc) {
783   case LZMA_OK:
784     break;
785   case LZMA_STREAM_END:
786     return true;
787   default:
788     throw std::runtime_error(to<std::string>(
789         "LZMA2Codec: lzma_code error: ", rc));
790   }
791
792   return false;
793 }
794
795 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(const IOBuf* data,
796                                                uint64_t uncompressedLength) {
797   lzma_ret rc;
798   lzma_stream stream = LZMA_STREAM_INIT;
799
800   rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
801   if (rc != LZMA_OK) {
802     throw std::runtime_error(folly::to<std::string>(
803       "LZMA2Codec: lzma_auto_decoder error: ", rc));
804   }
805
806   SCOPE_EXIT { lzma_end(&stream); };
807
808   // Max 64MiB in one go
809   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
810   constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
811
812   folly::io::Cursor cursor(data);
813   uint64_t actualUncompressedLength;
814   if (encodeSize()) {
815     actualUncompressedLength = decodeVarintFromCursor(cursor);
816     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
817         uncompressedLength != actualUncompressedLength) {
818       throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
819     }
820   } else {
821     actualUncompressedLength = uncompressedLength;
822     DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH);
823   }
824
825   auto out = addOutputBuffer(
826       &stream,
827       (actualUncompressedLength <= maxSingleStepLength ?
828        actualUncompressedLength :
829        defaultBufferLength));
830
831   bool streamEnd = false;
832   auto buf = cursor.peek();
833   while (buf.second != 0) {
834     stream.next_in = const_cast<uint8_t*>(buf.first);
835     stream.avail_in = buf.second;
836
837     while (stream.avail_in != 0) {
838       if (streamEnd) {
839         throw std::runtime_error(to<std::string>(
840             "LZMA2Codec: junk after end of data"));
841       }
842
843       streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
844     }
845
846     cursor.skip(buf.second);
847     buf = cursor.peek();
848   }
849
850   while (!streamEnd) {
851     streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
852   }
853
854   out->prev()->trimEnd(stream.avail_out);
855
856   if (actualUncompressedLength != stream.total_out) {
857     throw std::runtime_error(to<std::string>(
858         "LZMA2Codec: invalid uncompressed length"));
859   }
860
861   return out;
862 }
863
864
865 typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
866
867 CodecFactory gCodecFactories[
868     static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
869   nullptr,  // USER_DEFINED
870   NoCompressionCodec::create,
871   LZ4Codec::create,
872   SnappyCodec::create,
873   ZlibCodec::create,
874   LZ4Codec::create,
875   LZMA2Codec::create,
876   LZMA2Codec::create,
877 };
878
879 }  // namespace
880
881 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
882   size_t idx = static_cast<size_t>(type);
883   if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
884     throw std::invalid_argument(to<std::string>(
885         "Compression type ", idx, " not supported"));
886   }
887   auto factory = gCodecFactories[idx];
888   if (!factory) {
889     throw std::invalid_argument(to<std::string>(
890         "Compression type ", idx, " not supported"));
891   }
892   auto codec = (*factory)(level, type);
893   DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
894   return codec;
895 }
896
897 }}  // namespaces
898