2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
22 #if LZ4_VERSION_NUMBER >= 10301
27 #include <glog/logging.h>
29 #if FOLLY_HAVE_LIBSNAPPY
31 #include <snappy-sinksource.h>
38 #if FOLLY_HAVE_LIBLZMA
42 #if FOLLY_HAVE_LIBZSTD
50 #include <folly/Bits.h>
51 #include <folly/Conv.h>
52 #include <folly/Memory.h>
53 #include <folly/Portability.h>
54 #include <folly/ScopeGuard.h>
55 #include <folly/Varint.h>
56 #include <folly/io/Cursor.h>
58 #include <unordered_set>
60 namespace folly { namespace io {
62 Codec::Codec(CodecType type) : type_(type) { }
64 // Ensure consistent behavior in the nullptr case
65 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
66 uint64_t len = data->computeChainDataLength();
68 return IOBuf::create(0);
70 if (len > maxUncompressedLength()) {
71 throw std::runtime_error("Codec: uncompressed length too large");
74 return doCompress(data);
77 std::string Codec::compress(const StringPiece data) {
78 const uint64_t len = data.size();
82 if (len > maxUncompressedLength()) {
83 throw std::runtime_error("Codec: uncompressed length too large");
86 return doCompressString(data);
89 std::unique_ptr<IOBuf> Codec::uncompress(
91 Optional<uint64_t> uncompressedLength) {
92 if (!uncompressedLength) {
93 if (needsUncompressedLength()) {
94 throw std::invalid_argument("Codec: uncompressed length required");
96 } else if (*uncompressedLength > maxUncompressedLength()) {
97 throw std::runtime_error("Codec: uncompressed length too large");
101 if (uncompressedLength.value_or(0) != 0) {
102 throw std::runtime_error("Codec: invalid uncompressed length");
104 return IOBuf::create(0);
107 return doUncompress(data, uncompressedLength);
110 std::string Codec::uncompress(
111 const StringPiece data,
112 Optional<uint64_t> uncompressedLength) {
113 if (!uncompressedLength) {
114 if (needsUncompressedLength()) {
115 throw std::invalid_argument("Codec: uncompressed length required");
117 } else if (*uncompressedLength > maxUncompressedLength()) {
118 throw std::runtime_error("Codec: uncompressed length too large");
122 if (uncompressedLength.value_or(0) != 0) {
123 throw std::runtime_error("Codec: invalid uncompressed length");
128 return doUncompressString(data, uncompressedLength);
131 bool Codec::needsUncompressedLength() const {
132 return doNeedsUncompressedLength();
135 uint64_t Codec::maxUncompressedLength() const {
136 return doMaxUncompressedLength();
// Base-class implementation of the hook that needsUncompressedLength()
// forwards to.
// NOTE(review): the function body is not visible in this extract.
139 bool Codec::doNeedsUncompressedLength() const {
143 uint64_t Codec::doMaxUncompressedLength() const {
144 return UNLIMITED_UNCOMPRESSED_LENGTH;
// Base-class implementation of validPrefixes().
// NOTE(review): the function body is not visible in this extract.
147 std::vector<std::string> Codec::validPrefixes() const {
// Base-class implementation of canUncompress(); both parameters are unnamed
// and therefore unused here.
// NOTE(review): the function body is not visible in this extract.
151 bool Codec::canUncompress(const IOBuf*, Optional<uint64_t>) const {
155 std::string Codec::doCompressString(const StringPiece data) {
156 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
157 auto outputBuffer = doCompress(&inputBuffer);
159 output.reserve(outputBuffer->computeChainDataLength());
160 for (auto range : *outputBuffer) {
161 output.append(reinterpret_cast<const char*>(range.data()), range.size());
166 std::string Codec::doUncompressString(
167 const StringPiece data,
168 Optional<uint64_t> uncompressedLength) {
169 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
170 auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
172 output.reserve(outputBuffer->computeChainDataLength());
173 for (auto range : *outputBuffer) {
174 output.append(reinterpret_cast<const char*>(range.data()), range.size());
179 uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
180 if (uncompressedLength == 0) {
183 return doMaxCompressedLength(uncompressedLength);
186 Optional<uint64_t> Codec::getUncompressedLength(
187 const folly::IOBuf* data,
188 Optional<uint64_t> uncompressedLength) const {
189 auto const compressedLength = data->computeChainDataLength();
190 if (uncompressedLength == uint64_t(0) || compressedLength == 0) {
191 if (uncompressedLength.value_or(0) != 0 || compressedLength != 0) {
192 throw std::runtime_error("Invalid uncompressed length");
196 return doGetUncompressedLength(data, uncompressedLength);
199 Optional<uint64_t> Codec::doGetUncompressedLength(
201 Optional<uint64_t> uncompressedLength) const {
202 return uncompressedLength;
205 bool StreamCodec::needsDataLength() const {
206 return doNeedsDataLength();
// Base-class implementation of the hook that needsDataLength() forwards to.
// NOTE(review): the function body is not visible in this extract.
209 bool StreamCodec::doNeedsDataLength() const {
213 void StreamCodec::assertStateIs(State expected) const {
214 if (state_ != expected) {
215 throw std::logic_error(folly::to<std::string>(
216 "Codec: state is ", state_, "; expected state ", expected));
// Puts the stream back into State::RESET and remembers the (optional)
// uncompressed length for the next stream.
// NOTE(review): the tail of this function is not visible in this extract.
220 void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
221 state_ = State::RESET;
222 uncompressedLength_ = uncompressedLength;
// Drives one step of streaming compression and maintains the stream state
// machine around the codec-specific doCompressStream() call.
//
// Visible checks: a freshly reset stream with empty input is special-cased
// per flush op, and a stream whose declared uncompressed length is zero must
// not receive input ("Codec: invalid uncompressed length").
// NOTE(review): several lines (braces, returns, the switch header) are not
// visible in this extract; comments below describe only what is shown.
226 bool StreamCodec::compressStream(
228 MutableByteRange& output,
229 StreamCodec::FlushOp flushOp) {
230 if (state_ == State::RESET && input.empty()) {
231 if (flushOp == StreamCodec::FlushOp::NONE) {
234 if (flushOp == StreamCodec::FlushOp::END &&
235 uncompressedLength().value_or(0) != 0) {
236 throw std::runtime_error("Codec: invalid uncompressed length");
240 if (state_ == State::RESET && !input.empty() &&
241 uncompressedLength() == uint64_t(0)) {
242 throw std::runtime_error("Codec: invalid uncompressed length");
244 // Handle input state transitions
// NONE moves RESET -> COMPRESS; the stream must then be in COMPRESS.
246 case StreamCodec::FlushOp::NONE:
247 if (state_ == State::RESET) {
248 state_ = State::COMPRESS;
250 assertStateIs(State::COMPRESS);
// FLUSH moves RESET/COMPRESS -> COMPRESS_FLUSH.
252 case StreamCodec::FlushOp::FLUSH:
253 if (state_ == State::RESET || state_ == State::COMPRESS) {
254 state_ = State::COMPRESS_FLUSH;
256 assertStateIs(State::COMPRESS_FLUSH);
// END moves RESET/COMPRESS -> COMPRESS_END.
258 case StreamCodec::FlushOp::END:
259 if (state_ == State::RESET || state_ == State::COMPRESS) {
260 state_ = State::COMPRESS_END;
262 assertStateIs(State::COMPRESS_END);
265 bool const done = doCompressStream(input, output, flushOp);
266 // Handle output state transitions
268 if (state_ == State::COMPRESS_FLUSH) {
269 state_ = State::COMPRESS;
270 } else if (state_ == State::COMPRESS_END) {
273 // Check internal invariants
274 DCHECK(input.empty());
275 DCHECK(flushOp != StreamCodec::FlushOp::NONE);
// Drives one step of streaming decompression: moves a reset stream into
// State::UNCOMPRESS, asserts that state, and calls the codec-specific
// doUncompressStream().
// NOTE(review): the early-exit branch bodies and the output state handling
// are not visible in this extract.
280 bool StreamCodec::uncompressStream(
282 MutableByteRange& output,
283 StreamCodec::FlushOp flushOp) {
284 if (state_ == State::RESET && input.empty()) {
285 if (uncompressedLength().value_or(0) == 0) {
290 // Handle input state transitions
291 if (state_ == State::RESET) {
292 state_ = State::UNCOMPRESS;
294 assertStateIs(State::UNCOMPRESS);
295 bool const done = doUncompressStream(input, output, flushOp);
296 // Handle output state transitions
// Allocates a fresh IOBuf via IOBuf::create(size), marks its whole capacity
// as in use, and points `output` at that writable region. `output` must be
// empty on entry (DCHECK).
// NOTE(review): the `size` parameter line and the return statement are not
// visible in this extract.
303 static std::unique_ptr<IOBuf> addOutputBuffer(
304 MutableByteRange& output,
306 DCHECK(output.empty());
307 auto buffer = IOBuf::create(size);
// Claim the full capacity now; callers trim the unused tail afterwards.
308 buffer->append(buffer->capacity());
309 output = {buffer->writableData(), buffer->length()};
// One-shot compression built on the streaming API: resets the stream with the
// total input length, then feeds the input chain buffer by buffer through
// compressStream(), growing the output chain whenever `output` runs empty.
// The first output buffer is sized to maxCompressedLength() when that is at
// most 64 MB, otherwise to the 4 MB default.
// NOTE(review): the loop header and several closing lines are not visible in
// this extract.
313 std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
314 uint64_t const uncompressedLength = data->computeChainDataLength();
315 resetStream(uncompressedLength);
316 uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
318 auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
319 auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
321 MutableByteRange output;
322 auto buffer = addOutputBuffer(
324 maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
325 : kDefaultBufferLength);
327 // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
328 IOBuf const* current = data;
329 ByteRange input{current->data(), current->length()};
330 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
// Skip over exhausted/empty input buffers until the chain wraps around.
332 while (input.empty() && current->next() != data) {
333 current = current->next();
334 input = {current->data(), current->length()};
336 if (current->next() == data) {
337 // This is the last input buffer so end the stream
338 flushOp = StreamCodec::FlushOp::END;
340 if (output.empty()) {
341 buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
343 bool const done = compressStream(input, output, flushOp);
345 DCHECK(input.empty());
346 DCHECK(flushOp == StreamCodec::FlushOp::END);
347 DCHECK_EQ(current->next(), data);
// Drop the unused tail of the last output buffer.
351 buffer->prev()->trimEnd(output.size());
/**
 * Picks an output buffer length for decompression: four times the larger of
 * `blockSize` and `compressedLength`, capped at 4 MiB.
 *
 * The cap is applied before multiplying, so inputs above 2^62 can no longer
 * wrap around to a tiny (or zero) length.
 */
static uint64_t computeBufferLength(
    uint64_t const compressedLength,
    uint64_t const blockSize) {
  uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
  uint64_t const largest = std::max(blockSize, compressedLength);
  // 4 * largest >= kMaxBufferLength exactly when largest >= kMaxBufferLength / 4
  if (largest >= kMaxBufferLength / 4) {
    return kMaxBufferLength;
  }
  return 4 * largest;
}
// One-shot decompression built on the streaming API: resolves the expected
// length via getUncompressedLength(), resets the stream, then feeds the input
// chain through uncompressStream(), growing the output chain as needed.
// Throws std::runtime_error for trailing input ("Junk after end of data") and
// when the produced size differs from the expected length.
// NOTE(review): the loop header and several closing lines are not visible in
// this extract.
363 std::unique_ptr<IOBuf> StreamCodec::doUncompress(
365 Optional<uint64_t> uncompressedLength) {
366 auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
367 auto constexpr kBlockSize = uint64_t(128) << 10;
368 auto const defaultBufferLength =
369 computeBufferLength(data->computeChainDataLength(), kBlockSize);
371 uncompressedLength = getUncompressedLength(data, uncompressedLength);
372 resetStream(uncompressedLength);
374 MutableByteRange output;
// First buffer: the exact expected size when known and at most 64 MB,
// otherwise the computed default.
375 auto buffer = addOutputBuffer(
377 (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
378 ? *uncompressedLength
379 : defaultBufferLength));
381 // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
382 IOBuf const* current = data;
383 ByteRange input{current->data(), current->length()};
384 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
386 while (input.empty() && current->next() != data) {
387 current = current->next();
388 input = {current->data(), current->length()};
390 if (current->next() == data) {
391 // Tell the uncompressor there is no more input (it may optimize)
392 flushOp = StreamCodec::FlushOp::END;
394 if (output.empty()) {
395 buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
397 bool const done = uncompressStream(input, output, flushOp);
402 if (!input.empty()) {
403 throw std::runtime_error("Codec: Junk after end of data");
// Drop the unused tail of the last output buffer before validating the size.
406 buffer->prev()->trimEnd(output.size());
407 if (uncompressedLength &&
408 *uncompressedLength != buffer->computeChainDataLength()) {
409 throw std::runtime_error("Codec: invalid uncompressed length");
// Codec for CodecType::NO_COMPRESSION: compress and uncompress both return a
// clone of the input chain.
// NOTE(review): access specifiers and some declaration lines are not visible
// in this extract.
420 class NoCompressionCodec final : public Codec {
422 static std::unique_ptr<Codec> create(int level, CodecType type);
423 explicit NoCompressionCodec(int level, CodecType type);
426 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
427 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
428 std::unique_ptr<IOBuf> doUncompress(
430 Optional<uint64_t> uncompressedLength) override;
433 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
434 return std::make_unique<NoCompressionCodec>(level, type);
// Validates the requested level: the three symbolic COMPRESSION_LEVEL_*
// values are listed as accepted cases, and an unaccepted level throws
// std::invalid_argument.
// NOTE(review): the initializer list, switch header and remaining case labels
// are not visible in this extract.
437 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
439 DCHECK(type == CodecType::NO_COMPRESSION);
441 case COMPRESSION_LEVEL_DEFAULT:
442 case COMPRESSION_LEVEL_FASTEST:
443 case COMPRESSION_LEVEL_BEST:
447 throw std::invalid_argument(to<std::string>(
448 "NoCompressionCodec: invalid level ", level));
452 uint64_t NoCompressionCodec::doMaxCompressedLength(
453 uint64_t uncompressedLength) const {
454 return uncompressedLength;
457 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
459 return data->clone();
462 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
464 Optional<uint64_t> uncompressedLength) {
465 if (uncompressedLength &&
466 data->computeChainDataLength() != *uncompressedLength) {
467 throw std::runtime_error(
468 to<std::string>("NoCompressionCodec: invalid uncompressed length"));
470 return data->clone();
473 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
477 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
478 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
479 out->append(encodeVarint(val, out->writableTail()));
// Decodes a varint from `cursor`: each byte read contributes its low 7 bits,
// shifted by 0, 7, 14, ... up to 63. Throws std::invalid_argument
// ("Invalid varint value. Too big.") on the visible error path.
// NOTE(review): the local declarations, loop exit condition and return are
// not visible in this extract.
482 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
485 for (int shift = 0; shift <= 63; shift += 7) {
486 b = cursor.read<int8_t>();
487 val |= static_cast<uint64_t>(b & 0x7f) << shift;
493 throw std::invalid_argument("Invalid varint value. Too big.");
500 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
504 * Reads sizeof(T) bytes, and returns false if not enough bytes are available.
505 * Returns true if the first n bytes are equal to prefix when interpreted as
// Enabled for unsigned T only. Reads a little-endian T from the start of
// `data` via a cursor and compares its low `n` bytes against `prefix`.
// NOTE(review): the declarations of `value`/`cursor` and the failure-branch
// body are not visible in this extract.
508 template <typename T>
509 typename std::enable_if<std::is_unsigned<T>::value, bool>::type
510 dataStartsWithLE(const IOBuf* data, T prefix, uint64_t n = sizeof(T)) {
512 DCHECK_LE(n, sizeof(T));
515 if (!cursor.tryReadLE(value)) {
// Keep only the low n bytes; a full-width shift is avoided by special-casing
// n == sizeof(T).
518 const T mask = n == sizeof(T) ? T(-1) : (T(1) << (8 * n)) - 1;
519 return prefix == (value & mask);
// Enabled for arithmetic T. Converts `prefix` to little-endian byte order and
// copies its first `n` bytes into a string.
// NOTE(review): the declaration/resizing of `result` and the return statement
// are not visible in this extract.
522 template <typename T>
523 typename std::enable_if<std::is_arithmetic<T>::value, std::string>::type
524 prefixToStringLE(T prefix, uint64_t n = sizeof(T)) {
526 DCHECK_LE(n, sizeof(T));
527 prefix = Endian::little(prefix);
530 memcpy(&result[0], &prefix, n);
535 #if FOLLY_HAVE_LIBLZ4
// Codec for the raw LZ4 block format, covering CodecType::LZ4 and
// CodecType::LZ4_VARINT_SIZE (the latter stores the uncompressed size as a
// varint in front of the data, see encodeSize()).
// NOTE(review): access specifiers and some declaration lines are not visible
// in this extract.
540 class LZ4Codec final : public Codec {
542 static std::unique_ptr<Codec> create(int level, CodecType type);
543 explicit LZ4Codec(int level, CodecType type);
546 bool doNeedsUncompressedLength() const override;
547 uint64_t doMaxUncompressedLength() const override;
548 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
// True when this codec instance writes/reads the varint size header.
550 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
552 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
553 std::unique_ptr<IOBuf> doUncompress(
555 Optional<uint64_t> uncompressedLength) override;
// Selects the HC ("high compression") LZ4 entry points in doCompress().
557 bool highCompression_;
560 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
561 return std::make_unique<LZ4Codec>(level, type);
// Maps the symbolic COMPRESSION_LEVEL_* values onto concrete levels, rejects
// anything outside [1, 2] with std::invalid_argument, and enables the
// high-compression path for levels above 1.
// NOTE(review): the switch header and the level assignments are not visible
// in this extract.
564 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
565 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
568 case COMPRESSION_LEVEL_FASTEST:
569 case COMPRESSION_LEVEL_DEFAULT:
572 case COMPRESSION_LEVEL_BEST:
576 if (level < 1 || level > 2) {
577 throw std::invalid_argument(to<std::string>(
578 "LZ4Codec: invalid level: ", level));
580 highCompression_ = (level > 1);
583 bool LZ4Codec::doNeedsUncompressedLength() const {
584 return !encodeSize();
// Fallback definition, used only when the lz4 headers did not provide one.
587 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
588 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
590 #ifndef LZ4_MAX_INPUT_SIZE
591 # define LZ4_MAX_INPUT_SIZE 0x7E000000
594 uint64_t LZ4Codec::doMaxUncompressedLength() const {
595 return LZ4_MAX_INPUT_SIZE;
598 uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
599 return LZ4_compressBound(uncompressedLength) +
600 (encodeSize() ? kMaxVarintLength64 : 0);
// Compresses `data` as a single LZ4 block. Chained input is coalesced first.
// The output buffer is sized with maxCompressedLength(); the uncompressed
// length may be written as a varint in front of the block. Newer lz4
// (>= 1.7.0) uses the bounded LZ4_compress_HC / LZ4_compress_default calls,
// older versions the legacy unbounded ones.
// NOTE(review): the clone declaration, the encodeSize() guard, the
// declaration of `n` and the tail of the function are not visible in this
// extract.
603 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
605 if (data->isChained()) {
606 // LZ4 doesn't support streaming, so we have to coalesce
607 clone = data->cloneCoalescedAsValue();
611 auto out = IOBuf::create(maxCompressedLength(data->length()));
613 encodeVarintToIOBuf(data->length(), out.get());
617 auto input = reinterpret_cast<const char*>(data->data());
618 auto output = reinterpret_cast<char*>(out->writableTail());
619 const auto inputLength = data->length();
620 #if LZ4_VERSION_NUMBER >= 10700
621 if (highCompression_) {
622 n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
624 n = LZ4_compress_default(input, output, inputLength, out->tailroom());
627 if (highCompression_) {
628 n = LZ4_compressHC(input, output, inputLength);
630 n = LZ4_compress(input, output, inputLength);
635 CHECK_LE(n, out->capacity());
641 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
643 Optional<uint64_t> uncompressedLength) {
645 if (data->isChained()) {
646 // LZ4 doesn't support streaming, so we have to coalesce
647 clone = data->cloneCoalescedAsValue();
651 folly::io::Cursor cursor(data);
652 uint64_t actualUncompressedLength;
654 actualUncompressedLength = decodeVarintFromCursor(cursor);
655 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
656 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
660 DCHECK(uncompressedLength.hasValue());
661 DCHECK(*uncompressedLength <= maxUncompressedLength());
662 actualUncompressedLength = *uncompressedLength;
665 auto sp = StringPiece{cursor.peekBytes()};
666 auto out = IOBuf::create(actualUncompressedLength);
667 int n = LZ4_decompress_safe(
669 reinterpret_cast<char*>(out->writableTail()),
671 actualUncompressedLength);
673 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
674 throw std::runtime_error(to<std::string>(
675 "LZ4 decompression returned invalid value ", n));
677 out->append(actualUncompressedLength);
681 #if LZ4_VERSION_NUMBER >= 10301
// Codec for the LZ4 frame format (CodecType::LZ4_FRAME). Owns an LZ4F
// decompression context that is freed in the destructor.
// NOTE(review): access specifiers and some member declarations are not
// visible in this extract.
683 class LZ4FrameCodec final : public Codec {
685 static std::unique_ptr<Codec> create(int level, CodecType type);
686 explicit LZ4FrameCodec(int level, CodecType type);
687 ~LZ4FrameCodec() override;
689 std::vector<std::string> validPrefixes() const override;
690 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
694 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
696 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
697 std::unique_ptr<IOBuf> doUncompress(
699 Optional<uint64_t> uncompressedLength) override;
701 // Reset the dctx_ if it is dirty or null.
// Decompression context; starts out null.
705 LZ4F_decompressionContext_t dctx_{nullptr};
709 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
712 return std::make_unique<LZ4FrameCodec>(level, type);
// Magic number identifying an LZ4 frame; used by validPrefixes() and
// canUncompress() as a little-endian prefix.
715 static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
717 std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
718 return {prefixToStringLE(kLZ4FrameMagicLE)};
721 bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
722 return dataStartsWithLE(data, kLZ4FrameMagicLE);
725 uint64_t LZ4FrameCodec::doMaxCompressedLength(
726 uint64_t uncompressedLength) const {
727 LZ4F_preferences_t prefs{};
728 prefs.compressionLevel = level_;
729 prefs.frameInfo.contentSize = uncompressedLength;
730 return LZ4F_compressFrameBound(uncompressedLength, &prefs);
733 static size_t lz4FrameThrowOnError(size_t code) {
734 if (LZ4F_isError(code)) {
735 throw std::runtime_error(
736 to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
// Ensures dctx_ is usable: the first check covers an existing, non-dirty
// context; otherwise any old context is freed and a new one is created
// (throwing via lz4FrameThrowOnError on failure).
// NOTE(review): the early-return body, the guard around the free, and the
// tail of the function are not visible in this extract.
741 void LZ4FrameCodec::resetDCtx() {
742 if (dctx_ && !dirty_) {
746 LZ4F_freeDecompressionContext(dctx_);
748 lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
// Constructs the frame codec; the symbolic COMPRESSION_LEVEL_* values are
// handled as switch cases.
// NOTE(review): the switch header, the level assignments and the default
// branch are not visible in this extract.
752 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
753 DCHECK(type == CodecType::LZ4_FRAME);
755 case COMPRESSION_LEVEL_FASTEST:
756 case COMPRESSION_LEVEL_DEFAULT:
759 case COMPRESSION_LEVEL_BEST:
// Releases the LZ4F decompression context.
// NOTE(review): a line between the signature and the free call is not
// visible in this extract.
768 LZ4FrameCodec::~LZ4FrameCodec() {
770 LZ4F_freeDecompressionContext(dctx_);
// Compresses `data` into a single LZ4 frame. Chained input is coalesced, the
// frame header records the content size, and the output buffer is sized with
// maxCompressedLength(). LZ4F errors surface as std::runtime_error through
// lz4FrameThrowOnError().
// NOTE(review): the clone declaration, the LZ4F_compressFrame arguments and
// the return statement are not visible in this extract.
774 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
775 // LZ4 Frame compression doesn't support streaming so we have to coalesce
777 if (data->isChained()) {
778 clone = data->cloneCoalescedAsValue();
782 const auto uncompressedLength = data->length();
783 LZ4F_preferences_t prefs{};
784 prefs.compressionLevel = level_;
785 prefs.frameInfo.contentSize = uncompressedLength;
787 auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
788 const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
// Commit exactly the number of bytes the frame occupies.
794 buf->append(written);
798 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
800 Optional<uint64_t> uncompressedLength) {
801 // Reset the dctx if any errors have occurred
804 ByteRange in = *data->begin();
806 if (data->isChained()) {
807 clone = data->cloneCoalescedAsValue();
808 in = clone.coalesce();
811 // Select decompression options
812 LZ4F_decompressOptions_t options;
813 options.stableDst = 1;
814 // Select blockSize and growthSize for the IOBufQueue
815 IOBufQueue queue(IOBufQueue::cacheChainLength());
816 auto blockSize = uint64_t{64} << 10;
817 auto growthSize = uint64_t{4} << 20;
818 if (uncompressedLength) {
819 // Allocate uncompressedLength in one chunk (up to 64 MB)
820 const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20);
821 queue.preallocate(allocateSize, allocateSize);
822 blockSize = std::min(*uncompressedLength, blockSize);
823 growthSize = std::min(*uncompressedLength, growthSize);
825 // Reduce growthSize for small data
826 const auto guessUncompressedLen =
827 4 * std::max<uint64_t>(blockSize, in.size());
828 growthSize = std::min(guessUncompressedLen, growthSize);
830 // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
833 // Decompress until the frame is over
836 // Allocate enough space to decompress at least a block
839 std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
841 size_t inSize = in.size();
842 code = lz4FrameThrowOnError(
843 LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
844 if (in.empty() && outSize == 0 && code != 0) {
845 // We passed no input, no output was produced, and the frame isn't over
846 // No more forward progress is possible
847 throw std::runtime_error("LZ4Frame error: Incomplete frame");
849 in.uncheckedAdvance(inSize);
850 queue.postallocate(outSize);
852 // At this point the decompression context can be reused
854 if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
855 throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
860 #endif // LZ4_VERSION_NUMBER >= 10301
861 #endif // FOLLY_HAVE_LIBLZ4
863 #if FOLLY_HAVE_LIBSNAPPY
// Adapter exposing an IOBuf chain through the snappy::Source interface
// (Available / Peek / Skip).
// NOTE(review): the enclosing comment delimiters, access specifiers and data
// members are not visible in this extract.
870 * Implementation of snappy::Source that reads from a IOBuf chain.
872 class IOBufSnappySource final : public snappy::Source {
874 explicit IOBufSnappySource(const IOBuf* data);
875 size_t Available() const override;
876 const char* Peek(size_t* len) override;
877 void Skip(size_t n) override;
// Initializes available_ with the total number of bytes in the chain.
// NOTE(review): the rest of the initializer list and the body are not
// visible in this extract.
883 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
884 : available_(data->computeChainDataLength()),
// snappy::Source override.
// NOTE(review): the function body is not visible in this extract.
888 size_t IOBufSnappySource::Available() const {
// snappy::Source override; looks at the bytes currently under the cursor
// without consuming them.
// NOTE(review): the lines that set *len and return are not visible in this
// extract.
892 const char* IOBufSnappySource::Peek(size_t* len) {
893 auto sp = StringPiece{cursor_.peekBytes()};
// snappy::Source override; skipping more than available_ bytes aborts via
// CHECK.
// NOTE(review): the lines that advance the source are not visible in this
// extract.
898 void IOBufSnappySource::Skip(size_t n) {
899 CHECK_LE(n, available_);
// Codec for CodecType::SNAPPY, implemented on top of the snappy library.
// NOTE(review): access specifiers and some declaration lines are not visible
// in this extract.
904 class SnappyCodec final : public Codec {
906 static std::unique_ptr<Codec> create(int level, CodecType type);
907 explicit SnappyCodec(int level, CodecType type);
910 uint64_t doMaxUncompressedLength() const override;
911 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
912 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
913 std::unique_ptr<IOBuf> doUncompress(
915 Optional<uint64_t> uncompressedLength) override;
918 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
919 return std::make_unique<SnappyCodec>(level, type);
// Validates the requested level: the three symbolic COMPRESSION_LEVEL_*
// values are handled as switch cases, and an unaccepted level throws
// std::invalid_argument.
// NOTE(review): the switch header and the remaining branches are not visible
// in this extract.
922 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
923 DCHECK(type == CodecType::SNAPPY);
925 case COMPRESSION_LEVEL_FASTEST:
926 case COMPRESSION_LEVEL_DEFAULT:
927 case COMPRESSION_LEVEL_BEST:
931 throw std::invalid_argument(to<std::string>(
932 "SnappyCodec: invalid level: ", level));
936 uint64_t SnappyCodec::doMaxUncompressedLength() const {
937 // snappy.h uses uint32_t for lengths, so there's that.
938 return std::numeric_limits<uint32_t>::max();
941 uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
942 return snappy::MaxCompressedLength(uncompressedLength);
// Compresses the chain by streaming it through an IOBufSnappySource into a
// single output buffer sized with maxCompressedLength(). The sink performs no
// bounds checking, so the result is verified against the buffer capacity
// afterwards (CHECK).
// NOTE(review): the append/return lines are not visible in this extract.
945 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
946 IOBufSnappySource source(data);
947 auto out = IOBuf::create(maxCompressedLength(source.Available()));
949 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
950 out->writableTail()));
952 size_t n = snappy::Compress(&source, &sink);
954 CHECK_LE(n, out->capacity());
// Decompresses in two passes over the input: the first reads the stored
// uncompressed length (and checks it against the caller's value, if any), the
// second decompresses into a buffer of exactly that size. Each failure throws
// std::runtime_error.
// NOTE(review): the scoping braces around the two source objects and the
// return statement are not visible in this extract.
959 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
961 Optional<uint64_t> uncompressedLength) {
962 uint32_t actualUncompressedLength = 0;
965 IOBufSnappySource source(data);
966 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
967 throw std::runtime_error("snappy::GetUncompressedLength failed");
969 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
970 throw std::runtime_error("snappy: invalid uncompressed length");
974 auto out = IOBuf::create(actualUncompressedLength);
// A fresh source is built here to re-read the input from the start.
977 IOBufSnappySource source(data);
978 if (!snappy::RawUncompress(&source,
979 reinterpret_cast<char*>(out->writableTail()))) {
980 throw std::runtime_error("snappy::RawUncompress failed");
984 out->append(actualUncompressedLength);
988 #endif // FOLLY_HAVE_LIBSNAPPY
// Codec for CodecType::ZLIB and CodecType::GZIP, implemented on top of zlib's
// deflate/inflate.
// NOTE(review): access specifiers and some member declarations are not
// visible in this extract.
994 class ZlibCodec final : public Codec {
996 static std::unique_ptr<Codec> create(int level, CodecType type);
997 explicit ZlibCodec(int level, CodecType type);
999 std::vector<std::string> validPrefixes() const override;
1000 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1004 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1005 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1006 std::unique_ptr<IOBuf> doUncompress(
1008 Optional<uint64_t> uncompressedLength) override;
// Helpers that manage the output buffer chain for a z_stream.
1010 std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
1011 bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
// The two gzip magic bytes as a little-endian 16-bit value; used by
// validPrefixes() and canUncompress() for the GZIP codec type.
1016 static constexpr uint16_t kGZIPMagicLE = 0x8B1F;
// Lists every byte prefix this codec can start with. For ZLIB all valid
// 2-byte headers are enumerated (see the comments below); for GZIP the single
// magic-number prefix is returned.
// NOTE(review): some comment lines and closing lines are not visible in this
// extract.
1018 std::vector<std::string> ZlibCodec::validPrefixes() const {
1019 if (type() == CodecType::ZLIB) {
1020 // Zlib streams start with a 2 byte header.
1027 // We won't restrict the values of any sub-fields except as described below.
1029 // The lowest 4 bits of CMF is the compression method (CM).
1030 // CM == 0x8 is the deflate compression method, which is currently the only
1031 // supported compression method, so any valid prefix must have CM == 0x8.
1033 // The lowest 5 bits of FLG is FCHECK.
1034 // FCHECK must be such that the two header bytes are a multiple of 31 when
1035 // interpreted as a big endian 16-bit number.
1036 std::vector<std::string> result;
1037 // 16 values for the first byte, 8 values for the second byte.
1038 // There are also 4 combinations where both 0x00 and 0x1F work as FCHECK.
1039 result.reserve(132);
1040 // Select all values for the CMF byte that use the deflate algorithm 0x8.
1041 for (uint32_t first = 0x0800; first <= 0xF800; first += 0x1000) {
1042 // Select all values for the FLG, but leave FCHECK as 0 since it's fixed.
1043 for (uint32_t second = 0x00; second <= 0xE0; second += 0x20) {
1044 uint16_t prefix = first | second;
// Round up to the next multiple of 31 to fill in FCHECK.
1046 prefix += 31 - (prefix % 31);
1047 result.push_back(prefixToStringLE(Endian::big(prefix)));
1048 // zlib won't produce this, but it is a valid prefix.
1049 if ((prefix & 0x1F) == 31) {
1051 result.push_back(prefixToStringLE(Endian::big(prefix)));
1057 // The gzip frame starts with 2 magic bytes.
1058 return {prefixToStringLE(kGZIPMagicLE)};
// Cheap header sniffing. ZLIB: the first two bytes, read big-endian, must
// carry the deflate method and be a multiple of 31. GZIP: the data must start
// with the gzip magic bytes. The length hint is ignored.
// NOTE(review): the declaration of `value` and the failure-branch body are
// not visible in this extract.
1062 bool ZlibCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
1063 if (type() == CodecType::ZLIB) {
1065 Cursor cursor{data};
1066 if (!cursor.tryReadBE(value)) {
1069 // zlib compressed if using deflate and is a multiple of 31.
1070 return (value & 0x0F00) == 0x0800 && value % 31 == 0;
1072 return dataStartsWithLE(data, kGZIPMagicLE);
1076 uint64_t ZlibCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1077 return deflateBound(nullptr, uncompressedLength);
1080 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
1081 return std::make_unique<ZlibCodec>(level, type);
// Maps the symbolic COMPRESSION_LEVEL_* values onto zlib levels, then rejects
// anything that is neither Z_DEFAULT_COMPRESSION nor within [0, 9] with
// std::invalid_argument.
// NOTE(review): the switch header, some level assignments and the tail of the
// constructor are not visible in this extract.
1084 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
1085 DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
1087 case COMPRESSION_LEVEL_FASTEST:
1090 case COMPRESSION_LEVEL_DEFAULT:
1091 level = Z_DEFAULT_COMPRESSION;
1093 case COMPRESSION_LEVEL_BEST:
1097 if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
1098 throw std::invalid_argument(to<std::string>(
1099 "ZlibCodec: invalid level: ", level));
// Allocates a new output IOBuf, marks its whole capacity as in use, and
// points the z_stream's output window at it. Must only be called once the
// previous window is exhausted (CHECK on avail_out == 0).
// NOTE(review): the `length` parameter line and the return statement are not
// visible in this extract.
1104 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
1106 CHECK_EQ(stream->avail_out, 0);
1108 auto buf = IOBuf::create(length);
1109 buf->append(buf->capacity());
1111 stream->next_out = buf->writableData();
1112 stream->avail_out = buf->length();
// Runs one inflate() step, first chaining a new output buffer onto `head` if
// the current output window is full. Inflate errors are reported as
// std::runtime_error; an unexpected return code aborts via CHECK(false).
// NOTE(review): the `head` parameter line, the switch on `rc` and the return
// statements are not visible in this extract.
1117 bool ZlibCodec::doInflate(z_stream* stream,
1119 uint32_t bufferLength) {
1120 if (stream->avail_out == 0) {
1121 head->prependChain(addOutputBuffer(stream, bufferLength));
1124 int rc = inflate(stream, Z_NO_FLUSH);
1135 throw std::runtime_error(to<std::string>(
1136 "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
1138 CHECK(false) << rc << ": " << stream->msg;
// Deflates the whole input chain into a chain of output buffers.
//
// The stream is initialised with deflateInit2() (windowBits 15, +16 for
// gzip), each input range is fed in steps of at most 64 MiB, output buffers
// are added whenever the window fills up, and the stream is finished with
// Z_FINISH. deflateEnd() runs in a cleanup step whose CHECK tolerates
// Z_DATA_ERROR only when `success` was never set.
// NOTE(review): the stream declaration, the scope-guard header, several loop
// headers and the return statement are not visible in this extract.
1144 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
1146 stream.zalloc = nullptr;
1147 stream.zfree = nullptr;
1148 stream.opaque = nullptr;
1150 // Using deflateInit2() to support gzip. "The windowBits parameter is the
1151 // base two logarithm of the maximum window size (...) The default value is
1152 // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
1153 // around the compressed data instead of a zlib wrapper. The gzip header
1154 // will have no file name, no extra data, no comment, no modification time
1155 // (set to zero), no header crc, and the operating system will be set to 255
1157 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
1158 // All other parameters (method, memLevel, strategy) get default values from
1160 int rc = deflateInit2(&stream,
1165 Z_DEFAULT_STRATEGY);
1167 throw std::runtime_error(to<std::string>(
1168 "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
1171 stream.next_in = stream.next_out = nullptr;
1172 stream.avail_in = stream.avail_out = 0;
1173 stream.total_in = stream.total_out = 0;
1175 bool success = false;
1178 rc = deflateEnd(&stream);
1179 // If we're here because of an exception, it's okay if some data
1181 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
1182 << rc << ": " << stream.msg;
1185 uint64_t uncompressedLength = data->computeChainDataLength();
1186 uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
1188 // Max 64MiB in one go
1189 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1190 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: the full bound when it is at most 64 MiB, else 4 MiB.
1192 auto out = addOutputBuffer(
1194 (maxCompressedLength <= maxSingleStepLength ?
1195 maxCompressedLength :
1196 defaultBufferLength));
1198 for (auto& range : *data) {
1199 uint64_t remaining = range.size();
1200 uint64_t written = 0;
// avail_in is fed in steps capped at maxSingleStepLength.
1202 uint32_t step = (remaining > maxSingleStepLength ?
1203 maxSingleStepLength : remaining);
1204 stream.next_in = const_cast<uint8_t*>(range.data() + written);
1205 stream.avail_in = step;
1209 while (stream.avail_in != 0) {
1210 if (stream.avail_out == 0) {
1211 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1214 rc = deflate(&stream, Z_NO_FLUSH);
1216 CHECK_EQ(rc, Z_OK) << stream.msg;
1222 if (stream.avail_out == 0) {
1223 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1226 rc = deflate(&stream, Z_FINISH);
1227 } while (rc == Z_OK);
1229 CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
// Drop the unused tail of the last output buffer.
1231 out->prev()->trimEnd(stream.avail_out);
1233 success = true; // we survived
// Inflates the whole input chain into a chain of output buffers.
//
// The stream is initialised with inflateInit2() (windowBits 15, +16 for
// gzip). Each non-empty input range is passed to doInflate(); input left over
// after the stream has ended is rejected as junk, and the total output size
// is checked against `uncompressedLength` when the caller supplied one.
// inflateEnd() runs in a cleanup step whose CHECK tolerates Z_DATA_ERROR only
// when `success` was never set.
// NOTE(review): the stream declaration, the scope-guard header, several
// branch bodies and the return statement are not visible in this extract.
1238 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(
1240 Optional<uint64_t> uncompressedLength) {
1242 stream.zalloc = nullptr;
1243 stream.zfree = nullptr;
1244 stream.opaque = nullptr;
1246 // "The windowBits parameter is the base two logarithm of the maximum window
1247 // size (...) The default value is 15 (...) add 16 to decode only the gzip
1248 // format (the zlib format will return a Z_DATA_ERROR)."
1249 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
1250 int rc = inflateInit2(&stream, windowBits);
1252 throw std::runtime_error(to<std::string>(
1253 "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
1256 stream.next_in = stream.next_out = nullptr;
1257 stream.avail_in = stream.avail_out = 0;
1258 stream.total_in = stream.total_out = 0;
1260 bool success = false;
1263 rc = inflateEnd(&stream);
1264 // If we're here because of an exception, it's okay if some data
1266 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
1267 << rc << ": " << stream.msg;
1270 // Max 64MiB in one go
1271 constexpr uint64_t maxSingleStepLength = uint64_t(64) << 20; // 64MiB
1272 constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
1273 const uint64_t defaultBufferLength =
1274 computeBufferLength(data->computeChainDataLength(), kBlockSize);
// First output buffer: the exact expected size when known and at most
// 64 MiB, otherwise the computed default.
1276 auto out = addOutputBuffer(
1278 ((uncompressedLength && *uncompressedLength <= maxSingleStepLength)
1279 ? *uncompressedLength
1280 : defaultBufferLength));
1282 bool streamEnd = false;
1283 for (auto& range : *data) {
1284 if (range.empty()) {
1288 stream.next_in = const_cast<uint8_t*>(range.data());
1289 stream.avail_in = range.size();
1291 while (stream.avail_in != 0) {
1293 throw std::runtime_error(to<std::string>(
1294 "ZlibCodec: junk after end of data"));
1297 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// All input consumed: keep inflating until zlib reports the end of stream.
1301 while (!streamEnd) {
1302 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused tail of the last output buffer.
1305 out->prev()->trimEnd(stream.avail_out);
1307 if (uncompressedLength && *uncompressedLength != stream.total_out) {
1308 throw std::runtime_error(
1309 to<std::string>("ZlibCodec: invalid uncompressed length"));
1312 success = true; // we survived
1317 #endif // FOLLY_HAVE_LIBZ
1319 #if FOLLY_HAVE_LIBLZMA
// Codec built on liblzma. It serves two codec types: CodecType::LZMA2 and
// CodecType::LZMA2_VARINT_SIZE; encodeSize() is true only for the latter,
// which additionally carries the uncompressed size as a varint.
1324 class LZMA2Codec final : public Codec {
1326 static std::unique_ptr<Codec> create(int level, CodecType type);
1327 explicit LZMA2Codec(int level, CodecType type);
1329 std::vector<std::string> validPrefixes() const override;
1330 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1334 bool doNeedsUncompressedLength() const override;
1335 uint64_t doMaxUncompressedLength() const override;
1336 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
// True when this instance was created for the varint-size-prefixed variant.
1338 bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
1340 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1341 std::unique_ptr<IOBuf> doUncompress(
1343 Optional<uint64_t> uncompressedLength) override;
// Helpers shared by the compress/uncompress loops: allocate and attach a new
// output buffer to the stream, and run one decoding step.
1345 std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
1346 bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
// Magic prefix matched by LZMA2Codec::validPrefixes()/canUncompress(), stored
// as a little-endian integer: the byte sequence is FD 37 7A 58 5A 00, and
// kLZMA2MagicBytes is how many of those bytes are significant.
1351 static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1352 static constexpr unsigned kLZMA2MagicBytes = 6;
// Returns the byte prefixes that identify data this codec can decode. For
// plain LZMA2 that is the single magic prefix; the LZMA2_VARINT_SIZE branch is
// tested first and its return value is not visible in this excerpt.
1354 std::vector<std::string> LZMA2Codec::validPrefixes() const {
1355 if (type() == CodecType::LZMA2_VARINT_SIZE) {
1358 return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
// Reports whether `data` starts with the LZMA2 magic bytes. The
// uncompressed-length argument is unnamed and unused. The LZMA2_VARINT_SIZE
// branch is tested first; its result is not visible in this excerpt.
1361 bool LZMA2Codec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
1362 if (type() == CodecType::LZMA2_VARINT_SIZE) {
1365 // Returns false for all inputs less than 8 bytes.
1366 // This is okay, because no valid LZMA2 streams are less than 8 bytes.
1367 return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
// Factory entry point: heap-allocates an LZMA2Codec for the given level and
// codec type and returns it as a Codec.
1370 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
1371 return std::make_unique<LZMA2Codec>(level, type);
// Constructs the codec for CodecType::LZMA2 or LZMA2_VARINT_SIZE (checked with
// DCHECK). The symbolic COMPRESSION_LEVEL_* values are translated first; only
// the DEFAULT -> LZMA_PRESET_DEFAULT mapping is visible in this excerpt.
// Throws std::invalid_argument when the resulting level is outside 0..9.
1374 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
1375 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
1377 case COMPRESSION_LEVEL_FASTEST:
1380 case COMPRESSION_LEVEL_DEFAULT:
1381 level = LZMA_PRESET_DEFAULT;
1383 case COMPRESSION_LEVEL_BEST:
1387 if (level < 0 || level > 9) {
1388 throw std::invalid_argument(to<std::string>(
1389 "LZMA2Codec: invalid level: ", level));
// Codec override, presumably backing Codec::needsUncompressedLength() -- TODO
// confirm. The returned value is not visible in this excerpt.
1394 bool LZMA2Codec::doNeedsUncompressedLength() const {
// Largest input size this codec accepts: 2^63 bytes.
1398 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
1399 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1400 return uint64_t(1) << 63;
// Upper bound on the compressed size for `uncompressedLength` input bytes:
// liblzma's stream bound, plus kMaxVarintLength64 bytes when the varint size
// prefix is in use (encodeSize()).
1403 uint64_t LZMA2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1404 return lzma_stream_buffer_bound(uncompressedLength) +
1405 (encodeSize() ? kMaxVarintLength64 : 0);
// Allocates a fresh output IOBuf and points the stream's output cursor at it.
// May only be called when the current output space is exhausted (CHECKed).
// NOTE(review): the return statement is not visible in this excerpt; callers
// chain the result, so it presumably returns `buf`.
1408 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
1409 lzma_stream* stream,
1412 CHECK_EQ(stream->avail_out, 0);
1414 auto buf = IOBuf::create(length);
// Mark the whole capacity as data up front; callers trim the unused tail of
// the last buffer once coding is done.
1415 buf->append(buf->capacity());
1417 stream->next_out = buf->writableData();
1418 stream->avail_out = buf->length();
// Compresses an IOBuf chain with lzma_easy_encoder at level_, using
// LZMA_CHECK_NONE (no integrity check). Output grows in defaultBufferLength
// chunks as needed.
// Throws std::runtime_error if the encoder cannot be initialised, if
// lzma_code(LZMA_RUN) returns anything but LZMA_OK, or if finishing does not
// end with LZMA_STREAM_END.
// NOTE(review): the return statement is not visible in this excerpt.
1423 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
1425 lzma_stream stream = LZMA_STREAM_INIT;
1427 rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
1428 if (rc != LZMA_OK) {
1429 throw std::runtime_error(folly::to<std::string>(
1430 "LZMA2Codec: lzma_easy_encoder error: ", rc));
// Release the encoder on every exit path, including exceptions.
1433 SCOPE_EXIT { lzma_end(&stream); };
1435 uint64_t uncompressedLength = data->computeChainDataLength();
1436 uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
1438 // Max 64MiB in one go
1439 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1440 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: the worst-case size when that is at most 64MiB,
// otherwise a 4MiB buffer.
1442 auto out = addOutputBuffer(
1444 (maxCompressedLength <= maxSingleStepLength ?
1445 maxCompressedLength :
1446 defaultBufferLength));
// Put a varint holding the uncompressed length in front of the output chain.
// NOTE(review): the guard for this step is not shown; presumably encodeSize().
1449 auto size = IOBuf::createCombined(kMaxVarintLength64);
1450 encodeVarintToIOBuf(uncompressedLength, size.get());
1451 size->appendChain(std::move(out));
1452 out = std::move(size);
// Feed every non-empty input buffer to the encoder.
1455 for (auto& range : *data) {
1456 if (range.empty()) {
1460 stream.next_in = const_cast<uint8_t*>(range.data());
1461 stream.avail_in = range.size();
1463 while (stream.avail_in != 0) {
1464 if (stream.avail_out == 0) {
1465 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1468 rc = lzma_code(&stream, LZMA_RUN);
1470 if (rc != LZMA_OK) {
1471 throw std::runtime_error(folly::to<std::string>(
1472 "LZMA2Codec: lzma_code error: ", rc));
// Flush: call lzma_code(LZMA_FINISH) while it keeps returning LZMA_OK,
// adding output space whenever it runs out.
1478 if (stream.avail_out == 0) {
1479 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
1482 rc = lzma_code(&stream, LZMA_FINISH);
1483 } while (rc == LZMA_OK);
1485 if (rc != LZMA_STREAM_END) {
1486 throw std::runtime_error(folly::to<std::string>(
1487 "LZMA2Codec: lzma_code ended with error: ", rc));
// Trim the unused tail of the last output buffer.
1490 out->prev()->trimEnd(stream.avail_out);
// Runs one lzma_code(LZMA_RUN) step, first chaining a new output buffer of
// bufferLength bytes onto `head` when the current one is full.
// Throws std::runtime_error for return codes handled by the error branch.
// NOTE(review): the switch and its return statements are not visible here;
// callers store the result in `streamEnd`, so it presumably returns true on
// LZMA_STREAM_END.
1495 bool LZMA2Codec::doInflate(lzma_stream* stream,
1497 size_t bufferLength) {
1498 if (stream->avail_out == 0) {
1499 head->prependChain(addOutputBuffer(stream, bufferLength));
1502 lzma_ret rc = lzma_code(stream, LZMA_RUN);
1507 case LZMA_STREAM_END:
1510 throw std::runtime_error(to<std::string>(
1511 "LZMA2Codec: lzma_code error: ", rc));
// Decompresses an IOBuf chain with lzma_auto_decoder, with the memory limit
// set to the maximum uint64_t value and no decoder flags.
// Throws std::runtime_error if the decoder cannot be initialised, if the
// embedded varint length disagrees with a caller-supplied one, with
// "junk after end of data" from inside the input loop, or if the produced
// size differs from the expected uncompressedLength.
// NOTE(review): the return statement is not visible in this excerpt.
1517 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(
1519 Optional<uint64_t> uncompressedLength) {
1521 lzma_stream stream = LZMA_STREAM_INIT;
1523 rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
1524 if (rc != LZMA_OK) {
1525 throw std::runtime_error(folly::to<std::string>(
1526 "LZMA2Codec: lzma_auto_decoder error: ", rc));
// Release the decoder on every exit path, including exceptions.
1529 SCOPE_EXIT { lzma_end(&stream); };
1531 // Max 64MiB in one go
1532 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
1533 constexpr uint32_t defaultBufferLength = uint32_t(256) << 10; // 256 KiB
1535 folly::io::Cursor cursor(data);
// Read the varint length prefix and adopt it as the expected length.
// NOTE(review): the guard for this step is not shown; presumably encodeSize().
1537 const uint64_t actualUncompressedLength = decodeVarintFromCursor(cursor);
1538 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
1539 throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
1541 uncompressedLength = actualUncompressedLength;
// First output buffer: the expected size when it is known and at most 64MiB,
// otherwise 256 KiB.
1544 auto out = addOutputBuffer(
1546 ((uncompressedLength && *uncompressedLength <= maxSingleStepLength)
1547 ? *uncompressedLength
1548 : defaultBufferLength));
1550 bool streamEnd = false;
// Walk the remaining input one contiguous span at a time via the cursor.
1551 auto buf = cursor.peekBytes();
1552 while (!buf.empty()) {
1553 stream.next_in = const_cast<uint8_t*>(buf.data());
1554 stream.avail_in = buf.size();
1556 while (stream.avail_in != 0) {
1558 throw std::runtime_error(to<std::string>(
1559 "LZMA2Codec: junk after end of data"));
1562 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
1565 cursor.skip(buf.size());
1566 buf = cursor.peekBytes();
// Input exhausted: keep decoding until doInflate() reports end of stream.
1569 while (!streamEnd) {
1570 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Trim the unused tail of the last output buffer.
1573 out->prev()->trimEnd(stream.avail_out);
1575 if (uncompressedLength && *uncompressedLength != stream.total_out) {
1576 throw std::runtime_error(
1577 to<std::string>("LZMA2Codec: invalid uncompressed length"));
1583 #endif // FOLLY_HAVE_LIBLZMA
1585 #ifdef FOLLY_HAVE_LIBZSTD
// Codec built on libzstd for CodecType::ZSTD. Data is recognised by the zstd
// frame magic (see validPrefixes()/canUncompress()).
1590 class ZSTDCodec final : public Codec {
1592 static std::unique_ptr<Codec> create(int level, CodecType);
1593 explicit ZSTDCodec(int level, CodecType type);
1595 std::vector<std::string> validPrefixes() const override;
1596 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1600 bool doNeedsUncompressedLength() const override;
1601 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1602 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1603 std::unique_ptr<IOBuf> doUncompress(
1605 Optional<uint64_t> uncompressedLength) override;
// Magic number matched by ZSTDCodec::validPrefixes()/canUncompress(), stored
// as a little-endian integer: the byte sequence is 28 B5 2F FD.
1610 static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
// Returns the single prefix that identifies zstd data: the magic number
// rendered as a little-endian byte string.
1612 std::vector<std::string> ZSTDCodec::validPrefixes() const {
1613 return {prefixToStringLE(kZSTDMagicLE)};
// Reports whether `data` starts with the zstd magic number. The
// uncompressed-length argument is unnamed and unused.
1616 bool ZSTDCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
1617 return dataStartsWithLE(data, kZSTDMagicLE);
// Factory entry point: heap-allocates a ZSTDCodec for the given level and
// codec type and returns it as a Codec.
1620 std::unique_ptr<Codec> ZSTDCodec::create(int level, CodecType type) {
1621 return std::make_unique<ZSTDCodec>(level, type);
// Constructs the codec for CodecType::ZSTD (checked with DCHECK). The symbolic
// COMPRESSION_LEVEL_* values are translated first; the concrete levels they
// map to are not visible in this excerpt.
// Throws std::invalid_argument when the resulting level is below 1 or above
// ZSTD_maxCLevel().
1624 ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) {
1625 DCHECK(type == CodecType::ZSTD);
1627 case COMPRESSION_LEVEL_FASTEST:
1630 case COMPRESSION_LEVEL_DEFAULT:
1633 case COMPRESSION_LEVEL_BEST:
1637 if (level < 1 || level > ZSTD_maxCLevel()) {
1638 throw std::invalid_argument(
1639 to<std::string>("ZSTD: invalid level: ", level));
// Codec override, presumably backing Codec::needsUncompressedLength() -- TODO
// confirm. The returned value is not visible in this excerpt.
1644 bool ZSTDCodec::doNeedsUncompressedLength() const {
// Upper bound on the compressed size for `uncompressedLength` input bytes, as
// reported by ZSTD_compressBound().
1648 uint64_t ZSTDCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1649 return ZSTD_compressBound(uncompressedLength);
// Converts a zstd return code into an exception: when ZSTD_isError(rc) holds,
// throws std::runtime_error carrying ZSTD_getErrorName(rc). The non-error
// branch is taken first (its body is not visible in this excerpt).
1652 void zstdThrowIfError(size_t rc) {
1653 if (!ZSTD_isError(rc)) {
1656 throw std::runtime_error(
1657 to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
// Compresses `data` with zstd. An unchained IOBuf goes through the one-shot
// ZSTD_compress() call; a chained one goes through the streaming API, writing
// into a single buffer sized by maxCompressedLength().
// Any zstd error is rethrown as std::runtime_error via zstdThrowIfError().
// NOTE(review): the return statements are not visible in this excerpt.
1660 std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
1661 // Support earlier versions of the codec (working with a single IOBuf,
1662 // and using ZSTD_decompress which requires ZSTD frame to contain size,
1663 // which isn't populated by streaming API).
1664 if (!data->isChained()) {
1665 auto out = IOBuf::createCombined(ZSTD_compressBound(data->length()));
1666 const auto rc = ZSTD_compress(
1667 out->writableData(),
1672 zstdThrowIfError(rc);
// Streaming path: create a compression stream, then free it.
// NOTE(review): the free call is presumably wrapped in a scope guard whose
// opening line is not shown.
1677 auto zcs = ZSTD_createCStream();
1679 ZSTD_freeCStream(zcs);
1682 auto rc = ZSTD_initCStream(zcs, level_);
1683 zstdThrowIfError(rc);
1685 Cursor cursor(data);
1687 IOBuf::createCombined(maxCompressedLength(cursor.totalLength()));
// Point the zstd output descriptor at the result buffer.
1690 out.dst = result->writableTail();
1691 out.size = result->capacity();
// Feed the chain one contiguous span at a time until each span is consumed.
1694 for (auto buffer = cursor.peekBytes(); !buffer.empty();) {
1696 in.src = buffer.data();
1697 in.size = buffer.size();
1698 for (in.pos = 0; in.pos != in.size;) {
1699 rc = ZSTD_compressStream(zcs, &out, &in);
1700 zstdThrowIfError(rc);
1702 cursor.skip(in.size);
1703 buffer = cursor.peekBytes();
1706 rc = ZSTD_endStream(zcs, &out);
1707 zstdThrowIfError(rc);
// Record how many bytes zstd wrote into the result buffer.
1710 result->append(out.pos);
// One-shot decompression of an unchained IOBuf whose uncompressed length is
// known (both preconditions are DCHECKed). Allocates an output buffer of
// exactly *uncompressedLength bytes and calls ZSTD_decompress() into it.
// Throws std::runtime_error on a zstd error or when the produced size differs
// from uncompressedLength.
1714 static std::unique_ptr<IOBuf> zstdUncompressBuffer(
1716 Optional<uint64_t> uncompressedLength) {
1717 // Check preconditions
1718 DCHECK(!data->isChained());
1719 DCHECK(uncompressedLength.hasValue());
1721 auto uncompressed = IOBuf::create(*uncompressedLength);
1722 const auto decompressedSize = ZSTD_decompress(
1723 uncompressed->writableTail(),
1724 uncompressed->tailroom(),
1727 zstdThrowIfError(decompressedSize);
1728 if (decompressedSize != uncompressedLength) {
1729 throw std::runtime_error("ZSTD: invalid uncompressed length");
1731 uncompressed->append(decompressedSize);
1732 return uncompressed;
// Streaming decompression of a (possibly chained) IOBuf into an IOBufQueue.
// Throws std::runtime_error on a zstd error, on "incomplete input" (input ran
// out while rc > 1), on "junk after end of data" (input left over), or when
// the produced size differs from a supplied uncompressedLength.
// NOTE(review): the loop header and its exit condition are not visible in
// this excerpt.
1735 static std::unique_ptr<IOBuf> zstdUncompressStream(
1737 Optional<uint64_t> uncompressedLength) {
// Create a decompression stream, then free it.
// NOTE(review): the free call is presumably wrapped in a scope guard whose
// opening line is not shown.
1738 auto zds = ZSTD_createDStream();
1740 ZSTD_freeDStream(zds);
1743 auto rc = ZSTD_initDStream(zds);
1744 zstdThrowIfError(rc);
1746 ZSTD_outBuffer out{};
// Size of the first output allocation: the expected length when known,
// otherwise zstd's recommended streaming output size.
1749 auto outputSize = uncompressedLength.value_or(ZSTD_DStreamOutSize());
1751 IOBufQueue queue(IOBufQueue::cacheChainLength());
1753 Cursor cursor(data);
// Refill the input descriptor from the cursor once it is fully consumed.
1755 if (in.pos == in.size) {
1756 auto buffer = cursor.peekBytes();
1757 in.src = buffer.data();
1758 in.size = buffer.size();
1760 cursor.skip(in.size);
1761 if (rc > 1 && in.size == 0) {
1762 throw std::runtime_error(to<std::string>("ZSTD: incomplete input"));
// Output is full: commit what was written and reserve a new region.
1765 if (out.pos == out.size) {
1767 queue.postallocate(out.pos);
1769 auto buffer = queue.preallocate(outputSize, outputSize);
1770 out.dst = buffer.first;
1771 out.size = buffer.second;
// Later allocations use zstd's recommended streaming output size.
1773 outputSize = ZSTD_DStreamOutSize();
1775 rc = ZSTD_decompressStream(zds, &out, &in);
1776 zstdThrowIfError(rc);
// Commit the bytes written into the last reserved region.
1782 queue.postallocate(out.pos);
1784 if (in.pos != in.size || !cursor.isAtEnd()) {
1785 throw std::runtime_error("ZSTD: junk after end of data");
1787 if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
1788 throw std::runtime_error("ZSTD: invalid uncompressed length");
1791 return queue.move();
// Decompresses zstd data, choosing between the one-shot and streaming paths.
// A non-zero size read from the frame header overrides the "unknown" case and
// must agree with any caller-supplied length, otherwise std::runtime_error is
// thrown. A zero result from ZSTD_getDecompressedSize() is not treated as a
// size here.
1794 std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
1796 Optional<uint64_t> uncompressedLength) {
1798 // Read decompressed size from frame if available in first IOBuf.
1799 const auto decompressedSize =
1800 ZSTD_getDecompressedSize(data->data(), data->length());
1801 if (decompressedSize != 0) {
1802 if (uncompressedLength && *uncompressedLength != decompressedSize) {
1803 throw std::runtime_error("ZSTD: invalid uncompressed length");
1805 uncompressedLength = decompressedSize;
1808 // Faster to decompress using ZSTD_decompress() if we can.
1809 if (uncompressedLength && !data->isChained()) {
1810 return zstdUncompressBuffer(data, uncompressedLength);
1812 // Fall back to slower streaming decompression.
1813 return zstdUncompressStream(data, uncompressedLength);
1816 #endif // FOLLY_HAVE_LIBZSTD
1818 #if FOLLY_HAVE_LIBBZ2
// Codec built on libbz2 for CodecType::BZIP2. Data is recognised by a
// three-byte magic prefix (see validPrefixes()/canUncompress()).
1820 class Bzip2Codec final : public Codec {
1822 static std::unique_ptr<Codec> create(int level, CodecType type);
1823 explicit Bzip2Codec(int level, CodecType type);
1825 std::vector<std::string> validPrefixes() const override;
1826 bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
1830 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1831 std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
1832 std::unique_ptr<IOBuf> doUncompress(
1834 Optional<uint64_t> uncompressedLength) override;
// Factory entry point: heap-allocates a Bzip2Codec for the given level and
// codec type and returns it as a Codec.
1839 /* static */ std::unique_ptr<Codec> Bzip2Codec::create(
1842 return std::make_unique<Bzip2Codec>(level, type);
// Constructs the codec for CodecType::BZIP2 (checked with DCHECK). The
// symbolic COMPRESSION_LEVEL_* values are translated first; the concrete
// levels they map to are not visible in this excerpt.
// Throws std::invalid_argument when the resulting level is outside 1..9.
1845 Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
1846 DCHECK(type == CodecType::BZIP2);
1848 case COMPRESSION_LEVEL_FASTEST:
1851 case COMPRESSION_LEVEL_DEFAULT:
1854 case COMPRESSION_LEVEL_BEST:
1858 if (level < 1 || level > 9) {
1859 throw std::invalid_argument(
1860 to<std::string>("Bzip2: invalid level: ", level));
// Magic prefix matched by Bzip2Codec::validPrefixes()/canUncompress(), stored
// as a little-endian integer: the byte sequence is 42 5A 68 ("BZh"), and
// kBzip2MagicBytes is how many bytes are significant.
1865 static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1866 static uint64_t constexpr kBzip2MagicBytes = 3;
// Returns the single prefix that identifies bzip2 data: the first
// kBzip2MagicBytes bytes of the magic, rendered as a byte string.
1868 std::vector<std::string> Bzip2Codec::validPrefixes() const {
1869 return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
// Reports whether `data` starts with the bzip2 magic bytes. The
// uncompressed-length argument is unnamed and unused.
1872 bool Bzip2Codec::canUncompress(IOBuf const* data, Optional<uint64_t>) const {
1873 return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
// Upper bound on the compressed size for `uncompressedLength` input bytes:
// the input size plus 1% (integer division) plus 600 bytes, following the
// bzip2 manual quoted below.
1876 uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1877 // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
1878 // To guarantee that the compressed data will fit in its buffer, allocate an
1879 // output buffer of size 1% larger than the uncompressed data, plus six
1880 // hundred extra bytes.
1881 return uncompressedLength + uncompressedLength / 100 + 600;
// Builds a bz_stream with null allocator hooks and opaque pointer, and with no
// input or output buffers attached.
// NOTE(review): the local declaration and the return statement are not visible
// in this excerpt.
1884 static bz_stream createBzStream() {
1886 stream.bzalloc = nullptr;
1887 stream.bzfree = nullptr;
1888 stream.opaque = nullptr;
1889 stream.next_in = stream.next_out = nullptr;
1890 stream.avail_in = stream.avail_out = 0;
// Validates a libbz2 return code. Error codes are turned into
// std::runtime_error("Bzip2 error: <rc>"); other codes are handed back so that
// callers can compare them (for example against BZ_STREAM_END).
// NOTE(review): the switch that separates the two cases is not visible in this
// excerpt.
1894 // Throws on error condition, otherwise returns the code.
1895 static int bzCheck(int const rc) {
1904 throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
// Allocates a fresh output IOBuf of bufferLength bytes and points the bzip2
// stream's output cursor at it. Debug builds check that bufferLength fits in
// `unsigned` and that the current output space is exhausted.
// NOTE(review): the stream parameter declaration and the return statement are
// not visible in this excerpt; callers chain the result, so it presumably
// returns `buf`.
1908 static std::unique_ptr<IOBuf> addOutputBuffer(
1910 uint64_t const bufferLength) {
1911 DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
1912 DCHECK_EQ(stream->avail_out, 0);
1914 auto buf = IOBuf::create(bufferLength);
// Mark the whole capacity as data up front; callers trim the unused tail of
// the last buffer once coding is done.
1915 buf->append(buf->capacity());
1917 stream->next_out = reinterpret_cast<char*>(buf->writableData());
1918 stream->avail_out = buf->length();
// Compresses an IOBuf chain with libbz2 at level_. Input is fed in slices of
// at most kMaxSingleStepLength bytes; output grows in kDefaultBufferLength
// chunks as needed. libbz2 errors surface as std::runtime_error via bzCheck().
// NOTE(review): the return statement is not visible in this excerpt.
1923 std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
1924 bz_stream stream = createBzStream();
1925 bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
// Tear the compressor down again.
// NOTE(review): presumably inside a scope guard whose opening line is not
// shown.
1927 bzCheck(BZ2_bzCompressEnd(&stream));
1930 uint64_t const uncompressedLength = data->computeChainDataLength();
1931 uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
1932 uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1933 uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
// First output buffer: the worst-case size when that is at most 64 MiB,
// otherwise a 4 MiB buffer.
1935 auto out = addOutputBuffer(
1937 maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
1938 : kDefaultBufferLength);
// `range` is taken by value so that it can be advanced as input is consumed.
1940 for (auto range : *data) {
1941 while (!range.empty()) {
1942 auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1944 const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1945 stream.avail_in = inSize;
1947 if (stream.avail_out == 0) {
1948 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1951 bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
// Skip exactly the bytes libbz2 consumed in this step.
1952 range.uncheckedAdvance(inSize - stream.avail_in);
// Flush: call BZ_FINISH until BZ_STREAM_END, adding output space on demand.
1956 if (stream.avail_out == 0) {
1957 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1959 } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
// Trim the unused tail of the last output buffer.
1961 out->prev()->trimEnd(stream.avail_out);
// Decompresses an IOBuf chain with libbz2. Input is fed in slices of at most
// kMaxSingleStepLength bytes; output grows in kDefaultBufferLength chunks.
// Throws std::runtime_error on a libbz2 error (via bzCheck()) or when the
// total output size differs from a caller-supplied uncompressedLength.
// NOTE(review): the declaration of `rc` and the return statement are not
// visible in this excerpt.
1966 std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
1968 Optional<uint64_t> uncompressedLength) {
1969 bz_stream stream = createBzStream();
1970 bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
// Tear the decompressor down again.
// NOTE(review): presumably inside a scope guard whose opening line is not
// shown.
1972 bzCheck(BZ2_bzDecompressEnd(&stream));
1975 uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1976 uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
1977 uint64_t const kDefaultBufferLength =
1978 computeBufferLength(data->computeChainDataLength(), kBlockSize);
// First output buffer: the expected size when it is known and at most 64 MiB,
// otherwise kDefaultBufferLength.
1980 auto out = addOutputBuffer(
1982 ((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength)
1983 ? *uncompressedLength
1984 : kDefaultBufferLength));
// `range` is taken by value so that it can be advanced as input is consumed.
1987 for (auto range : *data) {
1988 while (!range.empty()) {
1989 auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1991 const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1992 stream.avail_in = inSize;
1994 if (stream.avail_out == 0) {
1995 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1998 rc = bzCheck(BZ2_bzDecompress(&stream));
// Skip exactly the bytes libbz2 consumed in this step.
1999 range.uncheckedAdvance(inSize - stream.avail_in);
// Input exhausted: keep decompressing until libbz2 reports BZ_STREAM_END.
2002 while (rc != BZ_STREAM_END) {
2003 if (stream.avail_out == 0) {
2004 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
2007 rc = bzCheck(BZ2_bzDecompress(&stream));
// Trim the unused tail of the last output buffer.
2010 out->prev()->trimEnd(stream.avail_out);
// libbz2 reports the output byte count as two 32-bit halves; recombine them.
2012 uint64_t const totalOut =
2013 (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
2014 if (uncompressedLength && uncompressedLength != totalOut) {
2015 throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
2021 #endif // FOLLY_HAVE_LIBBZ2
2024 * Automatic decompression
// Decompression-only codec that holds a list of codecs and hands each input to
// the first one whose canUncompress() accepts it. doCompress() and
// doMaxCompressedLength() always throw std::runtime_error.
2026 class AutomaticCodec final : public Codec {
2028 static std::unique_ptr<Codec> create(
2029 std::vector<std::unique_ptr<Codec>> customCodecs);
2030 explicit AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs);
2032 std::vector<std::string> validPrefixes() const override;
2033 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
2037 bool doNeedsUncompressedLength() const override;
2038 uint64_t doMaxUncompressedLength() const override;
// Compression-side operations are unsupported and always throw.
2040 uint64_t doMaxCompressedLength(uint64_t) const override {
2041 throw std::runtime_error(
2042 "AutomaticCodec error: maxCompressedLength() not supported.");
2044 std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
2045 throw std::runtime_error("AutomaticCodec error: compress() not supported.");
2047 std::unique_ptr<IOBuf> doUncompress(
2049 Optional<uint64_t> uncompressedLength) override;
2051 void addCodecIfSupported(CodecType type);
2053 // Throws iff the codecs aren't compatible (very slow)
2054 void checkCompatibleCodecs() const;
// Candidate codecs, tried in order by doUncompress().
2056 std::vector<std::unique_ptr<Codec>> codecs_;
// Aggregates computed once in the constructor from codecs_.
2057 bool needsUncompressedLength_;
2058 uint64_t maxUncompressedLength_;
// Returns the union of validPrefixes() over all held codecs, with duplicates
// removed. The result comes from an unordered_set, so its order is
// unspecified.
2061 std::vector<std::string> AutomaticCodec::validPrefixes() const {
2062 std::unordered_set<std::string> prefixes;
2063 for (const auto& codec : codecs_) {
2064 const auto codecPrefixes = codec->validPrefixes();
2065 prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
2067 return std::vector<std::string>{prefixes.begin(), prefixes.end()};
// Asks held codecs whether they can decode `data`, forwarding both arguments
// to each codec's canUncompress().
// NOTE(review): the algorithm call around the lambda is not visible in this
// excerpt; presumably std::any_of over codecs_ -- confirm.
2070 bool AutomaticCodec::canUncompress(
2072 Optional<uint64_t> uncompressedLength) const {
2076 [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
2077 return codec->canUncompress(data, uncompressedLength);
// Appends the built-in codec for `type` to codecs_ when this build supports it
// (hasCodec) and no codec of that type has been found already, so that a
// caller-supplied codec of the same type is not duplicated.
// NOTE(review): the range passed to std::any_of is not visible in this
// excerpt; presumably codecs_.
2081 void AutomaticCodec::addCodecIfSupported(CodecType type) {
2082 const bool present = std::any_of(
2085 [&type](std::unique_ptr<Codec> const& codec) {
2086 return codec->type() == type;
2088 if (hasCodec(type) && !present) {
2089 codecs_.push_back(getCodec(type));
// Factory entry point: builds an AutomaticCodec that takes ownership of
// `customCodecs` and returns it as a Codec.
2093 /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
2094 std::vector<std::unique_ptr<Codec>> customCodecs) {
2095 return std::make_unique<AutomaticCodec>(std::move(customCodecs));
// Takes ownership of the caller's codecs, appends the built-in codecs listed
// below that are supported and not already present, verifies that the codec
// prefixes are compatible (checkCompatibleCodecs() throws
// std::invalid_argument otherwise), and caches two aggregates:
//   needsUncompressedLength_ - true if any codec needs the length;
//   maxUncompressedLength_   - the largest maxUncompressedLength() of any codec.
// The codec type reported by this object is CodecType::USER_DEFINED.
2098 AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
2099 : Codec(CodecType::USER_DEFINED), codecs_(std::move(customCodecs)) {
2100 // Fastest -> slowest
2101 addCodecIfSupported(CodecType::LZ4_FRAME);
2102 addCodecIfSupported(CodecType::ZSTD);
2103 addCodecIfSupported(CodecType::ZLIB);
2104 addCodecIfSupported(CodecType::GZIP);
2105 addCodecIfSupported(CodecType::LZMA2);
2106 addCodecIfSupported(CodecType::BZIP2);
2108 checkCompatibleCodecs();
// NOTE(review): this null check runs after addCodecIfSupported() and
// checkCompatibleCodecs() appear to have dereferenced every codec already, so
// a null custom codec would likely crash before reaching it -- confirm and
// consider moving the check to the top of the constructor.
2110 // Check that none of the codecs are null
2111 DCHECK(std::none_of(
2112 codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
2113 return codec == nullptr;
2116 needsUncompressedLength_ = std::any_of(
2117 codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
2118 return codec->needsUncompressedLength();
// Pick the codec with the largest maxUncompressedLength().
2121 const auto it = std::max_element(
2124 [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
2125 return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
// `it` is dereferenced next, so the codec list must not be empty here.
2127 DCHECK(it != codecs_.end());
2128 maxUncompressedLength_ = (*it)->maxUncompressedLength();
// Verifies that the held codecs can be told apart by prefix alone. Throws
// std::invalid_argument when a codec reports no prefixes, when two prefixes
// collide, or when a strict non-empty prefix of one header is itself a header.
// NOTE(review): the statement that enforces "the empty header is not allowed"
// is not visible in this excerpt.
2131 void AutomaticCodec::checkCompatibleCodecs() const {
2132 // Keep track of all the possible headers.
2133 std::unordered_set<std::string> headers;
2134 // The empty header is not allowed.
2137 // Construct a set of headers and check that none of the headers occur twice.
2138 // Eliminate edge cases.
2139 for (auto&& codec : codecs_) {
2140 const auto codecHeaders = codec->validPrefixes();
2141 // Codecs without any valid headers are not allowed.
2142 if (codecHeaders.empty()) {
2143 throw std::invalid_argument{
2144 "AutomaticCodec: validPrefixes() must not be empty."};
2146 // Insert all the headers for the current codec.
2147 const size_t beforeSize = headers.size();
2148 headers.insert(codecHeaders.begin(), codecHeaders.end());
2149 // Codecs are not compatible if any header occurred twice.
// The set drops duplicates, so a shortfall in its growth reveals a collision.
2150 if (beforeSize + codecHeaders.size() != headers.size()) {
2151 throw std::invalid_argument{
2152 "AutomaticCodec: Two valid prefixes collide."};
2156 // Check if any strict non-empty prefix of any header is a header.
2157 for (const auto& header : headers) {
2158 for (size_t i = 1; i < header.size(); ++i) {
2159 if (headers.count(header.substr(0, i))) {
2160 throw std::invalid_argument{
2161 "AutomaticCodec: One valid prefix is a prefix of another valid "
// Returns the flag cached by the constructor: true if any held codec reported
// needsUncompressedLength().
2168 bool AutomaticCodec::doNeedsUncompressedLength() const {
2169 return needsUncompressedLength_;
// Returns the value cached by the constructor: the largest
// maxUncompressedLength() among the held codecs.
2172 uint64_t AutomaticCodec::doMaxUncompressedLength() const {
2173 return maxUncompressedLength_;
// Decompresses `data` with the first codec, in codecs_ order, whose
// canUncompress() accepts it. Throws std::runtime_error when none does.
2176 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
2178 Optional<uint64_t> uncompressedLength) {
2179 for (auto&& codec : codecs_) {
2180 if (codec->canUncompress(data, uncompressedLength)) {
2181 return codec->uncompress(data, uncompressedLength);
2184 throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
// Factory function signatures: both take (level, type) and return an owning
// pointer to the created codec.
2187 using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
2188 using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
2191 StreamCodecFactory stream;
// Factory table with one slot per CodecType value; getFactory() indexes it by
// the enum's numeric value. Entries are guarded by the FOLLY_HAVE_* feature
// macros, and every stream factory visible here is nullptr.
// NOTE(review): the Factory struct header, the slot-label comments and the
// fallback entries are not visible in this excerpt.
2195 codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
2197 {NoCompressionCodec::create, nullptr},
2199 #if FOLLY_HAVE_LIBLZ4
2200 {LZ4Codec::create, nullptr},
2205 #if FOLLY_HAVE_LIBSNAPPY
2206 {SnappyCodec::create, nullptr},
2212 {ZlibCodec::create, nullptr},
2217 #if FOLLY_HAVE_LIBLZ4
2218 {LZ4Codec::create, nullptr},
2223 #if FOLLY_HAVE_LIBLZMA
2224 {LZMA2Codec::create, nullptr},
2225 {LZMA2Codec::create, nullptr},
2231 #if FOLLY_HAVE_LIBZSTD
2232 {ZSTDCodec::create, nullptr},
2238 {ZlibCodec::create, nullptr},
2243 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
2244 {LZ4FrameCodec::create, nullptr},
2249 #if FOLLY_HAVE_LIBBZ2
2250 {Bzip2Codec::create, nullptr},
// Looks up the factory-table entry for `type`. Throws std::invalid_argument
// when the numeric value of `type` is NUM_CODEC_TYPES or greater.
2256 Factory const& getFactory(CodecType type) {
2257 size_t const idx = static_cast<size_t>(type);
2258 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2259 throw std::invalid_argument(
2260 to<std::string>("Compression type ", idx, " invalid"));
2262 return codecFactories[idx];
// Reports whether a Codec factory is registered for `type`. An out-of-range
// type propagates std::invalid_argument from getFactory().
2266 bool hasCodec(CodecType type) {
2267 return getFactory(type).codec != nullptr;
// Creates a Codec of the given type and compression level through the factory
// table. Throws std::invalid_argument with a "not supported" message;
// getFactory() may also throw for an out-of-range type. Debug builds check
// that the created codec reports the requested type.
// NOTE(review): the guard in front of the throw (presumably a null-factory
// test) and the return statement are not visible in this excerpt.
2270 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2271 auto const factory = getFactory(type).codec;
2273 throw std::invalid_argument(
2274 to<std::string>("Compression type ", type, " not supported"));
2276 auto codec = (*factory)(level, type);
2277 DCHECK(codec->type() == type);
// Reports whether a StreamCodec factory is registered for `type`. An
// out-of-range type propagates std::invalid_argument from getFactory().
2281 bool hasStreamCodec(CodecType type) {
2282 return getFactory(type).stream != nullptr;
// Creates a StreamCodec of the given type and compression level through the
// factory table. Throws std::invalid_argument with a "not supported" message;
// getFactory() may also throw for an out-of-range type. Debug builds check
// that the created codec reports the requested type.
// NOTE(review): the guard in front of the throw (presumably a null-factory
// test) and the return statement are not visible in this excerpt.
2285 std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
2286 auto const factory = getFactory(type).stream;
2288 throw std::invalid_argument(
2289 to<std::string>("Compression type ", type, " not supported"));
2291 auto codec = (*factory)(level, type);
2292 DCHECK(codec->type() == type);
// Public entry point for automatic decompression: returns an AutomaticCodec
// that takes ownership of `customCodecs`.
2296 std::unique_ptr<Codec> getAutoUncompressionCodec(
2297 std::vector<std::unique_ptr<Codec>> customCodecs) {
2298 return AutomaticCodec::create(std::move(customCodecs));