/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 #include <folly/io/Compression.h>
22 #if LZ4_VERSION_NUMBER >= 10301
27 #include <glog/logging.h>
29 #if FOLLY_HAVE_LIBSNAPPY
31 #include <snappy-sinksource.h>
38 #if FOLLY_HAVE_LIBLZMA
42 #if FOLLY_HAVE_LIBZSTD
46 #include <folly/Conv.h>
47 #include <folly/Memory.h>
48 #include <folly/Portability.h>
49 #include <folly/ScopeGuard.h>
50 #include <folly/Varint.h>
51 #include <folly/io/Cursor.h>
53 namespace folly { namespace io {
55 Codec::Codec(CodecType type) : type_(type) { }
57 // Ensure consistent behavior in the nullptr case
58 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
59 uint64_t len = data->computeChainDataLength();
61 return IOBuf::create(0);
63 if (len > maxUncompressedLength()) {
64 throw std::runtime_error("Codec: uncompressed length too large");
67 return doCompress(data);
70 std::string Codec::compress(const StringPiece data) {
71 const uint64_t len = data.size();
75 if (len > maxUncompressedLength()) {
76 throw std::runtime_error("Codec: uncompressed length too large");
79 return doCompressString(data);
82 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
83 uint64_t uncompressedLength) {
84 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
85 if (needsUncompressedLength()) {
86 throw std::invalid_argument("Codec: uncompressed length required");
88 } else if (uncompressedLength > maxUncompressedLength()) {
89 throw std::runtime_error("Codec: uncompressed length too large");
93 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
94 uncompressedLength != 0) {
95 throw std::runtime_error("Codec: invalid uncompressed length");
97 return IOBuf::create(0);
100 return doUncompress(data, uncompressedLength);
103 std::string Codec::uncompress(
104 const StringPiece data,
105 uint64_t uncompressedLength) {
106 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
107 if (needsUncompressedLength()) {
108 throw std::invalid_argument("Codec: uncompressed length required");
110 } else if (uncompressedLength > maxUncompressedLength()) {
111 throw std::runtime_error("Codec: uncompressed length too large");
115 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
116 uncompressedLength != 0) {
117 throw std::runtime_error("Codec: invalid uncompressed length");
122 return doUncompressString(data, uncompressedLength);
125 bool Codec::needsUncompressedLength() const {
126 return doNeedsUncompressedLength();
129 uint64_t Codec::maxUncompressedLength() const {
130 return doMaxUncompressedLength();
133 bool Codec::doNeedsUncompressedLength() const {
137 uint64_t Codec::doMaxUncompressedLength() const {
138 return UNLIMITED_UNCOMPRESSED_LENGTH;
141 std::string Codec::doCompressString(const StringPiece data) {
142 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
143 auto outputBuffer = doCompress(&inputBuffer);
145 output.reserve(outputBuffer->computeChainDataLength());
146 for (auto range : *outputBuffer) {
147 output.append(reinterpret_cast<const char*>(range.data()), range.size());
152 std::string Codec::doUncompressString(
153 const StringPiece data,
154 uint64_t uncompressedLength) {
155 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
156 auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
158 output.reserve(outputBuffer->computeChainDataLength());
159 for (auto range : *outputBuffer) {
160 output.append(reinterpret_cast<const char*>(range.data()), range.size());
170 class NoCompressionCodec final : public Codec {
172 static std::unique_ptr<Codec> create(int level, CodecType type);
173 explicit NoCompressionCodec(int level, CodecType type);
176 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
177 std::unique_ptr<IOBuf> doUncompress(
179 uint64_t uncompressedLength) override;
182 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
183 return make_unique<NoCompressionCodec>(level, type);
186 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
188 DCHECK(type == CodecType::NO_COMPRESSION);
190 case COMPRESSION_LEVEL_DEFAULT:
191 case COMPRESSION_LEVEL_FASTEST:
192 case COMPRESSION_LEVEL_BEST:
196 throw std::invalid_argument(to<std::string>(
197 "NoCompressionCodec: invalid level ", level));
201 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
203 return data->clone();
206 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
208 uint64_t uncompressedLength) {
209 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
210 data->computeChainDataLength() != uncompressedLength) {
211 throw std::runtime_error(to<std::string>(
212 "NoCompressionCodec: invalid uncompressed length"));
214 return data->clone();
#if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)

namespace {

/**
 * Appends the varint encoding of `val` to the tail of `out`.
 * The caller must have reserved at least kMaxVarintLength64 bytes of tailroom.
 */
void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
  DCHECK_GE(out->tailroom(), kMaxVarintLength64);
  out->append(encodeVarint(val, out->writableTail()));
}

/**
 * Reads a base-128 varint (7 payload bits per byte, least-significant group
 * first, high bit set on every byte except the last) from `cursor`,
 * advancing it past the encoding.
 *
 * @throws std::invalid_argument if the encoding runs past 10 bytes or its
 *         value does not fit in 64 bits.
 */
inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
  uint64_t val = 0;
  int8_t b = 0;
  for (int shift = 0; shift <= 63; shift += 7) {
    b = cursor.read<int8_t>();
    const uint64_t payload = static_cast<uint64_t>(b & 0x7f);
    // The 10th byte (shift 63) can only contribute bit 63. Any higher payload
    // bit would be shifted out silently and yield a wrong value, so reject it.
    if (shift == 63 && payload > 1) {
      throw std::invalid_argument("Invalid varint value. Too big.");
    }
    val |= payload << shift;
    if (b >= 0) {
      break;
    }
  }
  if (b < 0) {
    // Still had the continuation bit set after the maximum number of bytes.
    throw std::invalid_argument("Invalid varint value. Too big.");
  }
  return val;
}

}  // namespace

#endif  // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
246 #if FOLLY_HAVE_LIBLZ4
251 class LZ4Codec final : public Codec {
253 static std::unique_ptr<Codec> create(int level, CodecType type);
254 explicit LZ4Codec(int level, CodecType type);
257 bool doNeedsUncompressedLength() const override;
258 uint64_t doMaxUncompressedLength() const override;
260 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
262 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
263 std::unique_ptr<IOBuf> doUncompress(
265 uint64_t uncompressedLength) override;
267 bool highCompression_;
270 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
271 return make_unique<LZ4Codec>(level, type);
274 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
275 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
278 case COMPRESSION_LEVEL_FASTEST:
279 case COMPRESSION_LEVEL_DEFAULT:
282 case COMPRESSION_LEVEL_BEST:
286 if (level < 1 || level > 2) {
287 throw std::invalid_argument(to<std::string>(
288 "LZ4Codec: invalid level: ", level));
290 highCompression_ = (level > 1);
293 bool LZ4Codec::doNeedsUncompressedLength() const {
294 return !encodeSize();
297 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
298 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
300 #ifndef LZ4_MAX_INPUT_SIZE
301 # define LZ4_MAX_INPUT_SIZE 0x7E000000
304 uint64_t LZ4Codec::doMaxUncompressedLength() const {
305 return LZ4_MAX_INPUT_SIZE;
308 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
310 if (data->isChained()) {
311 // LZ4 doesn't support streaming, so we have to coalesce
312 clone = data->cloneCoalescedAsValue();
316 uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
317 auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
319 encodeVarintToIOBuf(data->length(), out.get());
323 auto input = reinterpret_cast<const char*>(data->data());
324 auto output = reinterpret_cast<char*>(out->writableTail());
325 const auto inputLength = data->length();
326 #if LZ4_VERSION_NUMBER >= 10700
327 if (highCompression_) {
328 n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
330 n = LZ4_compress_default(input, output, inputLength, out->tailroom());
333 if (highCompression_) {
334 n = LZ4_compressHC(input, output, inputLength);
336 n = LZ4_compress(input, output, inputLength);
341 CHECK_LE(n, out->capacity());
347 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
349 uint64_t uncompressedLength) {
351 if (data->isChained()) {
352 // LZ4 doesn't support streaming, so we have to coalesce
353 clone = data->cloneCoalescedAsValue();
357 folly::io::Cursor cursor(data);
358 uint64_t actualUncompressedLength;
360 actualUncompressedLength = decodeVarintFromCursor(cursor);
361 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
362 uncompressedLength != actualUncompressedLength) {
363 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
366 actualUncompressedLength = uncompressedLength;
367 if (actualUncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH ||
368 actualUncompressedLength > maxUncompressedLength()) {
369 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
373 auto sp = StringPiece{cursor.peekBytes()};
374 auto out = IOBuf::create(actualUncompressedLength);
375 int n = LZ4_decompress_safe(
377 reinterpret_cast<char*>(out->writableTail()),
379 actualUncompressedLength);
381 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
382 throw std::runtime_error(to<std::string>(
383 "LZ4 decompression returned invalid value ", n));
385 out->append(actualUncompressedLength);
389 #if LZ4_VERSION_NUMBER >= 10301
391 class LZ4FrameCodec final : public Codec {
393 static std::unique_ptr<Codec> create(int level, CodecType type);
394 explicit LZ4FrameCodec(int level, CodecType type);
398 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
399 std::unique_ptr<IOBuf> doUncompress(
401 uint64_t uncompressedLength) override;
403 // Reset the dctx_ if it is dirty or null.
407 LZ4F_decompressionContext_t dctx_{nullptr};
411 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
414 return make_unique<LZ4FrameCodec>(level, type);
417 static size_t lz4FrameThrowOnError(size_t code) {
418 if (LZ4F_isError(code)) {
419 throw std::runtime_error(
420 to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
425 void LZ4FrameCodec::resetDCtx() {
426 if (dctx_ && !dirty_) {
430 LZ4F_freeDecompressionContext(dctx_);
432 lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
436 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
437 DCHECK(type == CodecType::LZ4_FRAME);
439 case COMPRESSION_LEVEL_FASTEST:
440 case COMPRESSION_LEVEL_DEFAULT:
443 case COMPRESSION_LEVEL_BEST:
452 LZ4FrameCodec::~LZ4FrameCodec() {
454 LZ4F_freeDecompressionContext(dctx_);
458 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
459 // LZ4 Frame compression doesn't support streaming so we have to coalesce
461 if (data->isChained()) {
462 clone = data->cloneCoalescedAsValue();
466 const auto uncompressedLength = data->length();
467 LZ4F_preferences_t prefs{};
468 prefs.compressionLevel = level_;
469 prefs.frameInfo.contentSize = uncompressedLength;
471 auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs));
472 const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
478 buf->append(written);
482 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
484 uint64_t uncompressedLength) {
485 // Reset the dctx if any errors have occurred
488 ByteRange in = *data->begin();
490 if (data->isChained()) {
491 clone = data->cloneCoalescedAsValue();
492 in = clone.coalesce();
495 // Select decompression options
496 LZ4F_decompressOptions_t options;
497 options.stableDst = 1;
498 // Select blockSize and growthSize for the IOBufQueue
499 IOBufQueue queue(IOBufQueue::cacheChainLength());
500 auto blockSize = uint64_t{64} << 10;
501 auto growthSize = uint64_t{4} << 20;
502 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) {
503 // Allocate uncompressedLength in one chunk (up to 64 MB)
504 const auto allocateSize = std::min(uncompressedLength, uint64_t{64} << 20);
505 queue.preallocate(allocateSize, allocateSize);
506 blockSize = std::min(uncompressedLength, blockSize);
507 growthSize = std::min(uncompressedLength, growthSize);
509 // Reduce growthSize for small data
510 const auto guessUncompressedLen = 4 * std::max(blockSize, in.size());
511 growthSize = std::min(guessUncompressedLen, growthSize);
513 // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
516 // Decompress until the frame is over
519 // Allocate enough space to decompress at least a block
522 std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
524 size_t inSize = in.size();
525 code = lz4FrameThrowOnError(
526 LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
527 if (in.empty() && outSize == 0 && code != 0) {
528 // We passed no input, no output was produced, and the frame isn't over
529 // No more forward progress is possible
530 throw std::runtime_error("LZ4Frame error: Incomplete frame");
532 in.uncheckedAdvance(inSize);
533 queue.postallocate(outSize);
535 // At this point the decompression context can be reused
537 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
538 queue.chainLength() != uncompressedLength) {
539 throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
544 #endif // LZ4_VERSION_NUMBER >= 10301
545 #endif // FOLLY_HAVE_LIBLZ4
547 #if FOLLY_HAVE_LIBSNAPPY
554 * Implementation of snappy::Source that reads from a IOBuf chain.
556 class IOBufSnappySource final : public snappy::Source {
558 explicit IOBufSnappySource(const IOBuf* data);
559 size_t Available() const override;
560 const char* Peek(size_t* len) override;
561 void Skip(size_t n) override;
567 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
568 : available_(data->computeChainDataLength()),
572 size_t IOBufSnappySource::Available() const {
576 const char* IOBufSnappySource::Peek(size_t* len) {
577 auto sp = StringPiece{cursor_.peekBytes()};
582 void IOBufSnappySource::Skip(size_t n) {
583 CHECK_LE(n, available_);
588 class SnappyCodec final : public Codec {
590 static std::unique_ptr<Codec> create(int level, CodecType type);
591 explicit SnappyCodec(int level, CodecType type);
594 uint64_t doMaxUncompressedLength() const override;
595 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
596 std::unique_ptr<IOBuf> doUncompress(
598 uint64_t uncompressedLength) override;
601 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
602 return make_unique<SnappyCodec>(level, type);
605 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
606 DCHECK(type == CodecType::SNAPPY);
608 case COMPRESSION_LEVEL_FASTEST:
609 case COMPRESSION_LEVEL_DEFAULT:
610 case COMPRESSION_LEVEL_BEST:
614 throw std::invalid_argument(to<std::string>(
615 "SnappyCodec: invalid level: ", level));
619 uint64_t SnappyCodec::doMaxUncompressedLength() const {
620 // snappy.h uses uint32_t for lengths, so there's that.
621 return std::numeric_limits<uint32_t>::max();
624 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
625 IOBufSnappySource source(data);
627 IOBuf::create(snappy::MaxCompressedLength(source.Available()));
629 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
630 out->writableTail()));
632 size_t n = snappy::Compress(&source, &sink);
634 CHECK_LE(n, out->capacity());
639 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
640 uint64_t uncompressedLength) {
641 uint32_t actualUncompressedLength = 0;
644 IOBufSnappySource source(data);
645 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
646 throw std::runtime_error("snappy::GetUncompressedLength failed");
648 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
649 uncompressedLength != actualUncompressedLength) {
650 throw std::runtime_error("snappy: invalid uncompressed length");
654 auto out = IOBuf::create(actualUncompressedLength);
657 IOBufSnappySource source(data);
658 if (!snappy::RawUncompress(&source,
659 reinterpret_cast<char*>(out->writableTail()))) {
660 throw std::runtime_error("snappy::RawUncompress failed");
664 out->append(actualUncompressedLength);
668 #endif // FOLLY_HAVE_LIBSNAPPY
674 class ZlibCodec final : public Codec {
676 static std::unique_ptr<Codec> create(int level, CodecType type);
677 explicit ZlibCodec(int level, CodecType type);
680 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
681 std::unique_ptr<IOBuf> doUncompress(
683 uint64_t uncompressedLength) override;
685 std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
686 bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
691 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
692 return make_unique<ZlibCodec>(level, type);
695 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
696 DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
698 case COMPRESSION_LEVEL_FASTEST:
701 case COMPRESSION_LEVEL_DEFAULT:
702 level = Z_DEFAULT_COMPRESSION;
704 case COMPRESSION_LEVEL_BEST:
708 if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
709 throw std::invalid_argument(to<std::string>(
710 "ZlibCodec: invalid level: ", level));
715 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
717 CHECK_EQ(stream->avail_out, 0);
719 auto buf = IOBuf::create(length);
722 stream->next_out = buf->writableData();
723 stream->avail_out = buf->length();
728 bool ZlibCodec::doInflate(z_stream* stream,
730 uint32_t bufferLength) {
731 if (stream->avail_out == 0) {
732 head->prependChain(addOutputBuffer(stream, bufferLength));
735 int rc = inflate(stream, Z_NO_FLUSH);
746 throw std::runtime_error(to<std::string>(
747 "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
749 CHECK(false) << rc << ": " << stream->msg;
755 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
757 stream.zalloc = nullptr;
758 stream.zfree = nullptr;
759 stream.opaque = nullptr;
761 // Using deflateInit2() to support gzip. "The windowBits parameter is the
762 // base two logarithm of the maximum window size (...) The default value is
763 // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
764 // around the compressed data instead of a zlib wrapper. The gzip header
765 // will have no file name, no extra data, no comment, no modification time
766 // (set to zero), no header crc, and the operating system will be set to 255
768 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
769 // All other parameters (method, memLevel, strategy) get default values from
771 int rc = deflateInit2(&stream,
778 throw std::runtime_error(to<std::string>(
779 "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
782 stream.next_in = stream.next_out = nullptr;
783 stream.avail_in = stream.avail_out = 0;
784 stream.total_in = stream.total_out = 0;
786 bool success = false;
789 rc = deflateEnd(&stream);
790 // If we're here because of an exception, it's okay if some data
792 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
793 << rc << ": " << stream.msg;
796 uint64_t uncompressedLength = data->computeChainDataLength();
797 uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
799 // Max 64MiB in one go
800 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
801 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
803 auto out = addOutputBuffer(
805 (maxCompressedLength <= maxSingleStepLength ?
806 maxCompressedLength :
807 defaultBufferLength));
809 for (auto& range : *data) {
810 uint64_t remaining = range.size();
811 uint64_t written = 0;
813 uint32_t step = (remaining > maxSingleStepLength ?
814 maxSingleStepLength : remaining);
815 stream.next_in = const_cast<uint8_t*>(range.data() + written);
816 stream.avail_in = step;
820 while (stream.avail_in != 0) {
821 if (stream.avail_out == 0) {
822 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
825 rc = deflate(&stream, Z_NO_FLUSH);
827 CHECK_EQ(rc, Z_OK) << stream.msg;
833 if (stream.avail_out == 0) {
834 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
837 rc = deflate(&stream, Z_FINISH);
838 } while (rc == Z_OK);
840 CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
842 out->prev()->trimEnd(stream.avail_out);
844 success = true; // we survived
/**
 * Chooses the size of each output buffer used while inflating
 * `compressedLength` bytes: four times the input (at least 4 * 32 KiB),
 * capped at 4 MiB.
 *
 * The cap is tested before multiplying, so huge inputs saturate at 4 MiB
 * instead of wrapping the product around to a tiny or zero buffer size.
 */
static uint64_t computeBufferLength(uint64_t const compressedLength) {
  constexpr uint64_t kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
  constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
  const uint64_t base = std::max(kBlockSize, compressedLength);
  if (base >= kMaxBufferLength / 4) {
    return kMaxBufferLength;
  }
  return 4 * base;
}
856 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
857 uint64_t uncompressedLength) {
859 stream.zalloc = nullptr;
860 stream.zfree = nullptr;
861 stream.opaque = nullptr;
863 // "The windowBits parameter is the base two logarithm of the maximum window
864 // size (...) The default value is 15 (...) add 16 to decode only the gzip
865 // format (the zlib format will return a Z_DATA_ERROR)."
866 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
867 int rc = inflateInit2(&stream, windowBits);
869 throw std::runtime_error(to<std::string>(
870 "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
873 stream.next_in = stream.next_out = nullptr;
874 stream.avail_in = stream.avail_out = 0;
875 stream.total_in = stream.total_out = 0;
877 bool success = false;
880 rc = inflateEnd(&stream);
881 // If we're here because of an exception, it's okay if some data
883 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
884 << rc << ": " << stream.msg;
887 // Max 64MiB in one go
888 constexpr uint64_t maxSingleStepLength = uint64_t(64) << 20; // 64MiB
889 const uint64_t defaultBufferLength =
890 computeBufferLength(data->computeChainDataLength());
892 auto out = addOutputBuffer(
894 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
895 uncompressedLength <= maxSingleStepLength) ?
897 defaultBufferLength));
899 bool streamEnd = false;
900 for (auto& range : *data) {
905 stream.next_in = const_cast<uint8_t*>(range.data());
906 stream.avail_in = range.size();
908 while (stream.avail_in != 0) {
910 throw std::runtime_error(to<std::string>(
911 "ZlibCodec: junk after end of data"));
914 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
919 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
922 out->prev()->trimEnd(stream.avail_out);
924 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
925 uncompressedLength != stream.total_out) {
926 throw std::runtime_error(to<std::string>(
927 "ZlibCodec: invalid uncompressed length"));
930 success = true; // we survived
935 #endif // FOLLY_HAVE_LIBZ
937 #if FOLLY_HAVE_LIBLZMA
942 class LZMA2Codec final : public Codec {
944 static std::unique_ptr<Codec> create(int level, CodecType type);
945 explicit LZMA2Codec(int level, CodecType type);
948 bool doNeedsUncompressedLength() const override;
949 uint64_t doMaxUncompressedLength() const override;
951 bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
953 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
954 std::unique_ptr<IOBuf> doUncompress(
956 uint64_t uncompressedLength) override;
958 std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
959 bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
964 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
965 return make_unique<LZMA2Codec>(level, type);
968 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
969 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
971 case COMPRESSION_LEVEL_FASTEST:
974 case COMPRESSION_LEVEL_DEFAULT:
975 level = LZMA_PRESET_DEFAULT;
977 case COMPRESSION_LEVEL_BEST:
981 if (level < 0 || level > 9) {
982 throw std::invalid_argument(to<std::string>(
983 "LZMA2Codec: invalid level: ", level));
988 bool LZMA2Codec::doNeedsUncompressedLength() const {
992 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
993 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
994 return uint64_t(1) << 63;
997 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
1001 CHECK_EQ(stream->avail_out, 0);
1003 auto buf = IOBuf::create(length);
1004 buf->append(length);
1006 stream->next_out = buf->writableData();
1007 stream->avail_out = buf->length();
1012 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
1014 lzma_stream stream = LZMA_STREAM_INIT;
1016 rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
1017 if (rc != LZMA_OK) {
1018 throw std::runtime_error(folly::to<std::string>(
1019 "LZMA2Codec: lzma_easy_encoder error: ", rc));
1022 SCOPE_EXIT { lzma_end(&stream); };
1024 uint64_t uncompressedLength = data->computeChainDataLength();
1025 uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
1027 // Max 64MiB in one go
1028 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1029 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
1031 auto out = addOutputBuffer(
1033 (maxCompressedLength <= maxSingleStepLength ?
1034 maxCompressedLength :
1035 defaultBufferLength));
1038 auto size = IOBuf::createCombined(kMaxVarintLength64);
1039 encodeVarintToIOBuf(uncompressedLength, size.get());
1040 size->appendChain(std::move(out));
1041 out = std::move(size);
1044 for (auto& range : *data) {
1045 if (range.empty()) {
1049 stream.next_in = const_cast<uint8_t*>(range.data());
1050 stream.avail_in = range.size();
1052 while (stream.avail_in != 0) {
1053 if (stream.avail_out == 0) {
1054 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1057 rc = lzma_code(&stream, LZMA_RUN);
1059 if (rc != LZMA_OK) {
1060 throw std::runtime_error(folly::to<std::string>(
1061 "LZMA2Codec: lzma_code error: ", rc));
1067 if (stream.avail_out == 0) {
1068 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1071 rc = lzma_code(&stream, LZMA_FINISH);
1072 } while (rc == LZMA_OK);
1074 if (rc != LZMA_STREAM_END) {
1075 throw std::runtime_error(folly::to<std::string>(
1076 "LZMA2Codec: lzma_code ended with error: ", rc));
1079 out->prev()->trimEnd(stream.avail_out);
1084 bool LZMA2Codec::doInflate(lzma_stream* stream,
1086 size_t bufferLength) {
1087 if (stream->avail_out == 0) {
1088 head->prependChain(addOutputBuffer(stream, bufferLength));
1091 lzma_ret rc = lzma_code(stream, LZMA_RUN);
1096 case LZMA_STREAM_END:
1099 throw std::runtime_error(to<std::string>(
1100 "LZMA2Codec: lzma_code error: ", rc));
1106 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(const IOBuf* data,
1107 uint64_t uncompressedLength) {
1109 lzma_stream stream = LZMA_STREAM_INIT;
1111 rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
1112 if (rc != LZMA_OK) {
1113 throw std::runtime_error(folly::to<std::string>(
1114 "LZMA2Codec: lzma_auto_decoder error: ", rc));
1117 SCOPE_EXIT { lzma_end(&stream); };
1119 // Max 64MiB in one go
1120 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1121 constexpr uint32_t defaultBufferLength = uint32_t(256) << 10; // 256 KiB
1123 folly::io::Cursor cursor(data);
1125 const uint64_t actualUncompressedLength = decodeVarintFromCursor(cursor);
1126 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1127 uncompressedLength != actualUncompressedLength) {
1128 throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
1130 uncompressedLength = actualUncompressedLength;
1133 auto out = addOutputBuffer(
1135 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1136 uncompressedLength <= maxSingleStepLength)
1137 ? uncompressedLength
1138 : defaultBufferLength));
1140 bool streamEnd = false;
1141 auto buf = cursor.peekBytes();
1142 while (!buf.empty()) {
1143 stream.next_in = const_cast<uint8_t*>(buf.data());
1144 stream.avail_in = buf.size();
1146 while (stream.avail_in != 0) {
1148 throw std::runtime_error(to<std::string>(
1149 "LZMA2Codec: junk after end of data"));
1152 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1155 cursor.skip(buf.size());
1156 buf = cursor.peekBytes();
1159 while (!streamEnd) {
1160 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1163 out->prev()->trimEnd(stream.avail_out);
1165 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1166 uncompressedLength != stream.total_out) {
1167 throw std::runtime_error(
1168 to<std::string>("LZMA2Codec: invalid uncompressed length"));
1174 #endif // FOLLY_HAVE_LIBLZMA
1176 #ifdef FOLLY_HAVE_LIBZSTD
1181 class ZSTDCodec final : public Codec {
1183 static std::unique_ptr<Codec> create(int level, CodecType);
1184 explicit ZSTDCodec(int level, CodecType type);
1187 bool doNeedsUncompressedLength() const override;
1188 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1189 std::unique_ptr<IOBuf> doUncompress(
1191 uint64_t uncompressedLength) override;
1196 std::unique_ptr<Codec> ZSTDCodec::create(int level, CodecType type) {
1197 return make_unique<ZSTDCodec>(level, type);
1200 ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) {
1201 DCHECK(type == CodecType::ZSTD);
1203 case COMPRESSION_LEVEL_FASTEST:
1206 case COMPRESSION_LEVEL_DEFAULT:
1209 case COMPRESSION_LEVEL_BEST:
1213 if (level < 1 || level > ZSTD_maxCLevel()) {
1214 throw std::invalid_argument(
1215 to<std::string>("ZSTD: invalid level: ", level));
1220 bool ZSTDCodec::doNeedsUncompressedLength() const {
1224 void zstdThrowIfError(size_t rc) {
1225 if (!ZSTD_isError(rc)) {
1228 throw std::runtime_error(
1229 to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
1232 std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
1233 // Support earlier versions of the codec (working with a single IOBuf,
1234 // and using ZSTD_decompress which requires ZSTD frame to contain size,
1235 // which isn't populated by streaming API).
1236 if (!data->isChained()) {
1237 auto out = IOBuf::createCombined(ZSTD_compressBound(data->length()));
1238 const auto rc = ZSTD_compress(
1239 out->writableData(),
1244 zstdThrowIfError(rc);
1249 auto zcs = ZSTD_createCStream();
1251 ZSTD_freeCStream(zcs);
1254 auto rc = ZSTD_initCStream(zcs, level_);
1255 zstdThrowIfError(rc);
1257 Cursor cursor(data);
1258 auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
1261 out.dst = result->writableTail();
1262 out.size = result->capacity();
1265 for (auto buffer = cursor.peekBytes(); !buffer.empty();) {
1267 in.src = buffer.data();
1268 in.size = buffer.size();
1269 for (in.pos = 0; in.pos != in.size;) {
1270 rc = ZSTD_compressStream(zcs, &out, &in);
1271 zstdThrowIfError(rc);
1273 cursor.skip(in.size);
1274 buffer = cursor.peekBytes();
1277 rc = ZSTD_endStream(zcs, &out);
1278 zstdThrowIfError(rc);
1281 result->append(out.pos);
1285 static std::unique_ptr<IOBuf> zstdUncompressBuffer(
1287 uint64_t uncompressedLength) {
1288 // Check preconditions
1289 DCHECK(!data->isChained());
1290 DCHECK(uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH);
1292 auto uncompressed = IOBuf::create(uncompressedLength);
1293 const auto decompressedSize = ZSTD_decompress(
1294 uncompressed->writableTail(),
1295 uncompressed->tailroom(),
1298 zstdThrowIfError(decompressedSize);
1299 if (decompressedSize != uncompressedLength) {
1300 throw std::runtime_error("ZSTD: invalid uncompressed length");
1302 uncompressed->append(decompressedSize);
1303 return uncompressed;
1306 static std::unique_ptr<IOBuf> zstdUncompressStream(
1308 uint64_t uncompressedLength) {
1309 auto zds = ZSTD_createDStream();
1311 ZSTD_freeDStream(zds);
1314 auto rc = ZSTD_initDStream(zds);
1315 zstdThrowIfError(rc);
1317 ZSTD_outBuffer out{};
1320 auto outputSize = ZSTD_DStreamOutSize();
1321 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH) {
1322 outputSize = uncompressedLength;
1325 IOBufQueue queue(IOBufQueue::cacheChainLength());
1327 Cursor cursor(data);
1329 if (in.pos == in.size) {
1330 auto buffer = cursor.peekBytes();
1331 in.src = buffer.data();
1332 in.size = buffer.size();
1334 cursor.skip(in.size);
1335 if (rc > 1 && in.size == 0) {
1336 throw std::runtime_error(to<std::string>("ZSTD: incomplete input"));
1339 if (out.pos == out.size) {
1341 queue.postallocate(out.pos);
1343 auto buffer = queue.preallocate(outputSize, outputSize);
1344 out.dst = buffer.first;
1345 out.size = buffer.second;
1347 outputSize = ZSTD_DStreamOutSize();
1349 rc = ZSTD_decompressStream(zds, &out, &in);
1350 zstdThrowIfError(rc);
1356 queue.postallocate(out.pos);
1358 if (in.pos != in.size || !cursor.isAtEnd()) {
1359 throw std::runtime_error("ZSTD: junk after end of data");
1361 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH &&
1362 queue.chainLength() != uncompressedLength) {
1363 throw std::runtime_error("ZSTD: invalid uncompressed length");
1366 return queue.move();
1369 std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
1371 uint64_t uncompressedLength) {
1373 // Read decompressed size from frame if available in first IOBuf.
1374 const auto decompressedSize =
1375 ZSTD_getDecompressedSize(data->data(), data->length());
1376 if (decompressedSize != 0) {
1377 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH &&
1378 uncompressedLength != decompressedSize) {
1379 throw std::runtime_error("ZSTD: invalid uncompressed length");
1381 uncompressedLength = decompressedSize;
1384 // Faster to decompress using ZSTD_decompress() if we can.
1385 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && !data->isChained()) {
1386 return zstdUncompressBuffer(data, uncompressedLength);
1388 // Fall back to slower streaming decompression.
1389 return zstdUncompressStream(data, uncompressedLength);
1392 #endif // FOLLY_HAVE_LIBZSTD
1396 typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
1397 static constexpr CodecFactory
1398 codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
1399 nullptr, // USER_DEFINED
1400 NoCompressionCodec::create,
1402 #if FOLLY_HAVE_LIBLZ4
1408 #if FOLLY_HAVE_LIBSNAPPY
1409 SnappyCodec::create,
1420 #if FOLLY_HAVE_LIBLZ4
1426 #if FOLLY_HAVE_LIBLZMA
1434 #if FOLLY_HAVE_LIBZSTD
1446 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
1447 LZ4FrameCodec::create,
1453 bool hasCodec(CodecType type) {
1454 size_t idx = static_cast<size_t>(type);
1455 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1456 throw std::invalid_argument(
1457 to<std::string>("Compression type ", idx, " invalid"));
1459 return codecFactories[idx] != nullptr;
1462 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
1463 size_t idx = static_cast<size_t>(type);
1464 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1465 throw std::invalid_argument(
1466 to<std::string>("Compression type ", idx, " invalid"));
1468 auto factory = codecFactories[idx];
1470 throw std::invalid_argument(to<std::string>(
1471 "Compression type ", idx, " not supported"));
1473 auto codec = (*factory)(level, type);
1474 DCHECK_EQ(static_cast<size_t>(codec->type()), idx);