improve io::Cursor read() performance for small sizeof(T)
[folly.git] / folly / io / Compression.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/io/Compression.h>
18
19 #if FOLLY_HAVE_LIBLZ4
20 #include <lz4.h>
21 #include <lz4hc.h>
22 #endif
23
24 #include <glog/logging.h>
25
26 #if FOLLY_HAVE_LIBSNAPPY
27 #include <snappy.h>
28 #include <snappy-sinksource.h>
29 #endif
30
31 #if FOLLY_HAVE_LIBZ
32 #include <zlib.h>
33 #endif
34
35 #if FOLLY_HAVE_LIBLZMA
36 #include <lzma.h>
37 #endif
38
39 #include <folly/Conv.h>
40 #include <folly/Memory.h>
41 #include <folly/Portability.h>
42 #include <folly/ScopeGuard.h>
43 #include <folly/Varint.h>
44 #include <folly/io/Cursor.h>
45
46 namespace folly { namespace io {
47
48 Codec::Codec(CodecType type) : type_(type) { }
49
50 // Ensure consistent behavior in the nullptr case
51 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
52   uint64_t len = data->computeChainDataLength();
53   if (len == 0) {
54     return IOBuf::create(0);
55   } else if (len > maxUncompressedLength()) {
56     throw std::runtime_error("Codec: uncompressed length too large");
57   }
58
59   return doCompress(data);
60 }
61
62 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
63                                          uint64_t uncompressedLength) {
64   if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
65     if (needsUncompressedLength()) {
66       throw std::invalid_argument("Codec: uncompressed length required");
67     }
68   } else if (uncompressedLength > maxUncompressedLength()) {
69     throw std::runtime_error("Codec: uncompressed length too large");
70   }
71
72   if (data->empty()) {
73     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
74         uncompressedLength != 0) {
75       throw std::runtime_error("Codec: invalid uncompressed length");
76     }
77     return IOBuf::create(0);
78   }
79
80   return doUncompress(data, uncompressedLength);
81 }
82
83 bool Codec::needsUncompressedLength() const {
84   return doNeedsUncompressedLength();
85 }
86
87 uint64_t Codec::maxUncompressedLength() const {
88   return doMaxUncompressedLength();
89 }
90
91 bool Codec::doNeedsUncompressedLength() const {
92   return false;
93 }
94
95 uint64_t Codec::doMaxUncompressedLength() const {
96   return UNLIMITED_UNCOMPRESSED_LENGTH;
97 }
98
99 namespace {
100
101 /**
102  * No compression
103  */
104 class NoCompressionCodec FOLLY_FINAL : public Codec {
105  public:
106   static std::unique_ptr<Codec> create(int level, CodecType type);
107   explicit NoCompressionCodec(int level, CodecType type);
108
109  private:
110   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
111   std::unique_ptr<IOBuf> doUncompress(
112       const IOBuf* data,
113       uint64_t uncompressedLength) FOLLY_OVERRIDE;
114 };
115
116 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
117   return make_unique<NoCompressionCodec>(level, type);
118 }
119
120 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
121   : Codec(type) {
122   DCHECK(type == CodecType::NO_COMPRESSION);
123   switch (level) {
124   case COMPRESSION_LEVEL_DEFAULT:
125   case COMPRESSION_LEVEL_FASTEST:
126   case COMPRESSION_LEVEL_BEST:
127     level = 0;
128   }
129   if (level != 0) {
130     throw std::invalid_argument(to<std::string>(
131         "NoCompressionCodec: invalid level ", level));
132   }
133 }
134
135 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
136     const IOBuf* data) {
137   return data->clone();
138 }
139
140 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
141     const IOBuf* data,
142     uint64_t uncompressedLength) {
143   if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
144       data->computeChainDataLength() != uncompressedLength) {
145     throw std::runtime_error(to<std::string>(
146         "NoCompressionCodec: invalid uncompressed length"));
147   }
148   return data->clone();
149 }
150
151 namespace {
152
153 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
154   DCHECK_GE(out->tailroom(), kMaxVarintLength64);
155   out->append(encodeVarint(val, out->writableTail()));
156 }
157
158 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
159   uint64_t val = 0;
160   int8_t b = 0;
161   for (int shift = 0; shift <= 63; shift += 7) {
162     b = cursor.read<int8_t>();
163     val |= static_cast<uint64_t>(b & 0x7f) << shift;
164     if (b >= 0) {
165       break;
166     }
167   }
168   if (b < 0) {
169     throw std::invalid_argument("Invalid varint value. Too big.");
170   }
171   return val;
172 }
173
174 }  // namespace
175
176 #if FOLLY_HAVE_LIBLZ4
177
178 /**
179  * LZ4 compression
180  */
181 class LZ4Codec FOLLY_FINAL : public Codec {
182  public:
183   static std::unique_ptr<Codec> create(int level, CodecType type);
184   explicit LZ4Codec(int level, CodecType type);
185
186  private:
187   bool doNeedsUncompressedLength() const FOLLY_OVERRIDE;
188   uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
189
190   bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
191
192   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
193   std::unique_ptr<IOBuf> doUncompress(
194       const IOBuf* data,
195       uint64_t uncompressedLength) FOLLY_OVERRIDE;
196
197   bool highCompression_;
198 };
199
200 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
201   return make_unique<LZ4Codec>(level, type);
202 }
203
204 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
205   DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
206
207   switch (level) {
208   case COMPRESSION_LEVEL_FASTEST:
209   case COMPRESSION_LEVEL_DEFAULT:
210     level = 1;
211     break;
212   case COMPRESSION_LEVEL_BEST:
213     level = 2;
214     break;
215   }
216   if (level < 1 || level > 2) {
217     throw std::invalid_argument(to<std::string>(
218         "LZ4Codec: invalid level: ", level));
219   }
220   highCompression_ = (level > 1);
221 }
222
223 bool LZ4Codec::doNeedsUncompressedLength() const {
224   return !encodeSize();
225 }
226
227 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
228 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
229 // here.
230 #ifndef LZ4_MAX_INPUT_SIZE
231 # define LZ4_MAX_INPUT_SIZE 0x7E000000
232 #endif
233
234 uint64_t LZ4Codec::doMaxUncompressedLength() const {
235   return LZ4_MAX_INPUT_SIZE;
236 }
237
238 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
239   std::unique_ptr<IOBuf> clone;
240   if (data->isChained()) {
241     // LZ4 doesn't support streaming, so we have to coalesce
242     clone = data->clone();
243     clone->coalesce();
244     data = clone.get();
245   }
246
247   uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
248   auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
249   if (encodeSize()) {
250     encodeVarintToIOBuf(data->length(), out.get());
251   }
252
253   int n;
254   if (highCompression_) {
255     n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()),
256                        reinterpret_cast<char*>(out->writableTail()),
257                        data->length());
258   } else {
259     n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
260                      reinterpret_cast<char*>(out->writableTail()),
261                      data->length());
262   }
263
264   CHECK_GE(n, 0);
265   CHECK_LE(n, out->capacity());
266
267   out->append(n);
268   return out;
269 }
270
271 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
272     const IOBuf* data,
273     uint64_t uncompressedLength) {
274   std::unique_ptr<IOBuf> clone;
275   if (data->isChained()) {
276     // LZ4 doesn't support streaming, so we have to coalesce
277     clone = data->clone();
278     clone->coalesce();
279     data = clone.get();
280   }
281
282   folly::io::Cursor cursor(data);
283   uint64_t actualUncompressedLength;
284   if (encodeSize()) {
285     actualUncompressedLength = decodeVarintFromCursor(cursor);
286     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
287         uncompressedLength != actualUncompressedLength) {
288       throw std::runtime_error("LZ4Codec: invalid uncompressed length");
289     }
290   } else {
291     actualUncompressedLength = uncompressedLength;
292     if (actualUncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH ||
293         actualUncompressedLength > maxUncompressedLength()) {
294       throw std::runtime_error("LZ4Codec: invalid uncompressed length");
295     }
296   }
297
298   auto p = cursor.peek();
299   auto out = IOBuf::create(actualUncompressedLength);
300   int n = LZ4_decompress_safe(reinterpret_cast<const char*>(p.first),
301                               reinterpret_cast<char*>(out->writableTail()),
302                               p.second,
303                               actualUncompressedLength);
304
305   if (n < 0 || uint64_t(n) != actualUncompressedLength) {
306     throw std::runtime_error(to<std::string>(
307         "LZ4 decompression returned invalid value ", n));
308   }
309   out->append(actualUncompressedLength);
310   return out;
311 }
312
313 #endif  // FOLLY_HAVE_LIBLZ4
314
315 #if FOLLY_HAVE_LIBSNAPPY
316
317 /**
318  * Snappy compression
319  */
320
321 /**
322  * Implementation of snappy::Source that reads from a IOBuf chain.
323  */
324 class IOBufSnappySource FOLLY_FINAL : public snappy::Source {
325  public:
326   explicit IOBufSnappySource(const IOBuf* data);
327   size_t Available() const FOLLY_OVERRIDE;
328   const char* Peek(size_t* len) FOLLY_OVERRIDE;
329   void Skip(size_t n) FOLLY_OVERRIDE;
330  private:
331   size_t available_;
332   io::Cursor cursor_;
333 };
334
335 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
336   : available_(data->computeChainDataLength()),
337     cursor_(data) {
338 }
339
340 size_t IOBufSnappySource::Available() const {
341   return available_;
342 }
343
344 const char* IOBufSnappySource::Peek(size_t* len) {
345   auto p = cursor_.peek();
346   *len = p.second;
347   return reinterpret_cast<const char*>(p.first);
348 }
349
350 void IOBufSnappySource::Skip(size_t n) {
351   CHECK_LE(n, available_);
352   cursor_.skip(n);
353   available_ -= n;
354 }
355
356 class SnappyCodec FOLLY_FINAL : public Codec {
357  public:
358   static std::unique_ptr<Codec> create(int level, CodecType type);
359   explicit SnappyCodec(int level, CodecType type);
360
361  private:
362   uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
363   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
364   std::unique_ptr<IOBuf> doUncompress(
365       const IOBuf* data,
366       uint64_t uncompressedLength) FOLLY_OVERRIDE;
367 };
368
369 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
370   return make_unique<SnappyCodec>(level, type);
371 }
372
373 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
374   DCHECK(type == CodecType::SNAPPY);
375   switch (level) {
376   case COMPRESSION_LEVEL_FASTEST:
377   case COMPRESSION_LEVEL_DEFAULT:
378   case COMPRESSION_LEVEL_BEST:
379     level = 1;
380   }
381   if (level != 1) {
382     throw std::invalid_argument(to<std::string>(
383         "SnappyCodec: invalid level: ", level));
384   }
385 }
386
387 uint64_t SnappyCodec::doMaxUncompressedLength() const {
388   // snappy.h uses uint32_t for lengths, so there's that.
389   return std::numeric_limits<uint32_t>::max();
390 }
391
392 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
393   IOBufSnappySource source(data);
394   auto out =
395     IOBuf::create(snappy::MaxCompressedLength(source.Available()));
396
397   snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
398       out->writableTail()));
399
400   size_t n = snappy::Compress(&source, &sink);
401
402   CHECK_LE(n, out->capacity());
403   out->append(n);
404   return out;
405 }
406
407 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
408                                                  uint64_t uncompressedLength) {
409   uint32_t actualUncompressedLength = 0;
410
411   {
412     IOBufSnappySource source(data);
413     if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
414       throw std::runtime_error("snappy::GetUncompressedLength failed");
415     }
416     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
417         uncompressedLength != actualUncompressedLength) {
418       throw std::runtime_error("snappy: invalid uncompressed length");
419     }
420   }
421
422   auto out = IOBuf::create(actualUncompressedLength);
423
424   {
425     IOBufSnappySource source(data);
426     if (!snappy::RawUncompress(&source,
427                                reinterpret_cast<char*>(out->writableTail()))) {
428       throw std::runtime_error("snappy::RawUncompress failed");
429     }
430   }
431
432   out->append(actualUncompressedLength);
433   return out;
434 }
435
436 #endif  // FOLLY_HAVE_LIBSNAPPY
437
438 #if FOLLY_HAVE_LIBZ
439 /**
440  * Zlib codec
441  */
442 class ZlibCodec FOLLY_FINAL : public Codec {
443  public:
444   static std::unique_ptr<Codec> create(int level, CodecType type);
445   explicit ZlibCodec(int level, CodecType type);
446
447  private:
448   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
449   std::unique_ptr<IOBuf> doUncompress(
450       const IOBuf* data,
451       uint64_t uncompressedLength) FOLLY_OVERRIDE;
452
453   std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
454   bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
455
456   int level_;
457 };
458
459 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
460   return make_unique<ZlibCodec>(level, type);
461 }
462
463 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
464   DCHECK(type == CodecType::ZLIB);
465   switch (level) {
466   case COMPRESSION_LEVEL_FASTEST:
467     level = 1;
468     break;
469   case COMPRESSION_LEVEL_DEFAULT:
470     level = Z_DEFAULT_COMPRESSION;
471     break;
472   case COMPRESSION_LEVEL_BEST:
473     level = 9;
474     break;
475   }
476   if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
477     throw std::invalid_argument(to<std::string>(
478         "ZlibCodec: invalid level: ", level));
479   }
480   level_ = level;
481 }
482
483 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
484                                                   uint32_t length) {
485   CHECK_EQ(stream->avail_out, 0);
486
487   auto buf = IOBuf::create(length);
488   buf->append(length);
489
490   stream->next_out = buf->writableData();
491   stream->avail_out = buf->length();
492
493   return buf;
494 }
495
496 bool ZlibCodec::doInflate(z_stream* stream,
497                           IOBuf* head,
498                           uint32_t bufferLength) {
499   if (stream->avail_out == 0) {
500     head->prependChain(addOutputBuffer(stream, bufferLength));
501   }
502
503   int rc = inflate(stream, Z_NO_FLUSH);
504
505   switch (rc) {
506   case Z_OK:
507     break;
508   case Z_STREAM_END:
509     return true;
510   case Z_BUF_ERROR:
511   case Z_NEED_DICT:
512   case Z_DATA_ERROR:
513   case Z_MEM_ERROR:
514     throw std::runtime_error(to<std::string>(
515         "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
516   default:
517     CHECK(false) << rc << ": " << stream->msg;
518   }
519
520   return false;
521 }
522
523 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
524   z_stream stream;
525   stream.zalloc = nullptr;
526   stream.zfree = nullptr;
527   stream.opaque = nullptr;
528
529   int rc = deflateInit(&stream, level_);
530   if (rc != Z_OK) {
531     throw std::runtime_error(to<std::string>(
532         "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
533   }
534
535   stream.next_in = stream.next_out = nullptr;
536   stream.avail_in = stream.avail_out = 0;
537   stream.total_in = stream.total_out = 0;
538
539   bool success = false;
540
541   SCOPE_EXIT {
542     int rc = deflateEnd(&stream);
543     // If we're here because of an exception, it's okay if some data
544     // got dropped.
545     CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
546       << rc << ": " << stream.msg;
547   };
548
549   uint64_t uncompressedLength = data->computeChainDataLength();
550   uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
551
552   // Max 64MiB in one go
553   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
554   constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
555
556   auto out = addOutputBuffer(
557       &stream,
558       (maxCompressedLength <= maxSingleStepLength ?
559        maxCompressedLength :
560        defaultBufferLength));
561
562   for (auto& range : *data) {
563     uint64_t remaining = range.size();
564     uint64_t written = 0;
565     while (remaining) {
566       uint32_t step = (remaining > maxSingleStepLength ?
567                        maxSingleStepLength : remaining);
568       stream.next_in = const_cast<uint8_t*>(range.data() + written);
569       stream.avail_in = step;
570       remaining -= step;
571       written += step;
572
573       while (stream.avail_in != 0) {
574         if (stream.avail_out == 0) {
575           out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
576         }
577
578         rc = deflate(&stream, Z_NO_FLUSH);
579
580         CHECK_EQ(rc, Z_OK) << stream.msg;
581       }
582     }
583   }
584
585   do {
586     if (stream.avail_out == 0) {
587       out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
588     }
589
590     rc = deflate(&stream, Z_FINISH);
591   } while (rc == Z_OK);
592
593   CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
594
595   out->prev()->trimEnd(stream.avail_out);
596
597   success = true;  // we survived
598
599   return out;
600 }
601
602 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
603                                                uint64_t uncompressedLength) {
604   z_stream stream;
605   stream.zalloc = nullptr;
606   stream.zfree = nullptr;
607   stream.opaque = nullptr;
608
609   int rc = inflateInit(&stream);
610   if (rc != Z_OK) {
611     throw std::runtime_error(to<std::string>(
612         "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
613   }
614
615   stream.next_in = stream.next_out = nullptr;
616   stream.avail_in = stream.avail_out = 0;
617   stream.total_in = stream.total_out = 0;
618
619   bool success = false;
620
621   SCOPE_EXIT {
622     int rc = inflateEnd(&stream);
623     // If we're here because of an exception, it's okay if some data
624     // got dropped.
625     CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
626       << rc << ": " << stream.msg;
627   };
628
629   // Max 64MiB in one go
630   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
631   constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
632
633   auto out = addOutputBuffer(
634       &stream,
635       ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
636         uncompressedLength <= maxSingleStepLength) ?
637        uncompressedLength :
638        defaultBufferLength));
639
640   bool streamEnd = false;
641   for (auto& range : *data) {
642     if (range.empty()) {
643       continue;
644     }
645
646     stream.next_in = const_cast<uint8_t*>(range.data());
647     stream.avail_in = range.size();
648
649     while (stream.avail_in != 0) {
650       if (streamEnd) {
651         throw std::runtime_error(to<std::string>(
652             "ZlibCodec: junk after end of data"));
653       }
654
655       streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
656     }
657   }
658
659   while (!streamEnd) {
660     streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
661   }
662
663   out->prev()->trimEnd(stream.avail_out);
664
665   if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
666       uncompressedLength != stream.total_out) {
667     throw std::runtime_error(to<std::string>(
668         "ZlibCodec: invalid uncompressed length"));
669   }
670
671   success = true;  // we survived
672
673   return out;
674 }
675
676 #endif  // FOLLY_HAVE_LIBZ
677
678 #if FOLLY_HAVE_LIBLZMA
679
680 /**
681  * LZMA2 compression
682  */
683 class LZMA2Codec FOLLY_FINAL : public Codec {
684  public:
685   static std::unique_ptr<Codec> create(int level, CodecType type);
686   explicit LZMA2Codec(int level, CodecType type);
687
688  private:
689   bool doNeedsUncompressedLength() const FOLLY_OVERRIDE;
690   uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
691
692   bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
693
694   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
695   std::unique_ptr<IOBuf> doUncompress(
696       const IOBuf* data,
697       uint64_t uncompressedLength) FOLLY_OVERRIDE;
698
699   std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
700   bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
701
702   int level_;
703 };
704
705 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
706   return make_unique<LZMA2Codec>(level, type);
707 }
708
709 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
710   DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
711   switch (level) {
712   case COMPRESSION_LEVEL_FASTEST:
713     level = 0;
714     break;
715   case COMPRESSION_LEVEL_DEFAULT:
716     level = LZMA_PRESET_DEFAULT;
717     break;
718   case COMPRESSION_LEVEL_BEST:
719     level = 9;
720     break;
721   }
722   if (level < 0 || level > 9) {
723     throw std::invalid_argument(to<std::string>(
724         "LZMA2Codec: invalid level: ", level));
725   }
726   level_ = level;
727 }
728
729 bool LZMA2Codec::doNeedsUncompressedLength() const {
730   return !encodeSize();
731 }
732
733 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
734   // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
735   return uint64_t(1) << 63;
736 }
737
738 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
739     lzma_stream* stream,
740     size_t length) {
741
742   CHECK_EQ(stream->avail_out, 0);
743
744   auto buf = IOBuf::create(length);
745   buf->append(length);
746
747   stream->next_out = buf->writableData();
748   stream->avail_out = buf->length();
749
750   return buf;
751 }
752
753 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
754   lzma_ret rc;
755   lzma_stream stream = LZMA_STREAM_INIT;
756
757   rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
758   if (rc != LZMA_OK) {
759     throw std::runtime_error(folly::to<std::string>(
760       "LZMA2Codec: lzma_easy_encoder error: ", rc));
761   }
762
763   SCOPE_EXIT { lzma_end(&stream); };
764
765   uint64_t uncompressedLength = data->computeChainDataLength();
766   uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
767
768   // Max 64MiB in one go
769   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
770   constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
771
772   auto out = addOutputBuffer(
773     &stream,
774     (maxCompressedLength <= maxSingleStepLength ?
775      maxCompressedLength :
776      defaultBufferLength));
777
778   if (encodeSize()) {
779     auto size = IOBuf::createCombined(kMaxVarintLength64);
780     encodeVarintToIOBuf(uncompressedLength, size.get());
781     size->appendChain(std::move(out));
782     out = std::move(size);
783   }
784
785   for (auto& range : *data) {
786     if (range.empty()) {
787       continue;
788     }
789
790     stream.next_in = const_cast<uint8_t*>(range.data());
791     stream.avail_in = range.size();
792
793     while (stream.avail_in != 0) {
794       if (stream.avail_out == 0) {
795         out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
796       }
797
798       rc = lzma_code(&stream, LZMA_RUN);
799
800       if (rc != LZMA_OK) {
801         throw std::runtime_error(folly::to<std::string>(
802           "LZMA2Codec: lzma_code error: ", rc));
803       }
804     }
805   }
806
807   do {
808     if (stream.avail_out == 0) {
809       out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
810     }
811
812     rc = lzma_code(&stream, LZMA_FINISH);
813   } while (rc == LZMA_OK);
814
815   if (rc != LZMA_STREAM_END) {
816     throw std::runtime_error(folly::to<std::string>(
817       "LZMA2Codec: lzma_code ended with error: ", rc));
818   }
819
820   out->prev()->trimEnd(stream.avail_out);
821
822   return out;
823 }
824
825 bool LZMA2Codec::doInflate(lzma_stream* stream,
826                           IOBuf* head,
827                           size_t bufferLength) {
828   if (stream->avail_out == 0) {
829     head->prependChain(addOutputBuffer(stream, bufferLength));
830   }
831
832   lzma_ret rc = lzma_code(stream, LZMA_RUN);
833
834   switch (rc) {
835   case LZMA_OK:
836     break;
837   case LZMA_STREAM_END:
838     return true;
839   default:
840     throw std::runtime_error(to<std::string>(
841         "LZMA2Codec: lzma_code error: ", rc));
842   }
843
844   return false;
845 }
846
847 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(const IOBuf* data,
848                                                uint64_t uncompressedLength) {
849   lzma_ret rc;
850   lzma_stream stream = LZMA_STREAM_INIT;
851
852   rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
853   if (rc != LZMA_OK) {
854     throw std::runtime_error(folly::to<std::string>(
855       "LZMA2Codec: lzma_auto_decoder error: ", rc));
856   }
857
858   SCOPE_EXIT { lzma_end(&stream); };
859
860   // Max 64MiB in one go
861   constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
862   constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
863
864   folly::io::Cursor cursor(data);
865   uint64_t actualUncompressedLength;
866   if (encodeSize()) {
867     actualUncompressedLength = decodeVarintFromCursor(cursor);
868     if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
869         uncompressedLength != actualUncompressedLength) {
870       throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
871     }
872   } else {
873     actualUncompressedLength = uncompressedLength;
874     DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH);
875   }
876
877   auto out = addOutputBuffer(
878       &stream,
879       (actualUncompressedLength <= maxSingleStepLength ?
880        actualUncompressedLength :
881        defaultBufferLength));
882
883   bool streamEnd = false;
884   auto buf = cursor.peek();
885   while (buf.second != 0) {
886     stream.next_in = const_cast<uint8_t*>(buf.first);
887     stream.avail_in = buf.second;
888
889     while (stream.avail_in != 0) {
890       if (streamEnd) {
891         throw std::runtime_error(to<std::string>(
892             "LZMA2Codec: junk after end of data"));
893       }
894
895       streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
896     }
897
898     cursor.skip(buf.second);
899     buf = cursor.peek();
900   }
901
902   while (!streamEnd) {
903     streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
904   }
905
906   out->prev()->trimEnd(stream.avail_out);
907
908   if (actualUncompressedLength != stream.total_out) {
909     throw std::runtime_error(to<std::string>(
910         "LZMA2Codec: invalid uncompressed length"));
911   }
912
913   return out;
914 }
915
916 #endif  // FOLLY_HAVE_LIBLZMA
917
918 typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
919
920 CodecFactory gCodecFactories[
921     static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
922   nullptr,  // USER_DEFINED
923   NoCompressionCodec::create,
924
925 #if FOLLY_HAVE_LIBLZ4
926   LZ4Codec::create,
927 #else
928   nullptr,
929 #endif
930
931 #if FOLLY_HAVE_LIBSNAPPY
932   SnappyCodec::create,
933 #else
934   nullptr,
935 #endif
936
937 #if FOLLY_HAVE_LIBZ
938   ZlibCodec::create,
939 #else
940   nullptr,
941 #endif
942
943 #if FOLLY_HAVE_LIBLZ4
944   LZ4Codec::create,
945 #else
946   nullptr,
947 #endif
948
949 #if FOLLY_HAVE_LIBLZMA
950   LZMA2Codec::create,
951   LZMA2Codec::create,
952 #else
953   nullptr,
954   nullptr,
955 #endif
956 };
957
958 }  // namespace
959
960 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
961   size_t idx = static_cast<size_t>(type);
962   if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
963     throw std::invalid_argument(to<std::string>(
964         "Compression type ", idx, " not supported"));
965   }
966   auto factory = gCodecFactories[idx];
967   if (!factory) {
968     throw std::invalid_argument(to<std::string>(
969         "Compression type ", idx, " not supported"));
970   }
971   auto codec = (*factory)(level, type);
972   DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
973   return codec;
974 }
975
976 }}  // namespaces