2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
22 #if LZ4_VERSION_NUMBER >= 10301
27 #include <glog/logging.h>
29 #if FOLLY_HAVE_LIBSNAPPY
30 #include <snappy-sinksource.h>
35 #include <folly/io/compression/Zlib.h>
38 #if FOLLY_HAVE_LIBLZMA
42 #if FOLLY_HAVE_LIBZSTD
43 #define ZSTD_STATIC_LINKING_ONLY
51 #include <folly/Bits.h>
52 #include <folly/Conv.h>
53 #include <folly/Memory.h>
54 #include <folly/Portability.h>
55 #include <folly/ScopeGuard.h>
56 #include <folly/Varint.h>
57 #include <folly/io/Cursor.h>
58 #include <folly/io/compression/Utils.h>
60 #include <unordered_set>
62 using folly::io::compression::detail::dataStartsWithLE;
63 using folly::io::compression::detail::prefixToStringLE;
65 namespace zlib = folly::io::zlib;
70 Codec::Codec(CodecType type) : type_(type) { }
// Compresses the entire IOBuf chain starting at `data` via doCompress().
// Throws std::invalid_argument if `data` is null, and std::runtime_error if
// the chain's total length exceeds maxUncompressedLength().
72 // Ensure consistent behavior in the nullptr case
73 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
74 if (data == nullptr) {
75 throw std::invalid_argument("Codec: data must not be nullptr");
77 uint64_t len = data->computeChainDataLength();
// NOTE(review): the condition guarding this early return is not part of this
// listing; presumably `len == 0`, i.e. empty input yields an empty IOBuf
// without calling doCompress() — confirm.
79 return IOBuf::create(0);
// Reject inputs larger than the concrete codec can handle.
81 if (len > maxUncompressedLength()) {
82 throw std::runtime_error("Codec: uncompressed length too large");
85 return doCompress(data);
// std::string overload of compress(): checks the input size against
// maxUncompressedLength() and delegates to doCompressString().
// Throws std::runtime_error if the input is too large.
88 std::string Codec::compress(const StringPiece data) {
89 const uint64_t len = data.size();
// NOTE(review): lines 90-92 are not part of this listing; presumably an
// empty-input early return mirroring the IOBuf overload — confirm.
93 if (len > maxUncompressedLength()) {
94 throw std::runtime_error("Codec: uncompressed length too large");
97 return doCompressString(data);
// Decompresses the IOBuf chain `data` via doUncompress().
// `uncompressedLength`, when set, is the caller's claim about the output size.
// Throws std::invalid_argument if `data` is null, or if the length is omitted
// while needsUncompressedLength() is true. Throws std::runtime_error if the
// claimed length exceeds maxUncompressedLength().
100 std::unique_ptr<IOBuf> Codec::uncompress(
102 Optional<uint64_t> uncompressedLength) {
103 if (data == nullptr) {
104 throw std::invalid_argument("Codec: data must not be nullptr");
106 if (!uncompressedLength) {
107 if (needsUncompressedLength()) {
108 throw std::invalid_argument("Codec: uncompressed length required");
110 } else if (*uncompressedLength > maxUncompressedLength()) {
111 throw std::runtime_error("Codec: uncompressed length too large");
// NOTE(review): the guard for this branch is not part of this listing.
// Presumably it handles empty compressed input, which is only accepted when
// the claimed uncompressed length is absent or 0 — confirm.
115 if (uncompressedLength.value_or(0) != 0) {
116 throw std::runtime_error("Codec: invalid uncompressed length");
118 return IOBuf::create(0);
121 return doUncompress(data, uncompressedLength);
// std::string overload of uncompress(). It applies the same length
// validation as the IOBuf overload and delegates to doUncompressString().
124 std::string Codec::uncompress(
125 const StringPiece data,
126 Optional<uint64_t> uncompressedLength) {
127 if (!uncompressedLength) {
128 if (needsUncompressedLength()) {
129 throw std::invalid_argument("Codec: uncompressed length required");
131 } else if (*uncompressedLength > maxUncompressedLength()) {
132 throw std::runtime_error("Codec: uncompressed length too large");
// NOTE(review): the guard and the return for this branch are not part of this
// listing. Presumably empty `data` is accepted only with a claimed length that
// is absent or 0 — confirm.
136 if (uncompressedLength.value_or(0) != 0) {
137 throw std::runtime_error("Codec: invalid uncompressed length");
142 return doUncompressString(data, uncompressedLength);
// Public queries forward to the virtual do*() hooks so that concrete codecs
// can override them.
145 bool Codec::needsUncompressedLength() const {
146 return doNeedsUncompressedLength();
149 uint64_t Codec::maxUncompressedLength() const {
150 return doMaxUncompressedLength();
// Default hook implementations follow. The bodies of
// doNeedsUncompressedLength(), validPrefixes() and canUncompress() are not
// part of this listing.
153 bool Codec::doNeedsUncompressedLength() const {
// Default: UNLIMITED_UNCOMPRESSED_LENGTH, i.e. no codec-specific limit.
157 uint64_t Codec::doMaxUncompressedLength() const {
158 return UNLIMITED_UNCOMPRESSED_LENGTH;
161 std::vector<std::string> Codec::validPrefixes() const {
165 bool Codec::canUncompress(const IOBuf*, Optional<uint64_t>) const {
// Default std::string implementation of compression:
//   1. Wraps `data` in an IOBuf without copying (WRAP_BUFFER).
//   2. Runs the IOBuf-based doCompress().
//   3. Copies every segment of the resulting chain into a single string.
169 std::string Codec::doCompressString(const StringPiece data) {
170 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
171 auto outputBuffer = doCompress(&inputBuffer);
// Reserve the full output size up front so the appends below do not reallocate.
173 output.reserve(outputBuffer->computeChainDataLength());
174 for (auto range : *outputBuffer) {
175 output.append(reinterpret_cast<const char*>(range.data()), range.size());
// Default std::string implementation of decompression. It uses the same
// wrap / delegate / flatten approach, with doUncompress().
180 std::string Codec::doUncompressString(
181 const StringPiece data,
182 Optional<uint64_t> uncompressedLength) {
183 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
184 auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
186 output.reserve(outputBuffer->computeChainDataLength());
187 for (auto range : *outputBuffer) {
188 output.append(reinterpret_cast<const char*>(range.data()), range.size());
// Upper bound on the compressed size of `uncompressedLength` bytes.
// A zero length is handled here; the value returned for it is on a line not
// part of this listing. Otherwise the codec's doMaxCompressedLength() decides.
193 uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
194 if (uncompressedLength == 0) {
197 return doMaxCompressedLength(uncompressedLength);
// Validates and, where the codec can, derives the uncompressed length of `data`.
// Rules enforced here:
//   - A claimed length of exactly 0 requires empty input.
//   - Empty input requires the claim to be absent or 0.
// Any other combination throws std::runtime_error. Remaining cases are
// delegated to doGetUncompressedLength().
200 Optional<uint64_t> Codec::getUncompressedLength(
201 const folly::IOBuf* data,
202 Optional<uint64_t> uncompressedLength) const {
203 auto const compressedLength = data->computeChainDataLength();
204 if (uncompressedLength == uint64_t(0) || compressedLength == 0) {
205 if (uncompressedLength.value_or(0) != 0 || compressedLength != 0) {
206 throw std::runtime_error("Invalid uncompressed length");
210 return doGetUncompressedLength(data, uncompressedLength);
// Default hook: echo back whatever the caller passed in.
213 Optional<uint64_t> Codec::doGetUncompressedLength(
215 Optional<uint64_t> uncompressedLength) const {
216 return uncompressedLength;
// Public query forwarding to the overridable doNeedsDataLength() hook.
219 bool StreamCodec::needsDataLength() const {
220 return doNeedsDataLength();
// Default hook (body not part of this listing).
223 bool StreamCodec::doNeedsDataLength() const {
// Throws std::logic_error naming both states if the stream is not in the
// `expected` state. It is used to reject flush operations issued in an
// invalid order.
227 void StreamCodec::assertStateIs(State expected) const {
228 if (state_ != expected) {
229 throw std::logic_error(folly::to<std::string>(
230 "Codec: state is ", state_, "; expected state ", expected));
// Returns the stream to the RESET state and records the (optional) total
// uncompressed length for the next operation.
// NOTE(review): the next line of the function is not part of this listing;
// presumably it calls the doResetStream() hook — confirm.
234 void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
235 state_ = State::RESET;
236 uncompressedLength_ = uncompressedLength;
// Start fresh: a single call without progress after a reset must not throw.
237 progressMade_ = true;
// Streaming compression step. Consumes bytes from `input` and writes them to
// `output`, advancing both ranges past what was used.
// `flushOp` selects one of:
//   - NONE: keep going.
//   - FLUSH: flush pending output.
//   - END: finish the stream.
// Returns true once the requested flush/end has completed; the value comes
// from doCompressStream().
// Throws std::runtime_error on length violations, or when two consecutive
// calls make no progress. Throws std::logic_error (via assertStateIs) on an
// invalid sequence, e.g. switching to NONE while an END is still pending.
241 bool StreamCodec::compressStream(
243 MutableByteRange& output,
244 StreamCodec::FlushOp flushOp) {
// First call with no data. The early returns here are on lines not part of
// this listing.
245 if (state_ == State::RESET && input.empty()) {
246 if (flushOp == StreamCodec::FlushOp::NONE) {
// Ending an empty stream contradicts a declared non-zero length.
249 if (flushOp == StreamCodec::FlushOp::END &&
250 uncompressedLength().value_or(0) != 0) {
251 throw std::runtime_error("Codec: invalid uncompressed length");
// Some codecs (see needsDataLength()) must know the total size up front.
255 if (!uncompressedLength() && needsDataLength()) {
256 throw std::runtime_error("Codec: uncompressed length required");
// Data supplied although the declared total length is 0.
258 if (state_ == State::RESET && !input.empty() &&
259 uncompressedLength() == uint64_t(0)) {
260 throw std::runtime_error("Codec: invalid uncompressed length");
262 // Handle input state transitions
// (The enclosing `switch (flushOp)` and the `else` branches are not part of
// this listing.) Each op may start from RESET, or from COMPRESS for FLUSH/END.
// Repeating the same op while it is still pending passes assertStateIs().
264 case StreamCodec::FlushOp::NONE:
265 if (state_ == State::RESET) {
266 state_ = State::COMPRESS;
268 assertStateIs(State::COMPRESS);
270 case StreamCodec::FlushOp::FLUSH:
271 if (state_ == State::RESET || state_ == State::COMPRESS) {
272 state_ = State::COMPRESS_FLUSH;
274 assertStateIs(State::COMPRESS_FLUSH);
276 case StreamCodec::FlushOp::END:
277 if (state_ == State::RESET || state_ == State::COMPRESS) {
278 state_ = State::COMPRESS_END;
280 assertStateIs(State::COMPRESS_END);
// Record sizes so a call that neither consumed nor produced bytes is detected.
283 size_t const inputSize = input.size();
284 size_t const outputSize = output.size();
285 bool const done = doCompressStream(input, output, flushOp);
286 if (!done && inputSize == input.size() && outputSize == output.size()) {
287 if (!progressMade_) {
288 throw std::runtime_error("Codec: No forward progress made");
290 // Throw an exception if there is no progress again next time
291 progressMade_ = false;
293 progressMade_ = true;
295 // Handle output state transitions
// Presumably applied only when `done` is true (guard not in this listing):
// a completed FLUSH returns to COMPRESS.
297 if (state_ == State::COMPRESS_FLUSH) {
298 state_ = State::COMPRESS;
299 } else if (state_ == State::COMPRESS_END) {
302 // Check internal invariants
303 DCHECK(input.empty());
304 DCHECK(flushOp != StreamCodec::FlushOp::NONE);
// Streaming decompression step. Feeds `input` to doUncompressStream(),
// writing into `output` and advancing both ranges.
// Throws std::runtime_error if two consecutive calls make no progress, and
// std::logic_error if the stream is not in the UNCOMPRESS (or RESET) state.
// NOTE(review): the return statement is not part of this listing; presumably
// it returns `done`, i.e. true once the end of the compressed stream is
// reached — confirm.
309 bool StreamCodec::uncompressStream(
311 MutableByteRange& output,
312 StreamCodec::FlushOp flushOp) {
// NOTE(review): this branch's body is not part of this listing. Presumably an
// empty first call with a declared length of 0 (or none) finishes at once.
313 if (state_ == State::RESET && input.empty()) {
314 if (uncompressedLength().value_or(0) == 0) {
319 // Handle input state transitions
320 if (state_ == State::RESET) {
321 state_ = State::UNCOMPRESS;
323 assertStateIs(State::UNCOMPRESS);
// Record sizes so a call that neither consumed nor produced bytes is detected.
324 size_t const inputSize = input.size();
325 size_t const outputSize = output.size();
326 bool const done = doUncompressStream(input, output, flushOp);
327 if (!done && inputSize == input.size() && outputSize == output.size()) {
328 if (!progressMade_) {
329 throw std::runtime_error("Codec: no forward progress made");
331 // Throw an exception if there is no progress again next time
332 progressMade_ = false;
333 // (progress-made branch; its `else` line is not part of this listing)
334 progressMade_ = true;
336 // Handle output state transitions
// Allocates a new IOBuf for at least `size` bytes and marks its whole capacity
// as data. Points `output` at that writable region. `output` must be empty on
// entry (debug-checked). Callers trim the unused tail afterwards.
343 static std::unique_ptr<IOBuf> addOutputBuffer(
344 MutableByteRange& output,
346 DCHECK(output.empty());
347 auto buffer = IOBuf::create(size);
// Use the full capacity, which may exceed `size`.
348 buffer->append(buffer->capacity());
349 output = {buffer->writableData(), buffer->length()};
// One-shot compression built on the streaming API. Walks the whole IOBuf
// chain `data` and compresses it into a new chain of output buffers.
353 std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
354 uint64_t const uncompressedLength = data->computeChainDataLength();
// The total length is known here, so record it for codecs that want it.
355 resetStream(uncompressedLength);
356 uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
358 auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
359 auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
361 MutableByteRange output;
// If the worst-case output fits in 64 MB, allocate it as one buffer.
// Otherwise start with 4 MB and grow the chain as needed.
362 auto buffer = addOutputBuffer(
364 maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
365 : kDefaultBufferLength);
367 // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
368 IOBuf const* current = data;
369 ByteRange input{current->data(), current->length()};
370 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
// (The `done` flag and the loop header are on lines not part of this listing.)
// Skip over empty buffers in the input chain.
373 while (input.empty() && current->next() != data) {
374 current = current->next();
375 input = {current->data(), current->length()};
377 if (current->next() == data) {
378 // This is the last input buffer so end the stream
379 flushOp = StreamCodec::FlushOp::END;
// Output space exhausted: add another 4 MB buffer at the end of the chain.
381 if (output.empty()) {
382 buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
384 done = compressStream(input, output, flushOp);
// After completion all input must be consumed and the stream ended.
386 DCHECK(input.empty());
387 DCHECK(flushOp == StreamCodec::FlushOp::END);
388 DCHECK_EQ(current->next(), data);
// Drop the unused tail of the last output buffer.
391 buffer->prev()->trimEnd(output.size());
// Heuristic output-buffer size used by StreamCodec::doUncompress(): four
// times the larger of `blockSize` and `compressedLength`, capped at 4 MiB.
395 static uint64_t computeBufferLength(
396 uint64_t const compressedLength,
397 uint64_t const blockSize) {
398 uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
399 uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
400 return std::min(goodBufferSize, kMaxBufferLength);
// One-shot decompression built on the streaming API. `uncompressedLength`
// (from the caller, or derived via getUncompressedLength()) is used in two
// ways:
//   - It sizes the first output buffer.
//   - It is verified against the final output size.
// Throws std::runtime_error on input left after the end of the stream, or on
// a length mismatch.
403 std::unique_ptr<IOBuf> StreamCodec::doUncompress(
405 Optional<uint64_t> uncompressedLength) {
406 auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
407 auto constexpr kBlockSize = uint64_t(128) << 10;
408 auto const defaultBufferLength =
409 computeBufferLength(data->computeChainDataLength(), kBlockSize);
411 uncompressedLength = getUncompressedLength(data, uncompressedLength);
412 resetStream(uncompressedLength);
414 MutableByteRange output;
// Known length up to 64 MB: one exact-size buffer; otherwise the heuristic.
415 auto buffer = addOutputBuffer(
417 (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
418 ? *uncompressedLength
419 : defaultBufferLength));
421 // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
422 IOBuf const* current = data;
423 ByteRange input{current->data(), current->length()};
424 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
// (The `done` flag and the loop header are on lines not part of this listing.)
// Skip over empty buffers in the input chain.
427 while (input.empty() && current->next() != data) {
428 current = current->next();
429 input = {current->data(), current->length()};
431 if (current->next() == data) {
432 // Tell the uncompressor there is no more input (it may optimize)
433 flushOp = StreamCodec::FlushOp::END;
// Output space exhausted: add another buffer at the end of the chain.
435 if (output.empty()) {
436 buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
438 done = uncompressStream(input, output, flushOp);
// Input left over after the stream finished is rejected.
440 if (!input.empty()) {
441 throw std::runtime_error("Codec: Junk after end of data");
// Drop the unused tail of the last output buffer, then verify the length.
444 buffer->prev()->trimEnd(output.size());
445 if (uncompressedLength &&
446 *uncompressedLength != buffer->computeChainDataLength()) {
447 throw std::runtime_error("Codec: invalid uncompressed length");
// Pass-through codec for CodecType::NO_COMPRESSION. Both "compression" and
// "decompression" return a clone of the input.
458 class NoCompressionCodec final : public Codec {
460 static std::unique_ptr<Codec> create(int level, CodecType type);
461 explicit NoCompressionCodec(int level, CodecType type);
// Codec hook overrides.
464 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
465 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
466 std::unique_ptr<IOBuf> doUncompress(
468 Optional<uint64_t> uncompressedLength) override;
// Factory returning a new NoCompressionCodec.
471 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
472 return std::make_unique<NoCompressionCodec>(level, type);
// The symbolic levels DEFAULT, FASTEST and BEST are listed as accepted.
// Other levels can reach the std::invalid_argument below; the exact accepting
// condition is on lines not part of this listing.
475 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
477 DCHECK(type == CodecType::NO_COMPRESSION);
479 case COMPRESSION_LEVEL_DEFAULT:
480 case COMPRESSION_LEVEL_FASTEST:
481 case COMPRESSION_LEVEL_BEST:
485 throw std::invalid_argument(to<std::string>(
486 "NoCompressionCodec: invalid level ", level));
// Output is exactly as large as the input.
490 uint64_t NoCompressionCodec::doMaxCompressedLength(
491 uint64_t uncompressedLength) const {
492 return uncompressedLength;
// "Compression" is a clone of the input chain.
495 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
497 return data->clone();
// "Decompression" is a clone of the input, after checking that a
// caller-supplied uncompressed length (if any) equals the input length.
500 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
502 Optional<uint64_t> uncompressedLength) {
503 if (uncompressedLength &&
504 data->computeChainDataLength() != *uncompressedLength) {
505 throw std::runtime_error(
506 to<std::string>("NoCompressionCodec: invalid uncompressed length"));
508 return data->clone();
511 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
// Varint helpers, compiled when LZ4 or LZMA support is enabled. LZ4Codec uses
// them for its optional uncompressed-size prefix.

// Appends the varint encoding of `val` to `out`. The caller must have reserved
// at least kMaxVarintLength64 bytes of tailroom (checked only in debug builds).
515 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
516 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
517 out->append(encodeVarint(val, out->writableTail()));
// Reads a base-128 varint from `cursor`. Each byte contributes its low 7 bits,
// least-significant group first. At most 10 bytes (shifts 0..63) are read.
// NOTE(review): the loop-exit check and the condition for the throw below are
// on lines not part of this listing. Presumably the throw fires when the 10th
// byte still has its continuation bit set — confirm.
520 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
523 for (int shift = 0; shift <= 63; shift += 7) {
// Reading as int8_t makes the continuation bit (0x80) the sign bit.
524 b = cursor.read<int8_t>();
525 val |= static_cast<uint64_t>(b & 0x7f) << shift;
531 throw std::invalid_argument("Invalid varint value. Too big.");
538 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
540 #if FOLLY_HAVE_LIBLZ4
// Block-format LZ4 codec.
//   - CodecType::LZ4_VARINT_SIZE: the compressed data is prefixed by the
//     uncompressed length as a varint.
//   - Plain CodecType::LZ4: the caller must pass the length.
545 class LZ4Codec final : public Codec {
547 static std::unique_ptr<Codec> create(int level, CodecType type);
548 explicit LZ4Codec(int level, CodecType type);
551 bool doNeedsUncompressedLength() const override;
552 uint64_t doMaxUncompressedLength() const override;
553 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
// True when the uncompressed size is stored as a varint prefix.
555 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
557 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
558 std::unique_ptr<IOBuf> doUncompress(
560 Optional<uint64_t> uncompressedLength) override;
// Selects LZ4 HC (high-compression) mode; derived from the level in the ctor.
562 bool highCompression_;
// Factory returning a new LZ4Codec.
565 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
566 return std::make_unique<LZ4Codec>(level, type);
// Maps the symbolic levels onto numeric ones; the mapped values are on lines
// not part of this listing. After mapping, only levels 1 and 2 are valid;
// anything else throws std::invalid_argument. Level 2 enables HC mode.
569 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
570 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
573 case COMPRESSION_LEVEL_FASTEST:
574 case COMPRESSION_LEVEL_DEFAULT:
577 case COMPRESSION_LEVEL_BEST:
581 if (level < 1 || level > 2) {
582 throw std::invalid_argument(to<std::string>(
583 "LZ4Codec: invalid level: ", level));
585 highCompression_ = (level > 1);
// Plain LZ4 needs the caller-supplied length; the varint variant stores it.
588 bool LZ4Codec::doNeedsUncompressedLength() const {
589 return !encodeSize();
592 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
593 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
595 #ifndef LZ4_MAX_INPUT_SIZE
596 # define LZ4_MAX_INPUT_SIZE 0x7E000000
// Largest input LZ4 accepts.
599 uint64_t LZ4Codec::doMaxUncompressedLength() const {
600 return LZ4_MAX_INPUT_SIZE;
// LZ4's worst-case bound, plus room for the varint size prefix when used.
603 uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
604 return LZ4_compressBound(uncompressedLength) +
605 (encodeSize() ? kMaxVarintLength64 : 0);
// Compresses `data` as a single LZ4 block, optionally preceded by the varint
// uncompressed length. The output buffer is sized with maxCompressedLength(),
// and the written size is CHECKed against its capacity afterwards.
608 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
610 if (data->isChained()) {
611 // LZ4 doesn't support streaming, so we have to coalesce
612 clone = data->cloneCoalescedAsValue();
// NOTE(review): the line redirecting `data` to the coalesced `clone` is not
// part of this listing. The uses of data->length() below presumably refer to
// the coalesced buffer — confirm.
616 auto out = IOBuf::create(maxCompressedLength(data->length()));
// Size prefix; presumably guarded by encodeSize() on a line not shown.
618 encodeVarintToIOBuf(data->length(), out.get());
622 auto input = reinterpret_cast<const char*>(data->data());
623 auto output = reinterpret_cast<char*>(out->writableTail());
624 const auto inputLength = data->length();
// With LZ4_VERSION_NUMBER >= 10700 use the calls that take the output capacity.
// Otherwise fall back to the older calls without it; that is safe because
// `out` was sized with LZ4_compressBound().
// The final 0 passed to LZ4_compress_HC presumably selects lz4hc's default
// level — confirm.
625 #if LZ4_VERSION_NUMBER >= 10700
626 if (highCompression_) {
627 n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
629 n = LZ4_compress_default(input, output, inputLength, out->tailroom());
632 if (highCompression_) {
633 n = LZ4_compressHC(input, output, inputLength);
635 n = LZ4_compress(input, output, inputLength);
640 CHECK_LE(n, out->capacity());
// Decompresses a single LZ4 block. The uncompressed length comes from:
//   - LZ4_VARINT_SIZE: the varint prefix, which must also match the caller's
//     value if one was given.
//   - Plain LZ4: the caller-supplied length.
// Throws std::runtime_error if the length is invalid, or if LZ4 does not
// produce exactly that many bytes.
646 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
648 Optional<uint64_t> uncompressedLength) {
650 if (data->isChained()) {
651 // LZ4 doesn't support streaming, so we have to coalesce
652 clone = data->cloneCoalescedAsValue();
656 folly::io::Cursor cursor(data);
657 uint64_t actualUncompressedLength;
659 actualUncompressedLength = decodeVarintFromCursor(cursor);
// The prefix comes from the (possibly untrusted) input. Reject sizes LZ4 can
// never produce before they drive the output allocation below.
// LZ4_decompress_safe() also takes the capacity as an int, so larger values
// could not be honored anyway.
if (actualUncompressedLength > maxUncompressedLength()) {
  throw std::runtime_error("LZ4Codec: invalid uncompressed length");
}
660 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
661 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
// Plain LZ4: Codec::uncompress() already required and range-checked the length.
665 DCHECK(uncompressedLength.hasValue());
666 DCHECK(*uncompressedLength <= maxUncompressedLength());
667 actualUncompressedLength = *uncompressedLength;
670 auto sp = StringPiece{cursor.peekBytes()};
671 auto out = IOBuf::create(actualUncompressedLength);
672 int n = LZ4_decompress_safe(
674 reinterpret_cast<char*>(out->writableTail()),
676 actualUncompressedLength);
// LZ4 must reproduce exactly the expected number of bytes.
678 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
679 throw std::runtime_error(to<std::string>(
680 "LZ4 decompression returned invalid value ", n));
682 out->append(actualUncompressedLength);
686 #if LZ4_VERSION_NUMBER >= 10301
// LZ4 frame-format codec (CodecType::LZ4_FRAME). Frames are self-describing
// and are recognized by their magic number in canUncompress().
688 class LZ4FrameCodec final : public Codec {
690 static std::unique_ptr<Codec> create(int level, CodecType type);
691 explicit LZ4FrameCodec(int level, CodecType type);
692 ~LZ4FrameCodec() override;
694 std::vector<std::string> validPrefixes() const override;
695 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
699 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
701 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
702 std::unique_ptr<IOBuf> doUncompress(
704 Optional<uint64_t> uncompressedLength) override;
706 // Reset the dctx_ if it is dirty or null.
// Decompression context; kept across doUncompress() calls while it is clean.
710 LZ4F_decompressionContext_t dctx_{nullptr};
// Factory returning a new LZ4FrameCodec.
714 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
717 return std::make_unique<LZ4FrameCodec>(level, type);
// LZ4 frame magic number, compared little-endian against the start of the data.
720 static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
722 std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
723 return {prefixToStringLE(kLZ4FrameMagicLE)};
// Data is claimed if it begins with the frame magic; the length hint is ignored.
726 bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
727 return dataStartsWithLE(data, kLZ4FrameMagicLE);
// Frame-size bound computed with the same preferences doCompress() sets
// (level_ and contentSize).
730 uint64_t LZ4FrameCodec::doMaxCompressedLength(
731 uint64_t uncompressedLength) const {
732 LZ4F_preferences_t prefs{};
733 prefs.compressionLevel = level_;
734 prefs.frameInfo.contentSize = uncompressedLength;
735 return LZ4F_compressFrameBound(uncompressedLength, &prefs);
// Converts an LZ4F error code into std::runtime_error carrying the library's
// error name. Non-error codes are passed through; the return is on a line not
// part of this listing.
738 static size_t lz4FrameThrowOnError(size_t code) {
739 if (LZ4F_isError(code)) {
740 throw std::runtime_error(
741 to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
// Makes dctx_ usable. An existing clean context is kept. Otherwise the old one
// is freed and a fresh context is created.
// NOTE(review): 100 is presumably the LZ4F API version (LZ4F_VERSION);
// consider using the macro instead of a literal.
746 void LZ4FrameCodec::resetDCtx() {
747 if (dctx_ && !dirty_) {
751 LZ4F_freeDecompressionContext(dctx_);
753 lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
// Maps the symbolic levels onto LZ4F compression levels; the mapped values
// are on lines not part of this listing.
757 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
758 DCHECK(type == CodecType::LZ4_FRAME);
760 case COMPRESSION_LEVEL_FASTEST:
761 case COMPRESSION_LEVEL_DEFAULT:
764 case COMPRESSION_LEVEL_BEST:
// Releases the decompression context.
773 LZ4FrameCodec::~LZ4FrameCodec() {
775 LZ4F_freeDecompressionContext(dctx_);
// Compresses `data` into one complete LZ4 frame, with the frame preferences
// set to the codec's level_ and the input's content size.
779 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
780 // LZ4 Frame compression doesn't support streaming so we have to coalesce
782 if (data->isChained()) {
783 clone = data->cloneCoalescedAsValue();
787 const auto uncompressedLength = data->length();
788 LZ4F_preferences_t prefs{};
789 prefs.compressionLevel = level_;
790 prefs.frameInfo.contentSize = uncompressedLength;
// Worst-case sized buffer, so a single LZ4F_compressFrame() call suffices.
792 auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
793 const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
799 buf->append(written);
// Decompresses one LZ4 frame from `data` into an IOBufQueue.
// If `uncompressedLength` is given, it pre-sizes the output and must match the
// decoded size exactly.
// Throws std::runtime_error on LZ4F errors, an incomplete frame, or a length
// mismatch.
803 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
805 Optional<uint64_t> uncompressedLength) {
806 // Reset the dctx if any errors have occurred
809 ByteRange in = *data->begin();
811 if (data->isChained()) {
812 clone = data->cloneCoalescedAsValue();
813 in = clone.coalesce();
816 // Select decompression options
// Value-initialize so that every field not set explicitly is zero instead of
// indeterminate stack contents. This covers the reserved fields and, in newer
// lz4 releases, flags such as skipChecksums.
817 LZ4F_decompressOptions_t options{};
818 options.stableDst = 1;
819 // Select blockSize and growthSize for the IOBufQueue
820 IOBufQueue queue(IOBufQueue::cacheChainLength());
821 auto blockSize = uint64_t{64} << 10;
822 auto growthSize = uint64_t{4} << 20;
823 if (uncompressedLength) {
824 // Allocate uncompressedLength in one chunk (up to 64 MB)
825 const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20);
826 queue.preallocate(allocateSize, allocateSize);
827 blockSize = std::min(*uncompressedLength, blockSize);
828 growthSize = std::min(*uncompressedLength, growthSize);
830 // Reduce growthSize for small data
831 const auto guessUncompressedLen =
832 4 * std::max<uint64_t>(blockSize, in.size());
833 growthSize = std::min(guessUncompressedLen, growthSize);
835 // Once LZ4F_decompress() is called, the dctx_ cannot be reused until it
838 // Decompress until the frame is over
841 // Allocate enough space to decompress at least a block
844 std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
846 size_t inSize = in.size();
847 code = lz4FrameThrowOnError(
848 LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
849 if (in.empty() && outSize == 0 && code != 0) {
850 // We passed no input, no output was produced, and the frame isn't over
851 // No more forward progress is possible
852 throw std::runtime_error("LZ4Frame error: Incomplete frame");
854 in.uncheckedAdvance(inSize);
855 queue.postallocate(outSize);
857 // At this point the decompression context can be reused
859 if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
860 throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
865 #endif // LZ4_VERSION_NUMBER >= 10301
866 #endif // FOLLY_HAVE_LIBLZ4
868 #if FOLLY_HAVE_LIBSNAPPY
875 * Implementation of snappy::Source that reads from an IOBuf chain.
// Peek() exposes the bytes at the cursor directly (no copy). Skip() advances
// past consumed bytes.
877 class IOBufSnappySource final : public snappy::Source {
879 explicit IOBufSnappySource(const IOBuf* data);
880 size_t Available() const override;
881 const char* Peek(size_t* len) override;
882 void Skip(size_t n) override;
// The total byte count is computed once, up front.
888 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
889 : available_(data->computeChainDataLength()),
893 size_t IOBufSnappySource::Available() const {
// Returns the contiguous bytes visible at the current cursor position.
897 const char* IOBufSnappySource::Peek(size_t* len) {
898 auto sp = StringPiece{cursor_.peekBytes()};
// Skipping beyond the remaining data is a fatal error (CHECK).
903 void IOBufSnappySource::Skip(size_t n) {
904 CHECK_LE(n, available_);
// Snappy codec (CodecType::SNAPPY). The uncompressed length is read back from
// the snappy data itself in doUncompress().
909 class SnappyCodec final : public Codec {
911 static std::unique_ptr<Codec> create(int level, CodecType type);
912 explicit SnappyCodec(int level, CodecType type);
// Codec hook overrides.
915 uint64_t doMaxUncompressedLength() const override;
916 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
917 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
918 std::unique_ptr<IOBuf> doUncompress(
920 Optional<uint64_t> uncompressedLength) override;
// Factory returning a new SnappyCodec.
923 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
924 return std::make_unique<SnappyCodec>(level, type);
// Only the symbolic levels are listed as accepted. Others can reach the
// std::invalid_argument below; the accepting condition is on lines not part
// of this listing.
927 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
928 DCHECK(type == CodecType::SNAPPY);
930 case COMPRESSION_LEVEL_FASTEST:
931 case COMPRESSION_LEVEL_DEFAULT:
932 case COMPRESSION_LEVEL_BEST:
936 throw std::invalid_argument(to<std::string>(
937 "SnappyCodec: invalid level: ", level));
941 uint64_t SnappyCodec::doMaxUncompressedLength() const {
942 // snappy.h uses uint32_t for lengths, so there's that.
943 return std::numeric_limits<uint32_t>::max();
// Snappy's own worst-case bound.
946 uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
947 return snappy::MaxCompressedLength(uncompressedLength);
// Compresses the whole chain (read via IOBufSnappySource) into one buffer
// sized with snappy::MaxCompressedLength().
950 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
951 IOBufSnappySource source(data);
952 auto out = IOBuf::create(maxCompressedLength(source.Available()));
// The "unchecked" sink writes without bounds checks; safety relies on `out`
// having been sized to the worst case above.
954 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
955 out->writableTail()));
957 size_t n = snappy::Compress(&source, &sink);
959 CHECK_LE(n, out->capacity());
// Decompresses in two passes:
//   1. Read the uncompressed length from the snappy data and validate it
//      against the caller's value, if any.
//   2. Decompress with a second, fresh source.
// Throws std::runtime_error on any snappy failure or length mismatch.
964 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
966 Optional<uint64_t> uncompressedLength) {
967 uint32_t actualUncompressedLength = 0;
970 IOBufSnappySource source(data);
971 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
972 throw std::runtime_error("snappy::GetUncompressedLength failed");
974 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
975 throw std::runtime_error("snappy: invalid uncompressed length");
// NOTE(review): actualUncompressedLength comes from the input and is only
// bounded by uint32_t, so untrusted input can request up to ~4 GiB here.
979 auto out = IOBuf::create(actualUncompressedLength);
// A fresh source so decompression starts at the beginning of `data`.
982 IOBufSnappySource source(data);
983 if (!snappy::RawUncompress(&source,
984 reinterpret_cast<char*>(out->writableTail()))) {
985 throw std::runtime_error("snappy::RawUncompress failed");
989 out->append(actualUncompressedLength);
993 #endif // FOLLY_HAVE_LIBSNAPPY
995 #if FOLLY_HAVE_LIBLZMA
// LZMA2 streaming codec built on liblzma. With CodecType::LZMA2_VARINT_SIZE
// the stream is preceded by the uncompressed length as a varint.
1000 class LZMA2StreamCodec final : public StreamCodec {
1002 static std::unique_ptr<Codec> createCodec(int level, CodecType type);
1003 static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
1004 explicit LZMA2StreamCodec(int level, CodecType type);
1005 ~LZMA2StreamCodec() override;
1007 std::vector<std::string> validPrefixes() const override;
1008 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1012 bool doNeedsDataLength() const override;
1013 uint64_t doMaxUncompressedLength() const override;
1014 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
// True when the uncompressed size is written as a varint prefix.
1016 bool encodeSize() const {
1017 return type() == CodecType::LZMA2_VARINT_SIZE;
1020 void doResetStream() override;
1021 bool doCompressStream(
1023 MutableByteRange& output,
1024 StreamCodec::FlushOp flushOp) override;
1025 bool doUncompressStream(
1027 MutableByteRange& output,
1028 StreamCodec::FlushOp flushOp) override;
1030 void resetCStream();
1031 void resetDStream();
1033 size_t decodeVarint(ByteRange& input);
1034 bool flushVarintBuffer(MutableByteRange& output);
1035 void resetVarintBuffer();
// Encoder/decoder state; lazily initialized by resetCStream()/resetDStream().
1037 Optional<lzma_stream> cstream_{};
1038 Optional<lzma_stream> dstream_{};
// Varint prefix handling. While compressing, varintBuffer_ holds the encoded
// size. While decompressing, it holds the bytes received so far.
1040 std::array<uint8_t, kMaxVarintLength64> varintBuffer_;
// Encoded-size bytes still to be written to the output.
1041 ByteRange varintToEncode_;
// Number of varint bytes buffered so far during decoding.
1042 size_t varintBufferPos_{0};
// The lzma stream must be (re)initialized before its next use.
1045 bool needReset_{true};
// A varint size prefix still has to be parsed from the input.
1046 bool needDecodeSize_{false};
// Little-endian value of the 6 bytes FD 37 7A 58 5A 00 ("\xFD" "7zXZ" "\0"),
// the .xz stream header magic.
1049 static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1050 static constexpr unsigned kLZMA2MagicBytes = 6;
// Plain LZMA2 data starts with the 6-byte .xz magic. The varint-prefixed
// variant is handled separately; its return is on a line not part of this
// listing.
1052 std::vector<std::string> LZMA2StreamCodec::validPrefixes() const {
1053 if (type() == CodecType::LZMA2_VARINT_SIZE) {
1056 return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
// Only the varint-prefixed variant needs the total length up front.
1059 bool LZMA2StreamCodec::doNeedsDataLength() const {
1060 return encodeSize();
1063 bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1065 if (type() == CodecType::LZMA2_VARINT_SIZE) {
1068 // Returns false for all inputs less than 8 bytes.
1069 // This is okay, because no valid LZMA2 streams are less than 8 bytes.
// (The 8-byte limit presumably comes from dataStartsWithLE() reading a full
// uint64_t before masking it to kLZMA2MagicBytes — confirm.)
1070 return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
// Factories exposing the codec through the Codec and StreamCodec interfaces.
1073 std::unique_ptr<Codec> LZMA2StreamCodec::createCodec(
1076 return make_unique<LZMA2StreamCodec>(level, type);
1079 std::unique_ptr<StreamCodec> LZMA2StreamCodec::createStream(
1082 return make_unique<LZMA2StreamCodec>(level, type);
// Maps the symbolic levels to xz presets. DEFAULT becomes LZMA_PRESET_DEFAULT;
// the FASTEST/BEST mappings are on lines not part of this listing. The result
// must be a preset in 0-9, otherwise std::invalid_argument is thrown.
1085 LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type)
1086 : StreamCodec(type) {
1087 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
1089 case COMPRESSION_LEVEL_FASTEST:
1092 case COMPRESSION_LEVEL_DEFAULT:
1093 level = LZMA_PRESET_DEFAULT;
1095 case COMPRESSION_LEVEL_BEST:
1099 if (level < 0 || level > 9) {
1100 throw std::invalid_argument(
1101 to<std::string>("LZMA2Codec: invalid level: ", level));
// Releases liblzma state for both streams. Presumably each call is guarded by
// a check that the stream was initialized (guards not shown) — confirm.
1106 LZMA2StreamCodec::~LZMA2StreamCodec() {
1108 lzma_end(cstream_.get_pointer());
1112 lzma_end(dstream_.get_pointer());
1117 uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const {
1118 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1119 return uint64_t(1) << 63;
// liblzma's single-call worst-case bound, plus room for the varint prefix.
1122 uint64_t LZMA2StreamCodec::doMaxCompressedLength(
1123 uint64_t uncompressedLength) const {
1124 return lzma_stream_buffer_bound(uncompressedLength) +
1125 (encodeSize() ? kMaxVarintLength64 : 0);
// Body not part of this listing; presumably sets needReset_ so the streams are
// re-initialized on next use — confirm.
1128 void LZMA2StreamCodec::doResetStream() {
// (Re)initializes the encoder with preset level_ and no integrity check
// (LZMA_CHECK_NONE). Throws std::runtime_error on failure.
1132 void LZMA2StreamCodec::resetCStream() {
1134 cstream_.assign(LZMA_STREAM_INIT);
1137 lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE);
1138 if (rc != LZMA_OK) {
1139 throw std::runtime_error(folly::to<std::string>(
1140 "LZMA2StreamCodec: lzma_easy_encoder error: ", rc));
// (Re)initializes the decoder with lzma_auto_decoder. The memory limit is
// UINT64_MAX, i.e. effectively unlimited; flags are 0.
// NOTE(review): a finite memlimit may be advisable for untrusted input.
// Throws std::runtime_error on failure.
1144 void LZMA2StreamCodec::resetDStream() {
1146 dstream_.assign(LZMA_STREAM_INIT);
1148 lzma_ret const rc = lzma_auto_decoder(
1149 dstream_.get_pointer(), std::numeric_limits<uint64_t>::max(), 0);
1150 if (rc != LZMA_OK) {
1151 throw std::runtime_error(folly::to<std::string>(
1152 "LZMA2StreamCodec: lzma_auto_decoder error: ", rc));
// Returns non-fatal lzma codes unchanged: LZMA_STREAM_END, LZMA_BUF_ERROR, and
// presumably LZMA_OK (its case label is not part of this listing). Any other
// code throws std::runtime_error.
1156 static lzma_ret lzmaThrowOnError(lzma_ret const rc) {
1159 case LZMA_STREAM_END:
1160 case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice
1163 throw std::runtime_error(
1164 to<std::string>("LZMA2StreamCodec: error: ", rc));
// Maps StreamCodec flush operations to lzma_code() actions. FLUSH becomes
// LZMA_SYNC_FLUSH; the NONE and END results are on lines not part of this
// listing. Any other value throws std::invalid_argument.
1168 static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) {
1170 case StreamCodec::FlushOp::NONE:
1172 case StreamCodec::FlushOp::FLUSH:
1173 return LZMA_SYNC_FLUSH;
1174 case StreamCodec::FlushOp::END:
1177 throw std::invalid_argument("LZMA2StreamCodec: Invalid flush");
1182 * Flushes the varint buffer.
1183 * Advances output by the number of bytes written.
1184 * Returns true when flushing is complete.
1186 bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) {
// Nothing pending; the early return is on a line not part of this listing.
1187 if (varintToEncode_.empty()) {
// Copy as much of the pending prefix as fits; the rest waits for more space.
1190 const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size());
1191 if (numBytesToCopy > 0) {
1192 memcpy(output.data(), varintToEncode_.data(), numBytesToCopy);
1194 varintToEncode_.advance(numBytesToCopy);
1195 output.advance(numBytesToCopy);
1196 return varintToEncode_.empty();
// LZMA2 compression step.
// On the first call after a reset (guard on a line not part of this listing)
// the size is encoded into varintBuffer_, presumably only for the varint
// variant. That prefix is emitted before any compressed bytes.
// Completion per flush op:
//   - FLUSH: all input consumed and output space left over.
//   - END: liblzma reports LZMA_STREAM_END.
1199 bool LZMA2StreamCodec::doCompressStream(
1201 MutableByteRange& output,
1202 StreamCodec::FlushOp flushOp) {
1206 varintBufferPos_ = 0;
1207 size_t const varintSize =
1208 encodeVarint(*uncompressedLength(), varintBuffer_.data());
1209 varintToEncode_ = {varintBuffer_.data(), varintSize};
// Emit no compressed data until the whole prefix has been written.
1214 if (!flushVarintBuffer(output)) {
1218 cstream_->next_in = const_cast<uint8_t*>(input.data());
1219 cstream_->avail_in = input.size();
1220 cstream_->next_out = output.data();
1221 cstream_->avail_out = output.size();
// NOTE(review): these advances appear before lzma_code() in this listing.
// Presumably they sit inside a SCOPE_EXIT (not shown), so they run after
// lzma_code() and reflect the bytes it consumed/produced — confirm.
1223 input.uncheckedAdvance(input.size() - cstream_->avail_in);
1224 output.uncheckedAdvance(output.size() - cstream_->avail_out);
1226 lzma_ret const rc = lzmaThrowOnError(
1227 lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp)));
1229 case StreamCodec::FlushOp::NONE:
1231 case StreamCodec::FlushOp::FLUSH:
1232 return cstream_->avail_in == 0 && cstream_->avail_out != 0;
1233 case StreamCodec::FlushOp::END:
1234 return rc == LZMA_STREAM_END;
1236 throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp");
1241 * Attempts to decode a varint from input.
1242 * The function advances input by the number of bytes read.
1244 * If there are too many bytes and the varint is not valid, throw a
1246 * Returns the decoded size or 0 if more bytes are needed.
// NOTE(review): a successfully decoded size of 0 cannot be told apart from the
// "need more bytes" result — confirm callers never see a zero prefix.
1248 size_t LZMA2StreamCodec::decodeVarint(ByteRange& input) {
1249 if (input.empty()) {
// Append new bytes to those buffered by earlier calls, up to the maximum
// varint length.
1252 size_t const numBytesToCopy =
1253 std::min(kMaxVarintLength64 - varintBufferPos_, input.size());
1254 memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy);
1256 size_t const rangeSize = varintBufferPos_ + numBytesToCopy;
1257 ByteRange range{varintBuffer_.data(), rangeSize};
1258 auto const ret = tryDecodeVarint(range);
1260 if (ret.hasValue()) {
// Advance `input` only by the varint bytes that came from it; the first
// varintBufferPos_ bytes were consumed by earlier calls.
1261 size_t const varintSize = rangeSize - range.size();
1262 input.advance(varintSize - varintBufferPos_);
1264 } else if (ret.error() == DecodeVarintError::TooManyBytes) {
1265 throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
// Incomplete varint: keep the bytes and wait for more input.
1268 input.advance(numBytesToCopy);
1269 varintBufferPos_ += numBytesToCopy;
// One step of streaming LZMA2 decompression. When the stream carries a leading
// size varint, it is decoded (and checked against uncompressedLength()) before
// any data is handed to lzma_code().
1274 bool LZMA2StreamCodec::doUncompressStream(
1276 MutableByteRange& output,
1277 StreamCodec::FlushOp flushOp) {
// Expect a leading size varint exactly when encodeSize() is set.
1281 needDecodeSize_ = encodeSize();
1284 varintBufferPos_ = 0;
1288 if (needDecodeSize_) {
1289 // Try decoding the varint. If the input does not contain the entire varint,
1290 // buffer the input. If the varint can not be decoded, fail.
// NOTE(review): decodeVarint() documents 0 as "more bytes are needed", so an
// encoded length of exactly 0 cannot be told apart from an incomplete varint.
// Confirm how that case is handled.
1291 size_t const size = decodeVarint(input);
// A caller-supplied length must agree with the encoded one.
1295 if (uncompressedLength() && *uncompressedLength() != size) {
1296 throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1298 needDecodeSize_ = false;
// Point the lzma decoder at the caller's buffers. input/output are advanced by
// the post-call avail_* deltas, presumably in a scope-exit guard whose opening
// line is not visible here; confirm.
1301 dstream_->next_in = const_cast<uint8_t*>(input.data());
1302 dstream_->avail_in = input.size();
1303 dstream_->next_out = output.data();
1304 dstream_->avail_out = output.size();
1306 input.advance(input.size() - dstream_->avail_in);
1307 output.advance(output.size() - dstream_->avail_out);
// NONE and FLUSH both decode with LZMA_RUN; END uses LZMA_FINISH.
1312 case StreamCodec::FlushOp::NONE:
1313 case StreamCodec::FlushOp::FLUSH:
1314 rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN));
1316 case StreamCodec::FlushOp::END:
1317 rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH));
1320 throw std::invalid_argument("LZMA2StreamCodec: invalid flush");
// Finished only once the decoder reaches the end of the LZMA stream.
1322 return rc == LZMA_STREAM_END;
1324 #endif // FOLLY_HAVE_LIBLZMA
// Test the macro's value with `#if`, as every other FOLLY_HAVE_* guard in this
// file does (including the one around the zstd include). `#ifdef` would also
// enable this section when the macro is defined to 0, compiling ZSTD code
// whose headers were never included.
1326 #if FOLLY_HAVE_LIBZSTD
// Deleters for the zstd stream handles. They are passed to
// folly::static_function_deleter for ZSTDStreamCodec's owning stream members.
1329 void zstdFreeCStream(ZSTD_CStream* zcs) {
1330 ZSTD_freeCStream(zcs);
1333 void zstdFreeDStream(ZSTD_DStream* zds) {
1334 ZSTD_freeDStream(zds);
// StreamCodec backed by zstd. Supports streaming compression/decompression,
// plus one-shot "block" fast paths when all input arrives with FlushOp::END.
1341 class ZSTDStreamCodec final : public StreamCodec {
1343 static std::unique_ptr<Codec> createCodec(int level, CodecType);
1344 static std::unique_ptr<StreamCodec> createStream(int level, CodecType);
1345 explicit ZSTDStreamCodec(int level, CodecType type);
1347 std::vector<std::string> validPrefixes() const override;
1348 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1352 bool doNeedsUncompressedLength() const override;
1353 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1354 Optional<uint64_t> doGetUncompressedLength(
1356 Optional<uint64_t> uncompressedLength) const override;
1358 void doResetStream() override;
1359 bool doCompressStream(
1361 MutableByteRange& output,
1362 StreamCodec::FlushOp flushOp) override;
1363 bool doUncompressStream(
1365 MutableByteRange& output,
1366 StreamCodec::FlushOp flushOp) override;
// (Re)create and initialize the compression / decompression stream handles.
1368 void resetCStream();
1369 void resetDStream();
// One-shot fast paths; they return false when the preconditions (enough
// output space, known length) are not met.
1371 bool tryBlockCompress(ByteRange& input, MutableByteRange& output) const;
1372 bool tryBlockUncompress(ByteRange& input, MutableByteRange& output) const;
1375 bool needReset_{true};
// Owning stream handles, released through zstdFreeCStream / zstdFreeDStream.
1378 folly::static_function_deleter<ZSTD_CStream, &zstdFreeCStream>>
1382 folly::static_function_deleter<ZSTD_DStream, &zstdFreeDStream>>
// The zstd frame magic number. The *LE helpers compare it in little-endian
// byte order, i.e. as the byte sequence 28 B5 2F FD.
1386 static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
// The only prefix zstd data can start with: the frame magic.
1388 std::vector<std::string> ZSTDStreamCodec::validPrefixes() const {
1389 return {prefixToStringLE(kZSTDMagicLE)};
// Recognizes zstd data purely by its leading magic; the length hint is unused.
1392 bool ZSTDStreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1394 return dataStartsWithLE(data, kZSTDMagicLE);
// Factory entry points used by the codec table; both build a ZSTDStreamCodec,
// returned as a Codec or a StreamCodec respectively.
1397 std::unique_ptr<Codec> ZSTDStreamCodec::createCodec(int level, CodecType type) {
1398 return make_unique<ZSTDStreamCodec>(level, type);
1401 std::unique_ptr<StreamCodec> ZSTDStreamCodec::createStream(
1404 return make_unique<ZSTDStreamCodec>(level, type);
// Constructs a ZSTD codec. The COMPRESSION_LEVEL_* sentinels are mapped to
// concrete zstd levels (the assignments are on lines not visible here). Any
// level outside [1, ZSTD_maxCLevel()] is rejected with std::invalid_argument.
1407 ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
1408 : StreamCodec(type) {
1409 DCHECK(type == CodecType::ZSTD);
1411 case COMPRESSION_LEVEL_FASTEST:
1414 case COMPRESSION_LEVEL_DEFAULT:
1417 case COMPRESSION_LEVEL_BEST:
1421 if (level < 1 || level > ZSTD_maxCLevel()) {
1422 throw std::invalid_argument(
1423 to<std::string>("ZSTD: invalid level: ", level));
// Whether callers must supply the uncompressed length; the return statement
// is not visible in this listing.
1428 bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
// Worst-case compressed size for one-shot compression, per ZSTD_compressBound.
1432 uint64_t ZSTDStreamCodec::doMaxCompressedLength(
1433 uint64_t uncompressedLength) const {
1434 return ZSTD_compressBound(uncompressedLength);
// Converts a zstd size_t return code into an exception. It does nothing when
// ZSTD_isError(rc) is false; otherwise it throws std::runtime_error carrying
// ZSTD_getErrorName(rc).
1437 void zstdThrowIfError(size_t rc) {
1438 if (!ZSTD_isError(rc)) {
1441 throw std::runtime_error(
1442 to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
// Returns the uncompressed length recorded in the zstd frame header when
// present. It throws if that disagrees with a caller-supplied length;
// otherwise it falls back to the caller's value.
1445 Optional<uint64_t> ZSTDStreamCodec::doGetUncompressedLength(
1447 Optional<uint64_t> uncompressedLength) const {
1448 // Read decompressed size from frame if available in first IOBuf.
// Only the first IOBuf of the chain is inspected (data->data()/length()).
1449 auto const decompressedSize =
1450 ZSTD_getDecompressedSize(data->data(), data->length());
// 0 means the size could not be determined, so the caller's value is kept.
// NOTE(review): ZSTD_getDecompressedSize is deprecated in newer zstd in favour
// of ZSTD_getFrameContentSize, which separates "unknown" from "error". Here a
// genuinely empty frame is also indistinguishable from "unknown".
1451 if (decompressedSize != 0) {
1452 if (uncompressedLength && *uncompressedLength != decompressedSize) {
1453 throw std::runtime_error("ZSTD: invalid uncompressed length");
1455 uncompressedLength = decompressedSize;
1457 return uncompressedLength;
// Stream reset hook; its body is not visible in this listing.
// NOTE(review): presumably sets needReset_ so the zstd streams are
// re-initialized on the next compress/uncompress call. Confirm.
1460 void ZSTDStreamCodec::doResetStream() {
// One-shot compression fast path. It bails out when output is smaller than
// ZSTD_compressBound(input.size()); otherwise it compresses all of input with
// ZSTD_compress at level_ and advances both ranges.
1464 bool ZSTDStreamCodec::tryBlockCompress(
1466 MutableByteRange& output) const {
1468 // We need to know that we have enough output space to use block compression
1469 if (output.size() < ZSTD_compressBound(input.size())) {
1472 size_t const length = ZSTD_compress(
1473 output.data(), output.size(), input.data(), input.size(), level_);
1474 zstdThrowIfError(length);
// ZSTD_compress consumes the whole input and returns the bytes written.
1475 input.uncheckedAdvance(input.size());
1476 output.uncheckedAdvance(length);
// (Re)creates the compression stream and initializes it for the current
// uncompressedLength() (which may be unknown).
1480 void ZSTDStreamCodec::resetCStream() {
1482 cstream_.reset(ZSTD_createCStream());
// Presumably reached when ZSTD_createCStream() returns null (the check line
// is not visible here).
1484 throw std::bad_alloc{};
1487 // Advanced API usage works for all supported versions of zstd.
1488 // Required to set contentSizeFlag.
// When the length is unknown, 0 is passed as both the size estimate and the
// pledged size, and the content size is not written into the frame header.
1489 auto params = ZSTD_getParams(level_, uncompressedLength().value_or(0), 0);
1490 params.fParams.contentSizeFlag = uncompressedLength().hasValue();
1491 zstdThrowIfError(ZSTD_initCStream_advanced(
1492 cstream_.get(), nullptr, 0, params, uncompressedLength().value_or(0)));
// One step of zstd streaming compression, with a one-shot fast path when all
// input is supplied together with FlushOp::END.
1495 bool ZSTDStreamCodec::doCompressStream(
1497 MutableByteRange& output,
1498 StreamCodec::FlushOp flushOp) {
1500 // If we are given all the input in one chunk try to use block compression
1501 if (flushOp == StreamCodec::FlushOp::END &&
1502 tryBlockCompress(input, output)) {
// Wrap the caller's ranges for the streaming API. The advances below read
// in.pos/out.pos after the zstd calls, so they presumably run in a scope-exit
// guard whose opening line is not visible here; confirm.
1508 ZSTD_inBuffer in = {input.data(), input.size(), 0};
1509 ZSTD_outBuffer out = {output.data(), output.size(), 0};
1511 input.uncheckedAdvance(in.pos);
1512 output.uncheckedAdvance(out.pos);
// Feed input unless this is a FLUSH/END call with no input left.
1514 if (flushOp == StreamCodec::FlushOp::NONE || !input.empty()) {
1515 zstdThrowIfError(ZSTD_compressStream(cstream_.get(), &out, &in));
// Once all input is consumed, FLUSH maps to ZSTD_flushStream and END to
// ZSTD_endStream. Per the zstd docs, rc is the number of bytes still left to
// flush (0 when done).
1517 if (in.pos == in.size && flushOp != StreamCodec::FlushOp::NONE) {
1520 case StreamCodec::FlushOp::FLUSH:
1521 rc = ZSTD_flushStream(cstream_.get(), &out);
1523 case StreamCodec::FlushOp::END:
1524 rc = ZSTD_endStream(cstream_.get(), &out);
1527 throw std::invalid_argument("ZSTD: invalid FlushOp");
1529 zstdThrowIfError(rc);
// One-shot decompression fast path for a single frame. It requires zstd >=
// 1.1.4 (ZSTD_VERSION_NUMBER 10104), a known uncompressedLength() and at least
// that much output space.
1537 bool ZSTDStreamCodec::tryBlockUncompress(
1539 MutableByteRange& output) const {
1541 #if ZSTD_VERSION_NUMBER < 10104
1542 // We require ZSTD_findFrameCompressedSize() to perform this optimization.
1545 // We need to know the uncompressed length and have enough output space.
1546 if (!uncompressedLength() || output.size() < *uncompressedLength()) {
// Decompress exactly the first frame; any bytes after it stay in input.
1549 size_t const compressedLength =
1550 ZSTD_findFrameCompressedSize(input.data(), input.size());
1551 zstdThrowIfError(compressedLength);
1552 size_t const length = ZSTD_decompress(
1553 output.data(), *uncompressedLength(), input.data(), compressedLength);
1554 zstdThrowIfError(length);
// The frame must decode to exactly the expected length.
1555 if (length != *uncompressedLength()) {
1556 throw std::runtime_error("ZSTDStreamCodec: Incorrect uncompressed length");
1558 input.uncheckedAdvance(compressedLength);
1559 output.uncheckedAdvance(length);
// (Re)creates the decompression stream and initializes it with ZSTD_initDStream.
1564 void ZSTDStreamCodec::resetDStream() {
1566 dstream_.reset(ZSTD_createDStream());
// Presumably reached when ZSTD_createDStream() returns null (the check line
// is not visible here).
1568 throw std::bad_alloc{};
1571 zstdThrowIfError(ZSTD_initDStream(dstream_.get()));
// One step of zstd streaming decompression, with a one-shot fast path when all
// input is supplied together with FlushOp::END.
1574 bool ZSTDStreamCodec::doUncompressStream(
1576 MutableByteRange& output,
1577 StreamCodec::FlushOp flushOp) {
1579 // If we are given all the input in one chunk try to use block uncompression
1580 if (flushOp == StreamCodec::FlushOp::END &&
1581 tryBlockUncompress(input, output)) {
// Wrap the caller's ranges. As in doCompressStream, the advances use the
// post-call positions (presumably inside a scope-exit guard not visible here).
1587 ZSTD_inBuffer in = {input.data(), input.size(), 0};
1588 ZSTD_outBuffer out = {output.data(), output.size(), 0};
1590 input.uncheckedAdvance(in.pos);
1591 output.uncheckedAdvance(out.pos);
// Per the zstd docs, rc is 0 once a frame is fully decoded and flushed.
1593 size_t const rc = ZSTD_decompressStream(dstream_.get(), &out, &in);
1594 zstdThrowIfError(rc);
1598 #endif // FOLLY_HAVE_LIBZSTD
1600 #if FOLLY_HAVE_LIBBZ2
// Non-streaming Codec backed by libbz2 (BZ2_bzCompress / BZ2_bzDecompress).
1602 class Bzip2Codec final : public Codec {
1604 static std::unique_ptr<Codec> create(int level, CodecType type);
1605 explicit Bzip2Codec(int level, CodecType type);
1607 std::vector<std::string> validPrefixes() const override;
1608 bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
1612 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1613 std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
1614 std::unique_ptr<IOBuf> doUncompress(
1616 Optional<uint64_t> uncompressedLength) override;
// Factory entry used by the codec table.
1621 /* static */ std::unique_ptr<Codec> Bzip2Codec::create(
1624 return std::make_unique<Bzip2Codec>(level, type);
// Maps the COMPRESSION_LEVEL_* sentinels to bzip2 block-size levels (the
// assignments are on lines not visible here). Anything outside [1, 9] is
// rejected with std::invalid_argument.
1627 Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
1628 DCHECK(type == CodecType::BZIP2);
1630 case COMPRESSION_LEVEL_FASTEST:
1633 case COMPRESSION_LEVEL_DEFAULT:
1636 case COMPRESSION_LEVEL_BEST:
1640 if (level < 1 || level > 9) {
1641 throw std::invalid_argument(
1642 to<std::string>("Bzip2: invalid level: ", level));
// The 3-byte bzip2 stream signature "BZh", stored little-endian:
// 0x42 'B', 0x5a 'Z', 0x68 'h'.
1647 static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1648 static uint64_t constexpr kBzip2MagicBytes = 3;
1650 std::vector<std::string> Bzip2Codec::validPrefixes() const {
1651 return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
// Recognizes bzip2 data by its signature only; the length hint is unused.
1654 bool Bzip2Codec::canUncompress(IOBuf const* data, Optional<uint64_t>) const {
1655 return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
1658 uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1659 // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
1660 // To guarantee that the compressed data will fit in its buffer, allocate an
1661 // output buffer of size 1% larger than the uncompressed data, plus six
1662 // hundred extra bytes.
// The integer division rounds the 1% down; the 600-byte slack covers that.
1663 return uncompressedLength + uncompressedLength / 100 + 600;
// Returns a bz_stream ready for BZ2_bz*Init. Null bzalloc/bzfree/opaque select
// libbz2's default allocator, and the in/out buffers start out empty.
1666 static bz_stream createBzStream() {
1668 stream.bzalloc = nullptr;
1669 stream.bzfree = nullptr;
1670 stream.opaque = nullptr;
1671 stream.next_in = stream.next_out = nullptr;
1672 stream.avail_in = stream.avail_out = 0;
1676 // Throws on error condition, otherwise returns the code.
// The set of accepted (non-error) codes is on lines not visible here; any
// other code is reported as std::runtime_error with the numeric rc.
1677 static int bzCheck(int const rc) {
1686 throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
// Allocates an output IOBuf of at least bufferLength bytes, marks its whole
// capacity as data, and points stream->next_out/avail_out at it. The caller
// links it into the output chain.
1690 static std::unique_ptr<IOBuf> addOutputBuffer(
1692 uint64_t const bufferLength) {
// bz_stream::avail_out is an unsigned int, so the length must fit in it.
1693 DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
// A new buffer is only added once the previous one is completely full.
1694 DCHECK_EQ(stream->avail_out, 0);
1696 auto buf = IOBuf::create(bufferLength);
1697 buf->append(buf->capacity());
1699 stream->next_out = reinterpret_cast<char*>(buf->writableData());
1700 stream->avail_out = buf->length();
// Compresses the whole IOBuf chain with libbz2 into a chain of output buffers.
1705 std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
1706 bz_stream stream = createBzStream();
1707 bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
// Presumably registered as a scope-exit cleanup (enclosing line not visible),
// so the compressor is released on every path; confirm.
1709 bzCheck(BZ2_bzCompressEnd(&stream));
1712 uint64_t const uncompressedLength = data->computeChainDataLength();
1713 uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
// libbz2 counts bytes in unsigned int, so input is fed in steps of at most
// kMaxSingleStepLength. Extra output buffers are kDefaultBufferLength (4 MiB).
1714 uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1715 uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
// If the worst-case output fits in one step, a single buffer suffices.
1717 auto out = addOutputBuffer(
1719 maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
1720 : kDefaultBufferLength);
1722 for (auto range : *data) {
1723 while (!range.empty()) {
1724 auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
// bz_stream::next_in is a non-const char*, hence the const_cast.
1726 const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1727 stream.avail_in = inSize;
// Output full: append a new buffer. prependChain on the chain head inserts at
// the tail of the circular chain.
1729 if (stream.avail_out == 0) {
1730 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1733 bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
// Advance by the number of input bytes libbz2 consumed.
1734 range.uncheckedAdvance(inSize - stream.avail_in);
// Finish the stream, adding buffers as needed until BZ_STREAM_END.
1738 if (stream.avail_out == 0) {
1739 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1741 } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
// Drop the unused space at the end of the last buffer.
1743 out->prev()->trimEnd(stream.avail_out);
// Decompresses a bzip2 IOBuf chain into a chain of output buffers. If
// uncompressedLength is given, it is verified against the total output.
1748 std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
1750 Optional<uint64_t> uncompressedLength) {
1751 bz_stream stream = createBzStream();
1752 bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
// Presumably registered as a scope-exit cleanup (enclosing line not visible);
// confirm.
1754 bzCheck(BZ2_bzDecompressEnd(&stream));
1757 uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
// Growth buffers are sized by computeBufferLength (defined elsewhere) from
// the compressed size and the 100 KiB bzip2 block size.
1758 uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
1759 uint64_t const kDefaultBufferLength =
1760 computeBufferLength(data->computeChainDataLength(), kBlockSize);
// Use the known uncompressed length for the first buffer when it fits one step.
1762 auto out = addOutputBuffer(
1764 ((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength)
1765 ? *uncompressedLength
1766 : kDefaultBufferLength));
1769 for (auto range : *data) {
1770 while (!range.empty()) {
1771 auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
// bz_stream::next_in is a non-const char*, hence the const_cast.
1773 const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1774 stream.avail_in = inSize;
1776 if (stream.avail_out == 0) {
1777 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
// rc (declared on a line not visible here) holds the latest libbz2 status.
1780 rc = bzCheck(BZ2_bzDecompress(&stream));
1781 range.uncheckedAdvance(inSize - stream.avail_in);
// All input consumed: keep draining until BZ_STREAM_END. A call that produces
// no output means the stream cannot finish, i.e. the input was truncated.
1784 while (rc != BZ_STREAM_END) {
1785 if (stream.avail_out == 0) {
1786 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1788 size_t const outputSize = stream.avail_out;
1789 rc = bzCheck(BZ2_bzDecompress(&stream));
1790 if (outputSize == stream.avail_out) {
1791 throw std::runtime_error("Bzip2Codec: Truncated input");
// Drop the unused space at the end of the last buffer.
1795 out->prev()->trimEnd(stream.avail_out);
// libbz2 reports the total output as two 32-bit halves.
1797 uint64_t const totalOut =
1798 (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
1799 if (uncompressedLength && uncompressedLength != totalOut) {
1800 throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
1806 #endif // FOLLY_HAVE_LIBBZ2
// Maps CodecType::GZIP / CodecType::ZLIB onto the shared zlib implementation
// with the matching default options.
1810 zlib::Options getZlibOptions(CodecType type) {
1811 DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB);
1812 return type == CodecType::GZIP ? zlib::defaultGzipOptions()
1813 : zlib::defaultZlibOptions();
// Factory-table adapters with the (level, type) signature.
1816 std::unique_ptr<Codec> getZlibCodec(int level, CodecType type) {
1817 return zlib::getCodec(getZlibOptions(type), level);
1820 std::unique_ptr<StreamCodec> getZlibStreamCodec(int level, CodecType type) {
1821 return zlib::getStreamCodec(getZlibOptions(type), level);
1824 #endif // FOLLY_HAVE_LIBZ
1827 * Automatic decompression
// Decompress-only Codec that dispatches to the first registered codec whose
// canUncompress() accepts the data. Compression is not supported.
1829 class AutomaticCodec final : public Codec {
1831 static std::unique_ptr<Codec> create(
1832 std::vector<std::unique_ptr<Codec>> customCodecs);
1833 explicit AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs);
1835 std::vector<std::string> validPrefixes() const override;
1836 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1840 bool doNeedsUncompressedLength() const override;
1841 uint64_t doMaxUncompressedLength() const override;
// Compression-side operations always throw std::runtime_error.
1843 uint64_t doMaxCompressedLength(uint64_t) const override {
1844 throw std::runtime_error(
1845 "AutomaticCodec error: maxCompressedLength() not supported.");
1847 std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1848 throw std::runtime_error("AutomaticCodec error: compress() not supported.");
1850 std::unique_ptr<IOBuf> doUncompress(
1852 Optional<uint64_t> uncompressedLength) override;
1854 void addCodecIfSupported(CodecType type);
1856 // Throws iff the codecs aren't compatible (very slow)
1857 void checkCompatibleCodecs() const;
// Candidate codecs, probed in order by doUncompress().
1859 std::vector<std::unique_ptr<Codec>> codecs_;
// Aggregates cached by the constructor: any codec needs a length / largest max.
1860 bool needsUncompressedLength_;
1861 uint64_t maxUncompressedLength_;
// Union of every registered codec's prefixes, deduplicated. The order is
// unspecified because it comes from an unordered_set.
1864 std::vector<std::string> AutomaticCodec::validPrefixes() const {
1865 std::unordered_set<std::string> prefixes;
1866 for (const auto& codec : codecs_) {
1867 const auto codecPrefixes = codec->validPrefixes();
1868 prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
1870 return std::vector<std::string>{prefixes.begin(), prefixes.end()};
// True if any registered codec can uncompress the data.
1873 bool AutomaticCodec::canUncompress(
1875 Optional<uint64_t> uncompressedLength) const {
1879 [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
1880 return codec->canUncompress(data, uncompressedLength);
// Appends a default codec of `type` when it is compiled in (hasCodec) and no
// codec of that type is already registered, e.g. via customCodecs.
1884 void AutomaticCodec::addCodecIfSupported(CodecType type) {
1885 const bool present = std::any_of(
1888 [&type](std::unique_ptr<Codec> const& codec) {
1889 return codec->type() == type;
1891 if (hasCodec(type) && !present) {
1892 codecs_.push_back(getCodec(type));
1896 /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
1897 std::vector<std::unique_ptr<Codec>> customCodecs) {
1898 return std::make_unique<AutomaticCodec>(std::move(customCodecs));
// Registers customCodecs first, then the built-in codecs that have
// recognizable prefixes. It validates that the prefixes do not collide and
// caches the aggregate needsUncompressedLength / maxUncompressedLength.
1901 AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
1902 : Codec(CodecType::USER_DEFINED), codecs_(std::move(customCodecs)) {
1903 // Fastest -> slowest
// This is also the order doUncompress() probes codecs in.
1904 addCodecIfSupported(CodecType::LZ4_FRAME);
1905 addCodecIfSupported(CodecType::ZSTD);
1906 addCodecIfSupported(CodecType::ZLIB);
1907 addCodecIfSupported(CodecType::GZIP);
1908 addCodecIfSupported(CodecType::LZMA2);
1909 addCodecIfSupported(CodecType::BZIP2);
1911 checkCompatibleCodecs();
1913 // Check that none of the codecs are null
// NOTE(review): addCodecIfSupported() (codec->type()) and
// checkCompatibleCodecs() (codec->validPrefixes()) have already dereferenced
// every codec by this point, so a null custom codec crashes before this DCHECK
// can fire. Consider checking first.
1914 DCHECK(std::none_of(
1915 codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1916 return codec == nullptr;
1919 needsUncompressedLength_ = std::any_of(
1920 codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1921 return codec->needsUncompressedLength();
// The largest per-codec limit becomes this codec's limit.
1924 const auto it = std::max_element(
1927 [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
1928 return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
1930 DCHECK(it != codecs_.end());
1931 maxUncompressedLength_ = (*it)->maxUncompressedLength();
// Ensures prefix-based dispatch is unambiguous. Every codec must declare at
// least one prefix, no prefix may appear twice, and no prefix may be a strict
// prefix of another. Violations throw std::invalid_argument.
1934 void AutomaticCodec::checkCompatibleCodecs() const {
1935 // Keep track of all the possible headers.
1936 std::unordered_set<std::string> headers;
1937 // The empty header is not allowed.
1940 // Construct a set of headers and check that none of the headers occur twice.
1941 // Eliminate edge cases.
1942 for (auto&& codec : codecs_) {
1943 const auto codecHeaders = codec->validPrefixes();
1944 // Codecs without any valid headers are not allowed.
1945 if (codecHeaders.empty()) {
1946 throw std::invalid_argument{
1947 "AutomaticCodec: validPrefixes() must not be empty."};
1949 // Insert all the headers for the current codec.
1950 const size_t beforeSize = headers.size();
1951 headers.insert(codecHeaders.begin(), codecHeaders.end());
1952 // Codecs are not compatible if any header occurred twice.
1953 if (beforeSize + codecHeaders.size() != headers.size()) {
1954 throw std::invalid_argument{
1955 "AutomaticCodec: Two valid prefixes collide."};
1959 // Check if any strict non-empty prefix of any header is a header.
1960 for (const auto& header : headers) {
1961 for (size_t i = 1; i < header.size(); ++i) {
1962 if (headers.count(header.substr(0, i))) {
1963 throw std::invalid_argument{
1964 "AutomaticCodec: One valid prefix is a prefix of another valid "
// Aggregates cached by the constructor.
1971 bool AutomaticCodec::doNeedsUncompressedLength() const {
1972 return needsUncompressedLength_;
1975 uint64_t AutomaticCodec::doMaxUncompressedLength() const {
1976 return maxUncompressedLength_;
// Delegates to the first codec (in registration order) that accepts the data.
// If none does, it throws std::runtime_error.
1979 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
1981 Optional<uint64_t> uncompressedLength) {
1982 for (auto&& codec : codecs_) {
1983 if (codec->canUncompress(data, uncompressedLength)) {
1984 return codec->uncompress(data, uncompressedLength);
1987 throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
// Factory signatures: (level, type) -> codec.
1990 using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
1991 using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
// A null stream factory means the type has no streaming implementation.
1994 StreamCodecFactory stream;
// Indexed by static_cast<size_t>(CodecType) (see getFactory), so entries must
// follow the CodecType enumerator order. The duplicated LZ4 / LZMA2 / zlib
// entries presumably serve distinct enumerators (e.g. the *_VARINT_SIZE and
// GZIP variants); confirm against the CodecType declaration.
1998 codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
2000 {NoCompressionCodec::create, nullptr},
2002 #if FOLLY_HAVE_LIBLZ4
2003 {LZ4Codec::create, nullptr},
2008 #if FOLLY_HAVE_LIBSNAPPY
2009 {SnappyCodec::create, nullptr},
2015 {getZlibCodec, getZlibStreamCodec},
2020 #if FOLLY_HAVE_LIBLZ4
2021 {LZ4Codec::create, nullptr},
2026 #if FOLLY_HAVE_LIBLZMA
2027 {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2028 {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2034 #if FOLLY_HAVE_LIBZSTD
2035 {ZSTDStreamCodec::createCodec, ZSTDStreamCodec::createStream},
2041 {getZlibCodec, getZlibStreamCodec},
2046 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
2047 {LZ4FrameCodec::create, nullptr},
2052 #if FOLLY_HAVE_LIBBZ2
2053 {Bzip2Codec::create, nullptr},
// Looks up the factory pair for `type`; throws std::invalid_argument when the
// value is out of range for the table.
2059 Factory const& getFactory(CodecType type) {
2060 size_t const idx = static_cast<size_t>(type);
2061 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2062 throw std::invalid_argument(
2063 to<std::string>("Compression type ", idx, " invalid"));
2065 return codecFactories[idx];
// True if a (non-streaming) codec is compiled in for `type`.
2069 bool hasCodec(CodecType type) {
2070 return getFactory(type).codec != nullptr;
// Builds a codec for `type` at `level`. It throws std::invalid_argument when
// the type has no factory in this build.
2073 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2074 auto const factory = getFactory(type).codec;
2076 throw std::invalid_argument(
2077 to<std::string>("Compression type ", type, " not supported"));
2079 auto codec = (*factory)(level, type);
2080 DCHECK(codec->type() == type);
// True if a streaming codec is compiled in for `type`.
2084 bool hasStreamCodec(CodecType type) {
2085 return getFactory(type).stream != nullptr;
// Streaming counterpart of getCodec(), with the same error behavior.
2088 std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
2089 auto const factory = getFactory(type).stream;
2091 throw std::invalid_argument(
2092 to<std::string>("Compression type ", type, " not supported"));
2094 auto codec = (*factory)(level, type);
2095 DCHECK(codec->type() == type);
// Returns a decompress-only codec that picks the right codec from the data's
// prefix. customCodecs are registered ahead of the built-in ones.
2099 std::unique_ptr<Codec> getAutoUncompressionCodec(
2100 std::vector<std::unique_ptr<Codec>> customCodecs) {
2101 return AutomaticCodec::create(std::move(customCodecs));
2104 } // namespace folly