2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
22 #if LZ4_VERSION_NUMBER >= 10301
27 #include <glog/logging.h>
29 #if FOLLY_HAVE_LIBSNAPPY
31 #include <snappy-sinksource.h>
38 #if FOLLY_HAVE_LIBLZMA
42 #if FOLLY_HAVE_LIBZSTD
50 #include <folly/Bits.h>
51 #include <folly/Conv.h>
52 #include <folly/Memory.h>
53 #include <folly/Portability.h>
54 #include <folly/ScopeGuard.h>
55 #include <folly/Varint.h>
56 #include <folly/io/Cursor.h>
58 #include <unordered_set>
60 namespace folly { namespace io {
// Base-class constructor: stores the codec type in type_.
62 Codec::Codec(CodecType type) : type_(type) { }
// Compresses an IOBuf chain.
// Computes the total chain length, rejects inputs larger than
// maxUncompressedLength() with std::runtime_error, and otherwise delegates to
// the codec-specific doCompress().
// NOTE(review): the condition guarding the empty-IOBuf return below is not
// visible in this listing; `data` is dereferenced on the first statement, so
// confirm what "the nullptr case" in the comment refers to.
64 // Ensure consistent behavior in the nullptr case
65 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
66 uint64_t len = data->computeChainDataLength();
68 return IOBuf::create(0);
// Enforce the per-codec input limit before doing any work.
70 if (len > maxUncompressedLength()) {
71 throw std::runtime_error("Codec: uncompressed length too large");
74 return doCompress(data);
// String overload of compress().
// Throws std::runtime_error when data.size() exceeds maxUncompressedLength(),
// otherwise delegates to doCompressString().
// NOTE(review): some lines between the length computation and the limit check
// are not visible in this listing.
77 std::string Codec::compress(const StringPiece data) {
78 const uint64_t len = data.size();
82 if (len > maxUncompressedLength()) {
83 throw std::runtime_error("Codec: uncompressed length too large");
86 return doCompressString(data);
// Uncompresses an IOBuf chain after validating the caller-supplied length.
// Throws std::invalid_argument when the length is unknown but the codec
// requires it, and std::runtime_error when a known length exceeds
// maxUncompressedLength() or is inconsistent (see below). Otherwise delegates
// to doUncompress().
89 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
90 uint64_t uncompressedLength) {
91 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
92 if (needsUncompressedLength()) {
93 throw std::invalid_argument("Codec: uncompressed length required");
95 } else if (uncompressedLength > maxUncompressedLength()) {
96 throw std::runtime_error("Codec: uncompressed length too large");
// NOTE(review): the guard around the following check is not visible in this
// listing; presumably it handles empty input, where a known length must be 0
// and an empty IOBuf is returned.
100 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
101 uncompressedLength != 0) {
102 throw std::runtime_error("Codec: invalid uncompressed length");
104 return IOBuf::create(0);
107 return doUncompress(data, uncompressedLength);
// String overload of uncompress(); performs the same length validation as the
// IOBuf overload and then delegates to doUncompressString().
// Throws std::invalid_argument if the length is unknown but required, and
// std::runtime_error if it is too large or inconsistent.
110 std::string Codec::uncompress(
111 const StringPiece data,
112 uint64_t uncompressedLength) {
113 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
114 if (needsUncompressedLength()) {
115 throw std::invalid_argument("Codec: uncompressed length required");
117 } else if (uncompressedLength > maxUncompressedLength()) {
118 throw std::runtime_error("Codec: uncompressed length too large");
// NOTE(review): the enclosing condition for this check (and the value returned
// after it) is not visible in this listing.
122 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
123 uncompressedLength != 0) {
124 throw std::runtime_error("Codec: invalid uncompressed length");
129 return doUncompressString(data, uncompressedLength);
// Public accessor; forwards to the overridable doNeedsUncompressedLength().
132 bool Codec::needsUncompressedLength() const {
133 return doNeedsUncompressedLength();
// Public accessor; forwards to the overridable doMaxUncompressedLength().
136 uint64_t Codec::maxUncompressedLength() const {
137 return doMaxUncompressedLength();
// Base-class default for whether uncompress() requires a known length.
// NOTE(review): the returned value is not visible in this listing.
140 bool Codec::doNeedsUncompressedLength() const {
// Base-class default: no limit on the uncompressed length.
144 uint64_t Codec::doMaxUncompressedLength() const {
145 return UNLIMITED_UNCOMPRESSED_LENGTH;
// Base-class default for the list of byte prefixes this codec can recognize.
// NOTE(review): the returned value is not visible in this listing.
148 std::vector<std::string> Codec::validPrefixes() const {
// Base-class default for format detection; both parameters are unnamed and
// therefore unused. NOTE(review): the returned value is not visible here.
152 bool Codec::canUncompress(const IOBuf*, uint64_t) const {
// Default string compression built on doCompress(): wraps `data` in an IOBuf
// (WRAP_BUFFER), compresses it, then flattens the resulting chain into a
// std::string. The declaration and return of `output` are not visible in this
// listing.
156 std::string Codec::doCompressString(const StringPiece data) {
157 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
158 auto outputBuffer = doCompress(&inputBuffer);
// Reserve once for the whole chain so the appends below do not reallocate.
160 output.reserve(outputBuffer->computeChainDataLength());
161 for (auto range : *outputBuffer) {
162 output.append(reinterpret_cast<const char*>(range.data()), range.size());
// Default string decompression built on doUncompress(): wraps `data` in an
// IOBuf (WRAP_BUFFER), uncompresses it, then flattens the resulting chain into
// a std::string. The declaration and return of `output` are not visible in
// this listing.
167 std::string Codec::doUncompressString(
168 const StringPiece data,
169 uint64_t uncompressedLength) {
170 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
171 auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
// Reserve once for the whole chain so the appends below do not reallocate.
173 output.reserve(outputBuffer->computeChainDataLength());
174 for (auto range : *outputBuffer) {
175 output.append(reinterpret_cast<const char*>(range.data()), range.size());
// Codec that performs no compression: both directions return a clone of the
// input (see the member definitions). Exposes a static factory, an explicit
// constructor, and overrides of doCompress()/doUncompress().
185 class NoCompressionCodec final : public Codec {
187 static std::unique_ptr<Codec> create(int level, CodecType type);
188 explicit NoCompressionCodec(int level, CodecType type);
191 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
192 std::unique_ptr<IOBuf> doUncompress(
194 uint64_t uncompressedLength) override;
// Factory: constructs a NoCompressionCodec and returns it as a Codec.
197 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
198 return make_unique<NoCompressionCodec>(level, type);
// Constructor. Asserts (DCHECK) that the type is NO_COMPRESSION and validates
// `level`: the three symbolic levels are accepted, and the visible error path
// throws std::invalid_argument for an invalid level.
// NOTE(review): the switch header and the remaining case labels are not
// visible in this listing.
201 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
203 DCHECK(type == CodecType::NO_COMPRESSION);
205 case COMPRESSION_LEVEL_DEFAULT:
206 case COMPRESSION_LEVEL_FASTEST:
207 case COMPRESSION_LEVEL_BEST:
211 throw std::invalid_argument(to<std::string>(
212 "NoCompressionCodec: invalid level ", level));
// "Compression" is the identity: returns a clone of the input chain.
216 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
218 return data->clone();
// "Decompression" is the identity: returns a clone of the input chain.
// Throws std::runtime_error if a known uncompressedLength does not equal the
// chain's data length.
221 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
223 uint64_t uncompressedLength) {
224 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
225 data->computeChainDataLength() != uncompressedLength) {
226 throw std::runtime_error(to<std::string>(
227 "NoCompressionCodec: invalid uncompressed length"));
229 return data->clone();
232 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
// Writes `val` as a varint into the tail of `out` and extends out's length by
// the value encodeVarint() returns (presumably the number of bytes written).
// Precondition (DCHECK): `out` has at least kMaxVarintLength64 bytes of
// tailroom.
236 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
237 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
238 out->append(encodeVarint(val, out->writableTail()));
// Decodes a varint from `cursor`, consuming one byte per iteration and OR-ing
// its low 7 bits into the result at increasing shifts (0, 7, ..., 63).
// Throws std::invalid_argument("Invalid varint value. Too big.") on the visible
// error path. The loop-exit test and the return are not visible in this
// listing.
// NOTE(review): at shift == 63 only the lowest bit of the 7-bit group fits in
// a uint64_t; the upper bits of that byte are discarded by the shift.
241 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
244 for (int shift = 0; shift <= 63; shift += 7) {
245 b = cursor.read<int8_t>();
246 val |= static_cast<uint64_t>(b & 0x7f) << shift;
252 throw std::invalid_argument("Invalid varint value. Too big.");
259 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
263 * Reads sizeof(T) bytes, and returns false if not enough bytes are available.
264 * Returns true if the first n bytes are equal to prefix when interpreted as
// Tests whether the first `n` bytes of `data`, read as a little-endian T,
// equal `prefix`. Only enabled for unsigned T. `n` must not exceed sizeof(T)
// (DCHECK). A full sizeof(T) value is read via tryReadLE; the action taken
// when that read fails is not visible in this listing.
267 template <typename T>
268 typename std::enable_if<std::is_unsigned<T>::value, bool>::type
269 dataStartsWithLE(const IOBuf* data, T prefix, uint64_t n = sizeof(T)) {
271 DCHECK_LE(n, sizeof(T));
274 if (!cursor.tryReadLE(value)) {
// Keep only the low n bytes. The n == sizeof(T) case is special-cased so the
// shift count never equals the full bit width of T.
277 const T mask = n == sizeof(T) ? T(-1) : (T(1) << (8 * n)) - 1;
278 return prefix == (value & mask);
// Converts `prefix` to its little-endian byte representation and copies the
// first `n` bytes into a string. `n` must not exceed sizeof(T) (DCHECK).
// The declaration and return of `result` are not visible in this listing.
281 template <typename T>
282 typename std::enable_if<std::is_arithmetic<T>::value, std::string>::type
283 prefixToStringLE(T prefix, uint64_t n = sizeof(T)) {
285 DCHECK_LE(n, sizeof(T));
// After this conversion the in-memory bytes of `prefix` are in little-endian
// order, so the memcpy below copies the n least-significant bytes.
286 prefix = Endian::little(prefix);
289 memcpy(&result[0], &prefix, n);
// Picks an output buffer size for streaming decompression: four times the
// larger of `blockSize` and `compressedLength`, capped at 4 MiB.
293 static uint64_t computeBufferLength(
294 uint64_t const compressedLength,
295 uint64_t const blockSize) {
296 uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
297 uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
298 return std::min(goodBufferSize, kMaxBufferLength);
302 #if FOLLY_HAVE_LIBLZ4
// LZ4 block codec. Handles two codec types (LZ4 and LZ4_VARINT_SIZE); for the
// latter, encodeSize() is true. highCompression_ is set by the constructor
// from the compression level.
307 class LZ4Codec final : public Codec {
309 static std::unique_ptr<Codec> create(int level, CodecType type);
310 explicit LZ4Codec(int level, CodecType type);
313 bool doNeedsUncompressedLength() const override;
314 uint64_t doMaxUncompressedLength() const override;
// True when this instance is the LZ4_VARINT_SIZE variant.
316 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
318 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
319 std::unique_ptr<IOBuf> doUncompress(
321 uint64_t uncompressedLength) override;
323 bool highCompression_;
// Factory: constructs an LZ4Codec and returns it as a Codec.
326 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
327 return make_unique<LZ4Codec>(level, type);
// Constructor. Asserts (DCHECK) that the type is LZ4 or LZ4_VARINT_SIZE, maps
// the symbolic levels to concrete ones (the assignments are not visible in
// this listing), and throws std::invalid_argument unless the resulting level
// is 1 or 2. Level 2 selects high-compression mode.
330 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
331 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
334 case COMPRESSION_LEVEL_FASTEST:
335 case COMPRESSION_LEVEL_DEFAULT:
338 case COMPRESSION_LEVEL_BEST:
342 if (level < 1 || level > 2) {
343 throw std::invalid_argument(to<std::string>(
344 "LZ4Codec: invalid level: ", level));
346 highCompression_ = (level > 1);
// The caller must supply the uncompressed length unless this is the variant
// for which encodeSize() is true.
349 bool LZ4Codec::doNeedsUncompressedLength() const {
350 return !encodeSize();
353 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
354 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
// Fallback definition used only when the lz4 header does not provide one.
356 #ifndef LZ4_MAX_INPUT_SIZE
357 # define LZ4_MAX_INPUT_SIZE 0x7E000000
// The largest input this codec accepts is LZ4_MAX_INPUT_SIZE.
360 uint64_t LZ4Codec::doMaxUncompressedLength() const {
361 return LZ4_MAX_INPUT_SIZE;
// Compresses `data` into a single newly allocated IOBuf.
// Chained input is coalesced first. The output buffer is sized with
// LZ4_compressBound plus room for a varint length header when encodeSize() is
// true. The compressor entry points depend on LZ4_VERSION_NUMBER.
// NOTE(review): the declarations of `clone`/`n`, the condition around the
// varint write, and the final append/return are not visible in this listing.
364 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
366 if (data->isChained()) {
367 // LZ4 doesn't support streaming, so we have to coalesce
368 clone = data->cloneCoalescedAsValue();
// Reserve space for the optional length header in front of the payload.
372 uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
373 auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
375 encodeVarintToIOBuf(data->length(), out.get());
379 auto input = reinterpret_cast<const char*>(data->data());
380 auto output = reinterpret_cast<char*>(out->writableTail());
381 const auto inputLength = data->length();
// lz4 1.7.0 and later: use the bounded-output API; otherwise use the older
// entry points, which take no output capacity.
382 #if LZ4_VERSION_NUMBER >= 10700
383 if (highCompression_) {
384 n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
386 n = LZ4_compress_default(input, output, inputLength, out->tailroom());
389 if (highCompression_) {
390 n = LZ4_compressHC(input, output, inputLength);
392 n = LZ4_compress(input, output, inputLength);
// Sanity check: the compressor must not report more bytes than the buffer holds.
397 CHECK_LE(n, out->capacity());
// Decompresses an LZ4 block into a single newly allocated IOBuf.
// Chained input is coalesced first. The expected output size is either decoded
// from a varint header in the stream or taken from `uncompressedLength`; the
// branch condition is not visible in this listing (presumably encodeSize()).
// Throws std::runtime_error on a length mismatch, an unknown/too-large length,
// or when LZ4_decompress_safe does not produce exactly the expected size.
// NOTE(review): in the visible lines the varint-decoded length is compared
// only with the caller-supplied length, not with maxUncompressedLength(),
// before being used as the allocation size and passed to LZ4_decompress_safe.
// Confirm whether a bound check is needed for untrusted input.
403 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
405 uint64_t uncompressedLength) {
407 if (data->isChained()) {
408 // LZ4 doesn't support streaming, so we have to coalesce
409 clone = data->cloneCoalescedAsValue();
413 folly::io::Cursor cursor(data);
414 uint64_t actualUncompressedLength;
// Size-prefixed variant: the header is authoritative, but must agree with the
// caller's length when one was given.
416 actualUncompressedLength = decodeVarintFromCursor(cursor);
417 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
418 uncompressedLength != actualUncompressedLength) {
419 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
// Plain variant: the caller's length is required and must be within limits.
422 actualUncompressedLength = uncompressedLength;
423 if (actualUncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH ||
424 actualUncompressedLength > maxUncompressedLength()) {
425 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
// Remaining bytes at the cursor are the compressed payload.
429 auto sp = StringPiece{cursor.peekBytes()};
430 auto out = IOBuf::create(actualUncompressedLength);
431 int n = LZ4_decompress_safe(
433 reinterpret_cast<char*>(out->writableTail()),
435 actualUncompressedLength);
// A negative result or a size different from the expected one is an error.
437 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
438 throw std::runtime_error(to<std::string>(
439 "LZ4 decompression returned invalid value ", n));
441 out->append(actualUncompressedLength);
445 #if LZ4_VERSION_NUMBER >= 10301
// LZ4 frame-format codec. Supports format detection through
// validPrefixes()/canUncompress() and owns a decompression context (dctx_)
// that is released in the destructor.
447 class LZ4FrameCodec final : public Codec {
449 static std::unique_ptr<Codec> create(int level, CodecType type);
450 explicit LZ4FrameCodec(int level, CodecType type);
453 std::vector<std::string> validPrefixes() const override;
454 bool canUncompress(const IOBuf* data, uint64_t uncompressedLength)
458 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
459 std::unique_ptr<IOBuf> doUncompress(
461 uint64_t uncompressedLength) override;
463 // Reset the dctx_ if it is dirty or null.
467 LZ4F_decompressionContext_t dctx_{nullptr};
// Factory: constructs an LZ4FrameCodec and returns it as a Codec.
471 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
474 return make_unique<LZ4FrameCodec>(level, type);
// Magic number compared against the first four bytes of the input, read as a
// little-endian 32-bit value, to recognize an LZ4 frame.
477 static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
// The only valid prefix is the frame magic in little-endian byte order.
479 std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
480 return {prefixToStringLE(kLZ4FrameMagicLE)};
// Returns whether `data` begins with the LZ4 frame magic. The length argument
// is ignored.
483 bool LZ4FrameCodec::canUncompress(const IOBuf* data, uint64_t) const {
484 return dataStartsWithLE(data, kLZ4FrameMagicLE);
// Converts an LZ4F error code into a std::runtime_error carrying the library's
// error name. The non-error return path is not visible in this listing
// (presumably `code` is returned unchanged).
487 static size_t lz4FrameThrowOnError(size_t code) {
488 if (LZ4F_isError(code)) {
489 throw std::runtime_error(
490 to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
// Ensures dctx_ is a usable decompression context: an existing, non-dirty
// context takes the first branch (its body is not visible here; presumably an
// early return); otherwise the old context is freed and a new one is created.
// Throws std::runtime_error if context creation fails.
495 void LZ4FrameCodec::resetDCtx() {
496 if (dctx_ && !dirty_) {
500 LZ4F_freeDecompressionContext(dctx_);
// The second argument is the LZ4F API version passed to the library.
502 lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
// Constructor. Asserts (DCHECK) that the type is LZ4_FRAME and dispatches on
// the symbolic compression levels.
// NOTE(review): the per-case bodies and any default handling are not visible
// in this listing.
506 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
507 DCHECK(type == CodecType::LZ4_FRAME);
509 case COMPRESSION_LEVEL_FASTEST:
510 case COMPRESSION_LEVEL_DEFAULT:
513 case COMPRESSION_LEVEL_BEST:
// Destructor: releases the decompression context. Any guard around the call is
// not visible in this listing.
522 LZ4FrameCodec::~LZ4FrameCodec() {
524 LZ4F_freeDecompressionContext(dctx_);
// Compresses `data` as a single LZ4 frame in one newly allocated IOBuf.
// Chained input is coalesced first. The frame header records the content size
// and the configured compression level. Throws std::runtime_error (via
// lz4FrameThrowOnError) if the library reports an error.
// NOTE(review): the arguments of LZ4F_compressFrame and the return statement
// are not visible in this listing.
528 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
529 // LZ4 Frame compression doesn't support streaming so we have to coalesce
531 if (data->isChained()) {
532 clone = data->cloneCoalescedAsValue();
536 const auto uncompressedLength = data->length();
// Value-initialized preferences: every field not set below is zero.
537 LZ4F_preferences_t prefs{};
538 prefs.compressionLevel = level_;
539 prefs.frameInfo.contentSize = uncompressedLength;
// Allocate the worst-case frame size so a single call can hold the output.
541 auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs));
542 const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
548 buf->append(written);
// Decompresses one LZ4 frame into an IOBufQueue, growing the output as needed.
// Chained input is coalesced. When `uncompressedLength` is known it is used to
// preallocate the output (up to 64 MB) and is verified against the produced
// size at the end. Throws std::runtime_error on library errors, on an
// incomplete frame, or on a final size mismatch.
// NOTE(review): the resetDCtx() call, the dirty_ bookkeeping, the loop header
// and the return statement are not visible in this listing.
// NOTE(review): `options` is declared without an initializer and only
// stableDst is assigned in the visible lines; confirm that no other field of
// LZ4F_decompressOptions_t is read by the library.
552 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
554 uint64_t uncompressedLength) {
555 // Reset the dctx if any errors have occurred
// Start with the first buffer; replaced below if the input is chained.
558 ByteRange in = *data->begin();
560 if (data->isChained()) {
561 clone = data->cloneCoalescedAsValue();
562 in = clone.coalesce();
565 // Select decompression options
566 LZ4F_decompressOptions_t options;
567 options.stableDst = 1;
568 // Select blockSize and growthSize for the IOBufQueue
569 IOBufQueue queue(IOBufQueue::cacheChainLength());
// Defaults: 64 KiB minimum per step, 4 MiB growth per allocation.
570 auto blockSize = uint64_t{64} << 10;
571 auto growthSize = uint64_t{4} << 20;
572 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) {
573 // Allocate uncompressedLength in one chunk (up to 64 MB)
574 const auto allocateSize = std::min(uncompressedLength, uint64_t{64} << 20);
575 queue.preallocate(allocateSize, allocateSize);
// Never request more than the known output size.
576 blockSize = std::min(uncompressedLength, blockSize);
577 growthSize = std::min(uncompressedLength, growthSize);
579 // Reduce growthSize for small data
580 const auto guessUncompressedLen =
581 4 * std::max<uint64_t>(blockSize, in.size());
582 growthSize = std::min(guessUncompressedLen, growthSize);
584 // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
587 // Decompress until the frame is over
590 // Allocate enough space to decompress at least a block
593 std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
// inSize and outSize are in/out parameters: on return they hold the number of
// bytes consumed and produced, which is how they are used below.
595 size_t inSize = in.size();
596 code = lz4FrameThrowOnError(
597 LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
598 if (in.empty() && outSize == 0 && code != 0) {
599 // We passed no input, no output was produced, and the frame isn't over
600 // No more forward progress is possible
601 throw std::runtime_error("LZ4Frame error: Incomplete frame");
// Consume what was read and commit what was written.
603 in.uncheckedAdvance(inSize);
604 queue.postallocate(outSize);
606 // At this point the decompression context can be reused
608 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
609 queue.chainLength() != uncompressedLength) {
610 throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
615 #endif // LZ4_VERSION_NUMBER >= 10301
616 #endif // FOLLY_HAVE_LIBLZ4
618 #if FOLLY_HAVE_LIBSNAPPY
625 * Implementation of snappy::Source that reads from a IOBuf chain.
// snappy::Source adapter over an IOBuf chain; overrides Available(), Peek()
// and Skip(). Data members are not visible in this listing.
627 class IOBufSnappySource final : public snappy::Source {
629 explicit IOBufSnappySource(const IOBuf* data);
630 size_t Available() const override;
631 const char* Peek(size_t* len) override;
632 void Skip(size_t n) override;
// Constructor: available_ starts at the total data length of the chain. The
// rest of the initializer list is not visible in this listing.
638 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
639 : available_(data->computeChainDataLength()),
// snappy::Source override reporting the bytes left to read.
// NOTE(review): the body is not visible in this listing (presumably returns
// available_).
643 size_t IOBufSnappySource::Available() const {
// snappy::Source override: exposes the contiguous bytes at the cursor without
// consuming them. The assignment to *len and the return are not visible in
// this listing.
647 const char* IOBufSnappySource::Peek(size_t* len) {
648 auto sp = StringPiece{cursor_.peekBytes()};
// snappy::Source override: consumes `n` bytes. Aborts (CHECK) if `n` exceeds
// the bytes still available. The statements that advance the cursor and update
// available_ are not visible in this listing.
653 void IOBufSnappySource::Skip(size_t n) {
654 CHECK_LE(n, available_);
// Snappy codec. Overrides the maximum uncompressed length and the
// compress/uncompress hooks.
659 class SnappyCodec final : public Codec {
661 static std::unique_ptr<Codec> create(int level, CodecType type);
662 explicit SnappyCodec(int level, CodecType type);
665 uint64_t doMaxUncompressedLength() const override;
666 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
667 std::unique_ptr<IOBuf> doUncompress(
669 uint64_t uncompressedLength) override;
// Factory: constructs a SnappyCodec and returns it as a Codec.
672 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
673 return make_unique<SnappyCodec>(level, type);
// Constructor. Asserts (DCHECK) that the type is SNAPPY and validates `level`:
// the three symbolic levels are accepted, and the visible error path throws
// std::invalid_argument. The switch header and other case labels are not
// visible in this listing.
676 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
677 DCHECK(type == CodecType::SNAPPY);
679 case COMPRESSION_LEVEL_FASTEST:
680 case COMPRESSION_LEVEL_DEFAULT:
681 case COMPRESSION_LEVEL_BEST:
685 throw std::invalid_argument(to<std::string>(
686 "SnappyCodec: invalid level: ", level));
// Inputs are limited to what fits in a uint32_t.
690 uint64_t SnappyCodec::doMaxUncompressedLength() const {
691 // snappy.h uses uint32_t for lengths, so there's that.
692 return std::numeric_limits<uint32_t>::max();
// Compresses the IOBuf chain into one newly allocated IOBuf sized by
// snappy::MaxCompressedLength, reading the input through IOBufSnappySource.
// NOTE(review): the declaration of `out`, the final append and the return are
// not visible in this listing.
695 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
696 IOBufSnappySource source(data);
698 IOBuf::create(snappy::MaxCompressedLength(source.Available()));
// The sink writes straight into the output buffer's tail; it does no bounds
// checking, which is why the buffer was sized for the worst case above.
700 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
701 out->writableTail()));
703 size_t n = snappy::Compress(&source, &sink);
// Sanity check on the number of bytes the compressor reports.
705 CHECK_LE(n, out->capacity());
// Decompresses a snappy stream into one newly allocated IOBuf.
// The uncompressed length is read from the stream and, when the caller
// supplied one, must match it. Throws std::runtime_error if the length cannot
// be read, does not match, or decompression fails.
710 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
711 uint64_t uncompressedLength) {
712 uint32_t actualUncompressedLength = 0;
// First pass: a source used only to read the length header.
715 IOBufSnappySource source(data);
716 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
717 throw std::runtime_error("snappy::GetUncompressedLength failed");
719 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
720 uncompressedLength != actualUncompressedLength) {
721 throw std::runtime_error("snappy: invalid uncompressed length");
725 auto out = IOBuf::create(actualUncompressedLength);
// Second pass: a fresh source over the same data for the actual decompression.
728 IOBufSnappySource source(data);
729 if (!snappy::RawUncompress(&source,
730 reinterpret_cast<char*>(out->writableTail()))) {
731 throw std::runtime_error("snappy::RawUncompress failed");
735 out->append(actualUncompressedLength);
739 #endif // FOLLY_HAVE_LIBSNAPPY
// zlib-based codec handling both the ZLIB and GZIP codec types. Provides
// format detection plus two private helpers used by the streaming loops:
// addOutputBuffer() and doInflate().
745 class ZlibCodec final : public Codec {
747 static std::unique_ptr<Codec> create(int level, CodecType type);
748 explicit ZlibCodec(int level, CodecType type);
750 std::vector<std::string> validPrefixes() const override;
751 bool canUncompress(const IOBuf* data, uint64_t uncompressedLength)
755 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
756 std::unique_ptr<IOBuf> doUncompress(
758 uint64_t uncompressedLength) override;
760 std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
761 bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
// Two-byte gzip magic, as a little-endian 16-bit value (bytes 0x1F, 0x8B).
766 static constexpr uint16_t kGZIPMagicLE = 0x8B1F;
// Returns every two-byte prefix a stream of this codec's type may start with:
// for ZLIB, all CMF/FLG header combinations that use deflate and satisfy the
// FCHECK rule; for GZIP, the single gzip magic.
// NOTE(review): a few lines (reserve, the second-prefix adjustment, the
// return of `result`) are not visible in this listing.
768 std::vector<std::string> ZlibCodec::validPrefixes() const {
769 if (type() == CodecType::ZLIB) {
770 // Zlib streams start with a 2 byte header.
777 // We won't restrict the values of any sub-fields except as described below.
779 // The lowest 4 bits of CMF is the compression method (CM).
780 // CM == 0x8 is the deflate compression method, which is currently the only
781 // supported compression method, so any valid prefix must have CM == 0x8.
783 // The lowest 5 bits of FLG is FCHECK.
784 // FCHECK must be such that the two header bytes are a multiple of 31 when
785 // interpreted as a big endian 16-bit number.
786 std::vector<std::string> result;
787 // 16 values for the first byte, 8 values for the second byte.
788 // There are also 4 combinations where both 0x00 and 0x1F work as FCHECK.
790 // Select all values for the CMF byte that use the deflate algorithm 0x8.
791 for (uint32_t first = 0x0800; first <= 0xF800; first += 0x1000) {
792 // Select all values for the FLG, but leave FCHECK as 0 since it's fixed.
793 for (uint32_t second = 0x00; second <= 0xE0; second += 0x20) {
794 uint16_t prefix = first | second;
// Round up to the next multiple of 31 to fill in FCHECK.
796 prefix += 31 - (prefix % 31);
// Endian::big + prefixToStringLE together emit the bytes in big-endian order.
797 result.push_back(prefixToStringLE(Endian::big(prefix)));
798 // zlib won't produce this, but it is a valid prefix.
799 if ((prefix & 0x1F) == 31) {
801 result.push_back(prefixToStringLE(Endian::big(prefix)));
807 // The gzip frame starts with 2 magic bytes.
808 return {prefixToStringLE(kGZIPMagicLE)};
// Format detection. For ZLIB: reads the first two bytes big-endian and accepts
// them when the compression-method nibble is 8 (deflate) and the value is a
// multiple of 31. For GZIP: checks for the gzip magic. The length argument is
// ignored. The cursor/value declarations and the read-failure return are not
// visible in this listing.
812 bool ZlibCodec::canUncompress(const IOBuf* data, uint64_t) const {
813 if (type() == CodecType::ZLIB) {
816 if (!cursor.tryReadBE(value)) {
819 // zlib compressed if using deflate and is a multiple of 31.
820 return (value & 0x0F00) == 0x0800 && value % 31 == 0;
822 return dataStartsWithLE(data, kGZIPMagicLE);
// Factory: constructs a ZlibCodec and returns it as a Codec.
826 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
827 return make_unique<ZlibCodec>(level, type);
// Constructor. Asserts (DCHECK) that the type is ZLIB or GZIP, maps the
// symbolic levels to zlib levels (only the DEFAULT mapping is visible here),
// and throws std::invalid_argument unless the result is
// Z_DEFAULT_COMPRESSION or within 0..9.
830 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
831 DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
833 case COMPRESSION_LEVEL_FASTEST:
836 case COMPRESSION_LEVEL_DEFAULT:
837 level = Z_DEFAULT_COMPRESSION;
839 case COMPRESSION_LEVEL_BEST:
843 if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
844 throw std::invalid_argument(to<std::string>(
845 "ZlibCodec: invalid level: ", level));
// Allocates a fresh output buffer of (at least) `length` bytes and points the
// zlib stream's output at it. Must only be called when the stream has no
// output space left (CHECK). The return statement is not visible in this
// listing.
850 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
852 CHECK_EQ(stream->avail_out, 0);
854 auto buf = IOBuf::create(length);
// Mark the whole capacity as data; callers trim the unused tail afterwards.
855 buf->append(buf->capacity());
857 stream->next_out = buf->writableData();
858 stream->avail_out = buf->length();
// Runs one inflate() step, first appending a new output buffer to `head` if
// the stream has no output space. Callers use the boolean result as the
// "stream ended" flag. Throws std::runtime_error on an inflate error and
// aborts (CHECK) on an unexpected return code.
// NOTE(review): the switch over `rc` and its case labels/returns are not
// visible in this listing.
863 bool ZlibCodec::doInflate(z_stream* stream,
865 uint32_t bufferLength) {
866 if (stream->avail_out == 0) {
867 head->prependChain(addOutputBuffer(stream, bufferLength));
870 int rc = inflate(stream, Z_NO_FLUSH);
881 throw std::runtime_error(to<std::string>(
882 "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
884 CHECK(false) << rc << ": " << stream->msg;
// Compresses an IOBuf chain with deflate, producing a chain of output buffers.
// Uses a gzip wrapper for CodecType::GZIP and a zlib wrapper otherwise.
// Throws std::runtime_error if deflateInit2 fails; unexpected deflate results
// abort via CHECK.
// NOTE(review): the z_stream declaration, the deflateInit2 arguments, the
// scope-exit wrapper around deflateEnd, several loop headers and the return
// are not visible in this listing.
890 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
// Null allocator hooks (per the zlib manual, this selects zlib's defaults).
892 stream.zalloc = nullptr;
893 stream.zfree = nullptr;
894 stream.opaque = nullptr;
896 // Using deflateInit2() to support gzip. "The windowBits parameter is the
897 // base two logarithm of the maximum window size (...) The default value is
898 // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
899 // around the compressed data instead of a zlib wrapper. The gzip header
900 // will have no file name, no extra data, no comment, no modification time
901 // (set to zero), no header crc, and the operating system will be set to 255
903 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
904 // All other parameters (method, memLevel, strategy) get default values from
906 int rc = deflateInit2(&stream,
913 throw std::runtime_error(to<std::string>(
914 "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
// Start with no input/output attached and zeroed counters.
917 stream.next_in = stream.next_out = nullptr;
918 stream.avail_in = stream.avail_out = 0;
919 stream.total_in = stream.total_out = 0;
// `success` lets the cleanup below tolerate Z_DATA_ERROR on failure paths.
921 bool success = false;
924 rc = deflateEnd(&stream);
925 // If we're here because of an exception, it's okay if some data
927 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
928 << rc << ": " << stream.msg;
931 uint64_t uncompressedLength = data->computeChainDataLength();
932 uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
934 // Max 64MiB in one go
935 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
936 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First buffer: the full worst-case size if it is small enough, else 4 MiB.
938 auto out = addOutputBuffer(
940 (maxCompressedLength <= maxSingleStepLength ?
941 maxCompressedLength :
942 defaultBufferLength));
// Feed each input buffer to deflate in steps of at most maxSingleStepLength.
944 for (auto& range : *data) {
945 uint64_t remaining = range.size();
946 uint64_t written = 0;
948 uint32_t step = (remaining > maxSingleStepLength ?
949 maxSingleStepLength : remaining);
950 stream.next_in = const_cast<uint8_t*>(range.data() + written);
951 stream.avail_in = step;
955 while (stream.avail_in != 0) {
956 if (stream.avail_out == 0) {
957 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
960 rc = deflate(&stream, Z_NO_FLUSH);
962 CHECK_EQ(rc, Z_OK) << stream.msg;
// Flush: keep calling deflate(Z_FINISH), adding buffers, until the stream ends.
968 if (stream.avail_out == 0) {
969 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
972 rc = deflate(&stream, Z_FINISH);
973 } while (rc == Z_OK);
975 CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
// Drop the unused tail of the last output buffer.
977 out->prev()->trimEnd(stream.avail_out);
979 success = true; // we survived
// Inflates an IOBuf chain into a chain of output buffers.
// Decodes gzip for CodecType::GZIP and zlib otherwise. Throws
// std::runtime_error if inflateInit2 fails, if input remains after the end of
// the stream, or if a known `uncompressedLength` differs from the number of
// bytes produced.
// NOTE(review): the z_stream declaration, the scope-exit wrapper around
// inflateEnd, the streamEnd test before the "junk" throw and the return are
// not visible in this listing.
984 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
985 uint64_t uncompressedLength) {
987 stream.zalloc = nullptr;
988 stream.zfree = nullptr;
989 stream.opaque = nullptr;
991 // "The windowBits parameter is the base two logarithm of the maximum window
992 // size (...) The default value is 15 (...) add 16 to decode only the gzip
993 // format (the zlib format will return a Z_DATA_ERROR)."
994 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
995 int rc = inflateInit2(&stream, windowBits);
997 throw std::runtime_error(to<std::string>(
998 "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
// Start with no input/output attached and zeroed counters.
1001 stream.next_in = stream.next_out = nullptr;
1002 stream.avail_in = stream.avail_out = 0;
1003 stream.total_in = stream.total_out = 0;
// `success` lets the cleanup below tolerate Z_DATA_ERROR on failure paths.
1005 bool success = false;
1008 rc = inflateEnd(&stream);
1009 // If we're here because of an exception, it's okay if some data
1011 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
1012 << rc << ": " << stream.msg;
1015 // Max 64MiB in one go
1016 constexpr uint64_t maxSingleStepLength = uint64_t(64) << 20; // 64MiB
1017 constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
// Size of each follow-up buffer, derived from the compressed input size.
1018 const uint64_t defaultBufferLength =
1019 computeBufferLength(data->computeChainDataLength(), kBlockSize);
// First buffer: the exact output size when known and small enough.
1021 auto out = addOutputBuffer(
1023 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1024 uncompressedLength <= maxSingleStepLength) ?
1025 uncompressedLength :
1026 defaultBufferLength));
1028 bool streamEnd = false;
// Feed every non-empty input buffer to inflate.
1029 for (auto& range : *data) {
1030 if (range.empty()) {
1034 stream.next_in = const_cast<uint8_t*>(range.data());
1035 stream.avail_in = range.size();
1037 while (stream.avail_in != 0) {
1039 throw std::runtime_error(to<std::string>(
1040 "ZlibCodec: junk after end of data"));
1043 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Input is exhausted: keep inflating until the stream reports its end.
1047 while (!streamEnd) {
1048 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused tail of the last output buffer.
1051 out->prev()->trimEnd(stream.avail_out);
1053 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1054 uncompressedLength != stream.total_out) {
1055 throw std::runtime_error(to<std::string>(
1056 "ZlibCodec: invalid uncompressed length"));
1059 success = true; // we survived
1064 #endif // FOLLY_HAVE_LIBZ
1066 #if FOLLY_HAVE_LIBLZMA
// liblzma-based codec handling the LZMA2 and LZMA2_VARINT_SIZE codec types;
// encodeSize() is true for the latter. Provides format detection plus two
// private helpers used by the streaming loops: addOutputBuffer() and
// doInflate().
1071 class LZMA2Codec final : public Codec {
1073 static std::unique_ptr<Codec> create(int level, CodecType type);
1074 explicit LZMA2Codec(int level, CodecType type);
1076 std::vector<std::string> validPrefixes() const override;
1077 bool canUncompress(const IOBuf* data, uint64_t uncompressedLength)
1081 bool doNeedsUncompressedLength() const override;
1082 uint64_t doMaxUncompressedLength() const override;
// True when this instance is the LZMA2_VARINT_SIZE variant.
1084 bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
1086 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1087 std::unique_ptr<IOBuf> doUncompress(
1089 uint64_t uncompressedLength) override;
1091 std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
1092 bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
// Stream magic as a little-endian integer; only the low kLZMA2MagicBytes (6)
// bytes are significant when comparing against input.
1097 static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1098 static constexpr unsigned kLZMA2MagicBytes = 6;
// Returns the 6-byte stream magic as the only valid prefix. The
// LZMA2_VARINT_SIZE variant takes a separate branch whose body is not visible
// in this listing.
1100 std::vector<std::string> LZMA2Codec::validPrefixes() const {
1101 if (type() == CodecType::LZMA2_VARINT_SIZE) {
1104 return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
// Format detection by the 6-byte stream magic. The length argument is ignored.
// The LZMA2_VARINT_SIZE variant takes a separate branch whose body is not
// visible in this listing.
1107 bool LZMA2Codec::canUncompress(const IOBuf* data, uint64_t) const {
1108 if (type() == CodecType::LZMA2_VARINT_SIZE) {
1111 // Returns false for all inputs less than 8 bytes.
1112 // This is okay, because no valid LZMA2 streams are less than 8 bytes.
1113 return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
// Factory: constructs an LZMA2Codec and returns it as a Codec.
1116 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
1117 return make_unique<LZMA2Codec>(level, type);
// Constructor. Asserts (DCHECK) that the type is LZMA2 or LZMA2_VARINT_SIZE,
// maps the symbolic levels to liblzma presets (only the DEFAULT mapping is
// visible here), and throws std::invalid_argument unless the result is within
// 0..9.
1120 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
1121 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
1123 case COMPRESSION_LEVEL_FASTEST:
1126 case COMPRESSION_LEVEL_DEFAULT:
1127 level = LZMA_PRESET_DEFAULT;
1129 case COMPRESSION_LEVEL_BEST:
1133 if (level < 0 || level > 9) {
1134 throw std::invalid_argument(to<std::string>(
1135 "LZMA2Codec: invalid level: ", level));
// Override reporting whether uncompress() requires a known length.
// NOTE(review): the returned value is not visible in this listing.
1140 bool LZMA2Codec::doNeedsUncompressedLength() const {
// Maximum accepted uncompressed length: 2^63 bytes.
1144 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
1145 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1146 return uint64_t(1) << 63;
// Allocates a fresh output buffer of (at least) `length` bytes and points the
// lzma stream's output at it. Must only be called when the stream has no
// output space left (CHECK). The return statement is not visible in this
// listing.
1149 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
1150 lzma_stream* stream,
1153 CHECK_EQ(stream->avail_out, 0);
1155 auto buf = IOBuf::create(length);
// Mark the whole capacity as data; callers trim the unused tail afterwards.
1156 buf->append(buf->capacity());
1158 stream->next_out = buf->writableData();
1159 stream->avail_out = buf->length();
// Compresses an IOBuf chain with liblzma's easy encoder (no integrity check),
// producing a chain of output buffers. Throws std::runtime_error if the
// encoder cannot be initialized or if lzma_code reports an error.
// NOTE(review): the declaration of `rc`, the condition around the varint size
// header (presumably encodeSize()), the do-loop header and the return are not
// visible in this listing.
1164 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
1166 lzma_stream stream = LZMA_STREAM_INIT;
1168 rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
1169 if (rc != LZMA_OK) {
1170 throw std::runtime_error(folly::to<std::string>(
1171 "LZMA2Codec: lzma_easy_encoder error: ", rc));
// Release encoder state on every exit path, including exceptions.
1174 SCOPE_EXIT { lzma_end(&stream); };
1176 uint64_t uncompressedLength = data->computeChainDataLength();
1177 uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
1179 // Max 64MiB in one go
1180 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1181 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First buffer: the full worst-case size if it is small enough, else 4 MiB.
1183 auto out = addOutputBuffer(
1185 (maxCompressedLength <= maxSingleStepLength ?
1186 maxCompressedLength :
1187 defaultBufferLength));
// Put a small buffer holding the varint-encoded uncompressed length at the
// head of the output chain.
1190 auto size = IOBuf::createCombined(kMaxVarintLength64);
1191 encodeVarintToIOBuf(uncompressedLength, size.get());
1192 size->appendChain(std::move(out));
1193 out = std::move(size);
// Feed every non-empty input buffer to the encoder.
1196 for (auto& range : *data) {
1197 if (range.empty()) {
1201 stream.next_in = const_cast<uint8_t*>(range.data());
1202 stream.avail_in = range.size();
1204 while (stream.avail_in != 0) {
1205 if (stream.avail_out == 0) {
1206 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1209 rc = lzma_code(&stream, LZMA_RUN);
1211 if (rc != LZMA_OK) {
1212 throw std::runtime_error(folly::to<std::string>(
1213 "LZMA2Codec: lzma_code error: ", rc));
// Flush: keep calling lzma_code(LZMA_FINISH), adding buffers, until it stops
// returning LZMA_OK.
1219 if (stream.avail_out == 0) {
1220 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1223 rc = lzma_code(&stream, LZMA_FINISH);
1224 } while (rc == LZMA_OK);
1226 if (rc != LZMA_STREAM_END) {
1227 throw std::runtime_error(folly::to<std::string>(
1228 "LZMA2Codec: lzma_code ended with error: ", rc));
// Drop the unused tail of the last output buffer.
1231 out->prev()->trimEnd(stream.avail_out);
// Runs one lzma_code(LZMA_RUN) step, first appending a new output buffer to
// `head` if the stream has no output space. Callers use the boolean result as
// the "stream ended" flag. Throws std::runtime_error on a decoder error.
// NOTE(review): the switch over `rc`, its other case labels and the return
// values are not visible in this listing.
1236 bool LZMA2Codec::doInflate(lzma_stream* stream,
1238 size_t bufferLength) {
1239 if (stream->avail_out == 0) {
1240 head->prependChain(addOutputBuffer(stream, bufferLength));
1243 lzma_ret rc = lzma_code(stream, LZMA_RUN);
1248 case LZMA_STREAM_END:
1251 throw std::runtime_error(to<std::string>(
1252 "LZMA2Codec: lzma_code error: ", rc));
// Decodes an LZMA stream from an IOBuf chain into a chain of output buffers.
// A varint length header is read first on one path (the condition is not
// visible in this listing; presumably encodeSize()) and must match a
// caller-supplied length. Throws std::runtime_error if the decoder cannot be
// initialized, on a length mismatch, if input remains after the end of the
// stream, or if the produced size differs from the expected length.
// NOTE(review): the declaration of `rc`, the streamEnd test before the "junk"
// throw and the return are not visible in this listing.
1258 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(const IOBuf* data,
1259 uint64_t uncompressedLength) {
1261 lzma_stream stream = LZMA_STREAM_INIT;
// Auto-detecting decoder with the memory limit set to the maximum uint64_t.
1263 rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
1264 if (rc != LZMA_OK) {
1265 throw std::runtime_error(folly::to<std::string>(
1266 "LZMA2Codec: lzma_auto_decoder error: ", rc));
// Release decoder state on every exit path, including exceptions.
1269 SCOPE_EXIT { lzma_end(&stream); };
1271 // Max 64MiB in one go
1272 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1273 constexpr uint32_t defaultBufferLength = uint32_t(256) << 10; // 256 KiB
1275 folly::io::Cursor cursor(data);
// Size header: must agree with the caller's length when one was given, and
// from here on replaces it as the expected output size.
1277 const uint64_t actualUncompressedLength = decodeVarintFromCursor(cursor);
1278 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1279 uncompressedLength != actualUncompressedLength) {
1280 throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
1282 uncompressedLength = actualUncompressedLength;
// First buffer: the exact output size when known and small enough.
1285 auto out = addOutputBuffer(
1287 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1288 uncompressedLength <= maxSingleStepLength)
1289 ? uncompressedLength
1290 : defaultBufferLength));
1292 bool streamEnd = false;
// Walk the remaining input one contiguous range at a time.
1293 auto buf = cursor.peekBytes();
1294 while (!buf.empty()) {
1295 stream.next_in = const_cast<uint8_t*>(buf.data());
1296 stream.avail_in = buf.size();
1298 while (stream.avail_in != 0) {
1300 throw std::runtime_error(to<std::string>(
1301 "LZMA2Codec: junk after end of data"));
1304 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1307 cursor.skip(buf.size());
1308 buf = cursor.peekBytes();
// Input is exhausted: keep decoding until the stream reports its end.
1311 while (!streamEnd) {
1312 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused tail of the last output buffer.
1315 out->prev()->trimEnd(stream.avail_out);
1317 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1318 uncompressedLength != stream.total_out) {
1319 throw std::runtime_error(
1320 to<std::string>("LZMA2Codec: invalid uncompressed length"));
1326 #endif // FOLLY_HAVE_LIBLZMA
1328 #ifdef FOLLY_HAVE_LIBZSTD
1333 class ZSTDCodec final : public Codec {
1335 static std::unique_ptr<Codec> create(int level, CodecType);
1336 explicit ZSTDCodec(int level, CodecType type);
1338 std::vector<std::string> validPrefixes() const override;
1339 bool canUncompress(const IOBuf* data, uint64_t uncompressedLength)
1343 bool doNeedsUncompressedLength() const override;
1344 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1345 std::unique_ptr<IOBuf> doUncompress(
1347 uint64_t uncompressedLength) override;
1352 static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
1354 std::vector<std::string> ZSTDCodec::validPrefixes() const {
1355 return {prefixToStringLE(kZSTDMagicLE)};
1358 bool ZSTDCodec::canUncompress(const IOBuf* data, uint64_t) const {
1359 return dataStartsWithLE(data, kZSTDMagicLE);
1362 std::unique_ptr<Codec> ZSTDCodec::create(int level, CodecType type) {
1363 return make_unique<ZSTDCodec>(level, type);
1366 ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) {
1367 DCHECK(type == CodecType::ZSTD);
1369 case COMPRESSION_LEVEL_FASTEST:
1372 case COMPRESSION_LEVEL_DEFAULT:
1375 case COMPRESSION_LEVEL_BEST:
1379 if (level < 1 || level > ZSTD_maxCLevel()) {
1380 throw std::invalid_argument(
1381 to<std::string>("ZSTD: invalid level: ", level));
1386 bool ZSTDCodec::doNeedsUncompressedLength() const {
1390 void zstdThrowIfError(size_t rc) {
1391 if (!ZSTD_isError(rc)) {
1394 throw std::runtime_error(
1395 to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
1398 std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
1399 // Support earlier versions of the codec (working with a single IOBuf,
1400 // and using ZSTD_decompress which requires ZSTD frame to contain size,
1401 // which isn't populated by streaming API).
1402 if (!data->isChained()) {
1403 auto out = IOBuf::createCombined(ZSTD_compressBound(data->length()));
1404 const auto rc = ZSTD_compress(
1405 out->writableData(),
1410 zstdThrowIfError(rc);
1415 auto zcs = ZSTD_createCStream();
1417 ZSTD_freeCStream(zcs);
1420 auto rc = ZSTD_initCStream(zcs, level_);
1421 zstdThrowIfError(rc);
1423 Cursor cursor(data);
1424 auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
1427 out.dst = result->writableTail();
1428 out.size = result->capacity();
1431 for (auto buffer = cursor.peekBytes(); !buffer.empty();) {
1433 in.src = buffer.data();
1434 in.size = buffer.size();
1435 for (in.pos = 0; in.pos != in.size;) {
1436 rc = ZSTD_compressStream(zcs, &out, &in);
1437 zstdThrowIfError(rc);
1439 cursor.skip(in.size);
1440 buffer = cursor.peekBytes();
1443 rc = ZSTD_endStream(zcs, &out);
1444 zstdThrowIfError(rc);
1447 result->append(out.pos);
1451 static std::unique_ptr<IOBuf> zstdUncompressBuffer(
1453 uint64_t uncompressedLength) {
1454 // Check preconditions
1455 DCHECK(!data->isChained());
1456 DCHECK(uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH);
1458 auto uncompressed = IOBuf::create(uncompressedLength);
1459 const auto decompressedSize = ZSTD_decompress(
1460 uncompressed->writableTail(),
1461 uncompressed->tailroom(),
1464 zstdThrowIfError(decompressedSize);
1465 if (decompressedSize != uncompressedLength) {
1466 throw std::runtime_error("ZSTD: invalid uncompressed length");
1468 uncompressed->append(decompressedSize);
1469 return uncompressed;
1472 static std::unique_ptr<IOBuf> zstdUncompressStream(
1474 uint64_t uncompressedLength) {
1475 auto zds = ZSTD_createDStream();
1477 ZSTD_freeDStream(zds);
1480 auto rc = ZSTD_initDStream(zds);
1481 zstdThrowIfError(rc);
1483 ZSTD_outBuffer out{};
1486 auto outputSize = ZSTD_DStreamOutSize();
1487 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH) {
1488 outputSize = uncompressedLength;
1491 IOBufQueue queue(IOBufQueue::cacheChainLength());
1493 Cursor cursor(data);
1495 if (in.pos == in.size) {
1496 auto buffer = cursor.peekBytes();
1497 in.src = buffer.data();
1498 in.size = buffer.size();
1500 cursor.skip(in.size);
1501 if (rc > 1 && in.size == 0) {
1502 throw std::runtime_error(to<std::string>("ZSTD: incomplete input"));
1505 if (out.pos == out.size) {
1507 queue.postallocate(out.pos);
1509 auto buffer = queue.preallocate(outputSize, outputSize);
1510 out.dst = buffer.first;
1511 out.size = buffer.second;
1513 outputSize = ZSTD_DStreamOutSize();
1515 rc = ZSTD_decompressStream(zds, &out, &in);
1516 zstdThrowIfError(rc);
1522 queue.postallocate(out.pos);
1524 if (in.pos != in.size || !cursor.isAtEnd()) {
1525 throw std::runtime_error("ZSTD: junk after end of data");
1527 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH &&
1528 queue.chainLength() != uncompressedLength) {
1529 throw std::runtime_error("ZSTD: invalid uncompressed length");
1532 return queue.move();
1535 std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
1537 uint64_t uncompressedLength) {
1539 // Read decompressed size from frame if available in first IOBuf.
1540 const auto decompressedSize =
1541 ZSTD_getDecompressedSize(data->data(), data->length());
1542 if (decompressedSize != 0) {
1543 if (uncompressedLength != Codec::UNKNOWN_UNCOMPRESSED_LENGTH &&
1544 uncompressedLength != decompressedSize) {
1545 throw std::runtime_error("ZSTD: invalid uncompressed length");
1547 uncompressedLength = decompressedSize;
1550 // Faster to decompress using ZSTD_decompress() if we can.
1551 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && !data->isChained()) {
1552 return zstdUncompressBuffer(data, uncompressedLength);
1554 // Fall back to slower streaming decompression.
1555 return zstdUncompressStream(data, uncompressedLength);
1558 #endif // FOLLY_HAVE_LIBZSTD
1560 #if FOLLY_HAVE_LIBBZ2
1562 class Bzip2Codec final : public Codec {
1564 static std::unique_ptr<Codec> create(int level, CodecType type);
1565 explicit Bzip2Codec(int level, CodecType type);
1567 std::vector<std::string> validPrefixes() const override;
1568 bool canUncompress(IOBuf const* data, uint64_t uncompressedLength)
1572 std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
1573 std::unique_ptr<IOBuf> doUncompress(
1575 uint64_t uncompressedLength) override;
1580 /* static */ std::unique_ptr<Codec> Bzip2Codec::create(
1583 return make_unique<Bzip2Codec>(level, type);
1586 Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
1587 DCHECK(type == CodecType::BZIP2);
1589 case COMPRESSION_LEVEL_FASTEST:
1592 case COMPRESSION_LEVEL_DEFAULT:
1595 case COMPRESSION_LEVEL_BEST:
1599 if (level < 1 || level > 9) {
1600 throw std::invalid_argument(
1601 to<std::string>("Bzip2: invalid level: ", level));
1606 static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1607 static uint64_t constexpr kBzip2MagicBytes = 3;
1609 std::vector<std::string> Bzip2Codec::validPrefixes() const {
1610 return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
1613 bool Bzip2Codec::canUncompress(IOBuf const* data, uint64_t) const {
1614 return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
1617 static bz_stream createBzStream() {
1619 stream.bzalloc = nullptr;
1620 stream.bzfree = nullptr;
1621 stream.opaque = nullptr;
1622 stream.next_in = stream.next_out = nullptr;
1623 stream.avail_in = stream.avail_out = 0;
1627 // Throws on error condition, otherwise returns the code.
1628 static int bzCheck(int const rc) {
1637 throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
// Upper bound used to size the output buffer for bzip2 compression.
//
// http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
// recommends a buffer 1% larger than the uncompressed data plus six hundred
// extra bytes. The 1% term uses integer division.
static uint64_t bzCompressBound(uint64_t const uncompressedLength) {
  uint64_t const onePercent = uncompressedLength / 100;
  uint64_t const fixedOverhead = 600;
  return uncompressedLength + onePercent + fixedOverhead;
}
1649 static std::unique_ptr<IOBuf> addOutputBuffer(
1651 uint64_t const bufferLength) {
1652 DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
1653 DCHECK_EQ(stream->avail_out, 0);
1655 auto buf = IOBuf::create(bufferLength);
1656 buf->append(buf->capacity());
1658 stream->next_out = reinterpret_cast<char*>(buf->writableData());
1659 stream->avail_out = buf->length();
1664 std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
1665 bz_stream stream = createBzStream();
1666 bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
1668 bzCheck(BZ2_bzCompressEnd(&stream));
1671 uint64_t const uncompressedLength = data->computeChainDataLength();
1672 uint64_t const maxCompressedLength = bzCompressBound(uncompressedLength);
1673 uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1674 uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
1676 auto out = addOutputBuffer(
1678 maxCompressedLength <= kMaxSingleStepLength ? maxCompressedLength
1679 : kDefaultBufferLength);
1681 for (auto range : *data) {
1682 while (!range.empty()) {
1683 auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1685 const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1686 stream.avail_in = inSize;
1688 if (stream.avail_out == 0) {
1689 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1692 bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
1693 range.uncheckedAdvance(inSize - stream.avail_in);
1697 if (stream.avail_out == 0) {
1698 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1700 } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
1702 out->prev()->trimEnd(stream.avail_out);
1707 std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
1709 uint64_t uncompressedLength) {
1710 bz_stream stream = createBzStream();
1711 bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
1713 bzCheck(BZ2_bzDecompressEnd(&stream));
1716 uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1717 uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
1718 uint64_t const kDefaultBufferLength =
1719 computeBufferLength(data->computeChainDataLength(), kBlockSize);
1721 auto out = addOutputBuffer(
1723 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1724 uncompressedLength <= kMaxSingleStepLength)
1725 ? uncompressedLength
1726 : kDefaultBufferLength));
1729 for (auto range : *data) {
1730 while (!range.empty()) {
1731 auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1733 const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1734 stream.avail_in = inSize;
1736 if (stream.avail_out == 0) {
1737 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1740 rc = bzCheck(BZ2_bzDecompress(&stream));
1741 range.uncheckedAdvance(inSize - stream.avail_in);
1744 while (rc != BZ_STREAM_END) {
1745 if (stream.avail_out == 0) {
1746 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1749 rc = bzCheck(BZ2_bzDecompress(&stream));
1752 out->prev()->trimEnd(stream.avail_out);
1754 uint64_t const totalOut =
1755 (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
1756 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1757 uncompressedLength != totalOut) {
1758 throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
1764 #endif // FOLLY_HAVE_LIBBZ2
1767 * Automatic decompression
1769 class AutomaticCodec final : public Codec {
1771 static std::unique_ptr<Codec> create(
1772 std::vector<std::unique_ptr<Codec>> customCodecs);
1773 explicit AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs);
1775 std::vector<std::string> validPrefixes() const override;
1776 bool canUncompress(const IOBuf* data, uint64_t uncompressedLength)
1780 bool doNeedsUncompressedLength() const override;
1781 uint64_t doMaxUncompressedLength() const override;
1783 std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1784 throw std::runtime_error("AutomaticCodec error: compress() not supported.");
1786 std::unique_ptr<IOBuf> doUncompress(
1788 uint64_t uncompressedLength) override;
1790 void addCodecIfSupported(CodecType type);
1792 // Throws iff the codecs aren't compatible (very slow)
1793 void checkCompatibleCodecs() const;
1795 std::vector<std::unique_ptr<Codec>> codecs_;
1796 bool needsUncompressedLength_;
1797 uint64_t maxUncompressedLength_;
1800 std::vector<std::string> AutomaticCodec::validPrefixes() const {
1801 std::unordered_set<std::string> prefixes;
1802 for (const auto& codec : codecs_) {
1803 const auto codecPrefixes = codec->validPrefixes();
1804 prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
1806 return std::vector<std::string>{prefixes.begin(), prefixes.end()};
1809 bool AutomaticCodec::canUncompress(
1811 uint64_t uncompressedLength) const {
1815 [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
1816 return codec->canUncompress(data, uncompressedLength);
1820 void AutomaticCodec::addCodecIfSupported(CodecType type) {
1821 const bool present = std::any_of(
1824 [&type](std::unique_ptr<Codec> const& codec) {
1825 return codec->type() == type;
1827 if (hasCodec(type) && !present) {
1828 codecs_.push_back(getCodec(type));
1832 /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
1833 std::vector<std::unique_ptr<Codec>> customCodecs) {
1834 return make_unique<AutomaticCodec>(std::move(customCodecs));
1837 AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
1838 : Codec(CodecType::USER_DEFINED), codecs_(std::move(customCodecs)) {
1839 // Fastest -> slowest
1840 addCodecIfSupported(CodecType::LZ4_FRAME);
1841 addCodecIfSupported(CodecType::ZSTD);
1842 addCodecIfSupported(CodecType::ZLIB);
1843 addCodecIfSupported(CodecType::GZIP);
1844 addCodecIfSupported(CodecType::LZMA2);
1845 addCodecIfSupported(CodecType::BZIP2);
1847 checkCompatibleCodecs();
1849 // Check that none of the codes are are null
1850 DCHECK(std::none_of(
1851 codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1852 return codec == nullptr;
1855 needsUncompressedLength_ = std::any_of(
1856 codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1857 return codec->needsUncompressedLength();
1860 const auto it = std::max_element(
1863 [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
1864 return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
1866 DCHECK(it != codecs_.end());
1867 maxUncompressedLength_ = (*it)->maxUncompressedLength();
1870 void AutomaticCodec::checkCompatibleCodecs() const {
1871 // Keep track of all the possible headers.
1872 std::unordered_set<std::string> headers;
1873 // The empty header is not allowed.
1876 // Construct a set of headers and check that none of the headers occur twice.
1877 // Eliminate edge cases.
1878 for (auto&& codec : codecs_) {
1879 const auto codecHeaders = codec->validPrefixes();
1880 // Codecs without any valid headers are not allowed.
1881 if (codecHeaders.empty()) {
1882 throw std::invalid_argument{
1883 "AutomaticCodec: validPrefixes() must not be empty."};
1885 // Insert all the headers for the current codec.
1886 const size_t beforeSize = headers.size();
1887 headers.insert(codecHeaders.begin(), codecHeaders.end());
1888 // Codecs are not compatible if any header occurred twice.
1889 if (beforeSize + codecHeaders.size() != headers.size()) {
1890 throw std::invalid_argument{
1891 "AutomaticCodec: Two valid prefixes collide."};
1895 // Check if any strict non-empty prefix of any header is a header.
1896 for (const auto& header : headers) {
1897 for (size_t i = 1; i < header.size(); ++i) {
1898 if (headers.count(header.substr(0, i))) {
1899 throw std::invalid_argument{
1900 "AutomaticCodec: One valid prefix is a prefix of another valid "
1907 bool AutomaticCodec::doNeedsUncompressedLength() const {
1908 return needsUncompressedLength_;
1911 uint64_t AutomaticCodec::doMaxUncompressedLength() const {
1912 return maxUncompressedLength_;
1915 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
1917 uint64_t uncompressedLength) {
1918 for (auto&& codec : codecs_) {
1919 if (codec->canUncompress(data, uncompressedLength)) {
1920 return codec->uncompress(data, uncompressedLength);
1923 throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
1928 typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
1929 static constexpr CodecFactory
1930 codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
1931 nullptr, // USER_DEFINED
1932 NoCompressionCodec::create,
1934 #if FOLLY_HAVE_LIBLZ4
1940 #if FOLLY_HAVE_LIBSNAPPY
1941 SnappyCodec::create,
1952 #if FOLLY_HAVE_LIBLZ4
1958 #if FOLLY_HAVE_LIBLZMA
1966 #if FOLLY_HAVE_LIBZSTD
1978 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
1979 LZ4FrameCodec::create,
1984 #if FOLLY_HAVE_LIBBZ2
1991 bool hasCodec(CodecType type) {
1992 size_t idx = static_cast<size_t>(type);
1993 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1994 throw std::invalid_argument(
1995 to<std::string>("Compression type ", idx, " invalid"));
1997 return codecFactories[idx] != nullptr;
2000 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2001 size_t idx = static_cast<size_t>(type);
2002 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2003 throw std::invalid_argument(
2004 to<std::string>("Compression type ", idx, " invalid"));
2006 auto factory = codecFactories[idx];
2008 throw std::invalid_argument(to<std::string>(
2009 "Compression type ", idx, " not supported"));
2011 auto codec = (*factory)(level, type);
2012 DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
2016 std::unique_ptr<Codec> getAutoUncompressionCodec(
2017 std::vector<std::unique_ptr<Codec>> customCodecs) {
2018 return AutomaticCodec::create(std::move(customCodecs));