2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
22 #if LZ4_VERSION_NUMBER >= 10301
27 #include <glog/logging.h>
29 #if FOLLY_HAVE_LIBSNAPPY
30 #include <snappy-sinksource.h>
35 #include <folly/io/compression/Zlib.h>
38 #if FOLLY_HAVE_LIBLZMA
42 #if FOLLY_HAVE_LIBZSTD
43 #define ZSTD_STATIC_LINKING_ONLY
51 #include <folly/Bits.h>
52 #include <folly/Conv.h>
53 #include <folly/Memory.h>
54 #include <folly/Portability.h>
55 #include <folly/ScopeGuard.h>
56 #include <folly/Varint.h>
57 #include <folly/io/Cursor.h>
58 #include <folly/io/compression/Utils.h>
60 #include <unordered_set>
62 using folly::io::compression::detail::dataStartsWithLE;
63 using folly::io::compression::detail::prefixToStringLE;
65 namespace zlib = folly::io::zlib;
70 Codec::Codec(CodecType type) : type_(type) { }
72 // Ensure consistent behavior in the nullptr case
73 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
74 if (data == nullptr) {
75 throw std::invalid_argument("Codec: data must not be nullptr");
77 uint64_t len = data->computeChainDataLength();
79 return IOBuf::create(0);
81 if (len > maxUncompressedLength()) {
82 throw std::runtime_error("Codec: uncompressed length too large");
85 return doCompress(data);
88 std::string Codec::compress(const StringPiece data) {
89 const uint64_t len = data.size();
93 if (len > maxUncompressedLength()) {
94 throw std::runtime_error("Codec: uncompressed length too large");
97 return doCompressString(data);
100 std::unique_ptr<IOBuf> Codec::uncompress(
102 Optional<uint64_t> uncompressedLength) {
103 if (data == nullptr) {
104 throw std::invalid_argument("Codec: data must not be nullptr");
106 if (!uncompressedLength) {
107 if (needsUncompressedLength()) {
108 throw std::invalid_argument("Codec: uncompressed length required");
110 } else if (*uncompressedLength > maxUncompressedLength()) {
111 throw std::runtime_error("Codec: uncompressed length too large");
115 if (uncompressedLength.value_or(0) != 0) {
116 throw std::runtime_error("Codec: invalid uncompressed length");
118 return IOBuf::create(0);
121 return doUncompress(data, uncompressedLength);
124 std::string Codec::uncompress(
125 const StringPiece data,
126 Optional<uint64_t> uncompressedLength) {
127 if (!uncompressedLength) {
128 if (needsUncompressedLength()) {
129 throw std::invalid_argument("Codec: uncompressed length required");
131 } else if (*uncompressedLength > maxUncompressedLength()) {
132 throw std::runtime_error("Codec: uncompressed length too large");
136 if (uncompressedLength.value_or(0) != 0) {
137 throw std::runtime_error("Codec: invalid uncompressed length");
142 return doUncompressString(data, uncompressedLength);
145 bool Codec::needsUncompressedLength() const {
146 return doNeedsUncompressedLength();
149 uint64_t Codec::maxUncompressedLength() const {
150 return doMaxUncompressedLength();
153 bool Codec::doNeedsUncompressedLength() const {
157 uint64_t Codec::doMaxUncompressedLength() const {
158 return UNLIMITED_UNCOMPRESSED_LENGTH;
161 std::vector<std::string> Codec::validPrefixes() const {
165 bool Codec::canUncompress(const IOBuf*, Optional<uint64_t>) const {
169 std::string Codec::doCompressString(const StringPiece data) {
170 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
171 auto outputBuffer = doCompress(&inputBuffer);
173 output.reserve(outputBuffer->computeChainDataLength());
174 for (auto range : *outputBuffer) {
175 output.append(reinterpret_cast<const char*>(range.data()), range.size());
180 std::string Codec::doUncompressString(
181 const StringPiece data,
182 Optional<uint64_t> uncompressedLength) {
183 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
184 auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
186 output.reserve(outputBuffer->computeChainDataLength());
187 for (auto range : *outputBuffer) {
188 output.append(reinterpret_cast<const char*>(range.data()), range.size());
193 uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
194 if (uncompressedLength == 0) {
197 return doMaxCompressedLength(uncompressedLength);
200 Optional<uint64_t> Codec::getUncompressedLength(
201 const folly::IOBuf* data,
202 Optional<uint64_t> uncompressedLength) const {
203 auto const compressedLength = data->computeChainDataLength();
204 if (uncompressedLength == uint64_t(0) || compressedLength == 0) {
205 if (uncompressedLength.value_or(0) != 0 || compressedLength != 0) {
206 throw std::runtime_error("Invalid uncompressed length");
210 return doGetUncompressedLength(data, uncompressedLength);
213 Optional<uint64_t> Codec::doGetUncompressedLength(
215 Optional<uint64_t> uncompressedLength) const {
216 return uncompressedLength;
219 bool StreamCodec::needsDataLength() const {
220 return doNeedsDataLength();
223 bool StreamCodec::doNeedsDataLength() const {
227 void StreamCodec::assertStateIs(State expected) const {
228 if (state_ != expected) {
229 throw std::logic_error(folly::to<std::string>(
230 "Codec: state is ", state_, "; expected state ", expected));
234 void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
235 state_ = State::RESET;
236 uncompressedLength_ = uncompressedLength;
240 bool StreamCodec::compressStream(
242 MutableByteRange& output,
243 StreamCodec::FlushOp flushOp) {
244 if (state_ == State::RESET && input.empty()) {
245 if (flushOp == StreamCodec::FlushOp::NONE) {
248 if (flushOp == StreamCodec::FlushOp::END &&
249 uncompressedLength().value_or(0) != 0) {
250 throw std::runtime_error("Codec: invalid uncompressed length");
254 if (!uncompressedLength() && needsDataLength()) {
255 throw std::runtime_error("Codec: uncompressed length required");
257 if (state_ == State::RESET && !input.empty() &&
258 uncompressedLength() == uint64_t(0)) {
259 throw std::runtime_error("Codec: invalid uncompressed length");
261 // Handle input state transitions
263 case StreamCodec::FlushOp::NONE:
264 if (state_ == State::RESET) {
265 state_ = State::COMPRESS;
267 assertStateIs(State::COMPRESS);
269 case StreamCodec::FlushOp::FLUSH:
270 if (state_ == State::RESET || state_ == State::COMPRESS) {
271 state_ = State::COMPRESS_FLUSH;
273 assertStateIs(State::COMPRESS_FLUSH);
275 case StreamCodec::FlushOp::END:
276 if (state_ == State::RESET || state_ == State::COMPRESS) {
277 state_ = State::COMPRESS_END;
279 assertStateIs(State::COMPRESS_END);
282 bool const done = doCompressStream(input, output, flushOp);
283 // Handle output state transitions
285 if (state_ == State::COMPRESS_FLUSH) {
286 state_ = State::COMPRESS;
287 } else if (state_ == State::COMPRESS_END) {
290 // Check internal invariants
291 DCHECK(input.empty());
292 DCHECK(flushOp != StreamCodec::FlushOp::NONE);
297 bool StreamCodec::uncompressStream(
299 MutableByteRange& output,
300 StreamCodec::FlushOp flushOp) {
301 if (state_ == State::RESET && input.empty()) {
302 if (uncompressedLength().value_or(0) == 0) {
307 // Handle input state transitions
308 if (state_ == State::RESET) {
309 state_ = State::UNCOMPRESS;
311 assertStateIs(State::UNCOMPRESS);
312 bool const done = doUncompressStream(input, output, flushOp);
313 // Handle output state transitions
320 static std::unique_ptr<IOBuf> addOutputBuffer(
321 MutableByteRange& output,
323 DCHECK(output.empty());
324 auto buffer = IOBuf::create(size);
325 buffer->append(buffer->capacity());
326 output = {buffer->writableData(), buffer->length()};
330 std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
331 uint64_t const uncompressedLength = data->computeChainDataLength();
332 resetStream(uncompressedLength);
333 uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
335 auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
336 auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
338 MutableByteRange output;
339 auto buffer = addOutputBuffer(
341 maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
342 : kDefaultBufferLength);
344 // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
345 IOBuf const* current = data;
346 ByteRange input{current->data(), current->length()};
347 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
349 while (input.empty() && current->next() != data) {
350 current = current->next();
351 input = {current->data(), current->length()};
353 if (current->next() == data) {
354 // This is the last input buffer so end the stream
355 flushOp = StreamCodec::FlushOp::END;
357 if (output.empty()) {
358 buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
360 size_t const inputSize = input.size();
361 size_t const outputSize = output.size();
362 bool const done = compressStream(input, output, flushOp);
364 DCHECK(input.empty());
365 DCHECK(flushOp == StreamCodec::FlushOp::END);
366 DCHECK_EQ(current->next(), data);
369 if (inputSize == input.size() && outputSize == output.size()) {
370 throw std::runtime_error("Codec: No forward progress made");
373 buffer->prev()->trimEnd(output.size());
/**
 * Picks an output buffer length for streaming decompression.
 *
 * The result is four times the larger of `compressedLength` and `blockSize`,
 * capped at 4 MiB.
 *
 * The cap is applied before the multiplication, so very large lengths
 * (>= 2^62) cannot wrap around and yield a tiny, or even zero, buffer.
 *
 * @param compressedLength Size of the compressed input in bytes.
 * @param blockSize        Minimum amount of data to plan for, in bytes.
 * @return The buffer length to allocate, in bytes (at most 4 MiB).
 */
static uint64_t computeBufferLength(
    uint64_t const compressedLength,
    uint64_t const blockSize) {
  uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
  uint64_t constexpr kGrowthFactor = 4;
  // kMaxBufferLength is a multiple of kGrowthFactor, so clamping the base
  // first gives exactly min(kGrowthFactor * base, kMaxBufferLength).
  uint64_t const base = std::min(
      std::max(blockSize, compressedLength), kMaxBufferLength / kGrowthFactor);
  return kGrowthFactor * base;
}
385 std::unique_ptr<IOBuf> StreamCodec::doUncompress(
387 Optional<uint64_t> uncompressedLength) {
388 auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
389 auto constexpr kBlockSize = uint64_t(128) << 10;
390 auto const defaultBufferLength =
391 computeBufferLength(data->computeChainDataLength(), kBlockSize);
393 uncompressedLength = getUncompressedLength(data, uncompressedLength);
394 resetStream(uncompressedLength);
396 MutableByteRange output;
397 auto buffer = addOutputBuffer(
399 (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
400 ? *uncompressedLength
401 : defaultBufferLength));
403 // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
404 IOBuf const* current = data;
405 ByteRange input{current->data(), current->length()};
406 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
408 while (input.empty() && current->next() != data) {
409 current = current->next();
410 input = {current->data(), current->length()};
412 if (current->next() == data) {
413 // Tell the uncompressor there is no more input (it may optimize)
414 flushOp = StreamCodec::FlushOp::END;
416 if (output.empty()) {
417 buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
419 size_t const inputSize = input.size();
420 size_t const outputSize = output.size();
421 bool const done = uncompressStream(input, output, flushOp);
425 if (inputSize == input.size() && outputSize == output.size()) {
426 throw std::runtime_error("Codec: Truncated data");
429 if (!input.empty()) {
430 throw std::runtime_error("Codec: Junk after end of data");
433 buffer->prev()->trimEnd(output.size());
434 if (uncompressedLength &&
435 *uncompressedLength != buffer->computeChainDataLength()) {
436 throw std::runtime_error("Codec: invalid uncompressed length");
447 class NoCompressionCodec final : public Codec {
449 static std::unique_ptr<Codec> create(int level, CodecType type);
450 explicit NoCompressionCodec(int level, CodecType type);
453 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
454 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
455 std::unique_ptr<IOBuf> doUncompress(
457 Optional<uint64_t> uncompressedLength) override;
460 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
461 return std::make_unique<NoCompressionCodec>(level, type);
464 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
466 DCHECK(type == CodecType::NO_COMPRESSION);
468 case COMPRESSION_LEVEL_DEFAULT:
469 case COMPRESSION_LEVEL_FASTEST:
470 case COMPRESSION_LEVEL_BEST:
474 throw std::invalid_argument(to<std::string>(
475 "NoCompressionCodec: invalid level ", level));
479 uint64_t NoCompressionCodec::doMaxCompressedLength(
480 uint64_t uncompressedLength) const {
481 return uncompressedLength;
484 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
486 return data->clone();
489 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
491 Optional<uint64_t> uncompressedLength) {
492 if (uncompressedLength &&
493 data->computeChainDataLength() != *uncompressedLength) {
494 throw std::runtime_error(
495 to<std::string>("NoCompressionCodec: invalid uncompressed length"));
497 return data->clone();
500 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
504 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
505 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
506 out->append(encodeVarint(val, out->writableTail()));
509 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
512 for (int shift = 0; shift <= 63; shift += 7) {
513 b = cursor.read<int8_t>();
514 val |= static_cast<uint64_t>(b & 0x7f) << shift;
520 throw std::invalid_argument("Invalid varint value. Too big.");
527 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
529 #if FOLLY_HAVE_LIBLZ4
534 class LZ4Codec final : public Codec {
536 static std::unique_ptr<Codec> create(int level, CodecType type);
537 explicit LZ4Codec(int level, CodecType type);
540 bool doNeedsUncompressedLength() const override;
541 uint64_t doMaxUncompressedLength() const override;
542 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
544 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
546 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
547 std::unique_ptr<IOBuf> doUncompress(
549 Optional<uint64_t> uncompressedLength) override;
551 bool highCompression_;
554 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
555 return std::make_unique<LZ4Codec>(level, type);
558 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
559 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
562 case COMPRESSION_LEVEL_FASTEST:
563 case COMPRESSION_LEVEL_DEFAULT:
566 case COMPRESSION_LEVEL_BEST:
570 if (level < 1 || level > 2) {
571 throw std::invalid_argument(to<std::string>(
572 "LZ4Codec: invalid level: ", level));
574 highCompression_ = (level > 1);
577 bool LZ4Codec::doNeedsUncompressedLength() const {
578 return !encodeSize();
581 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
582 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
584 #ifndef LZ4_MAX_INPUT_SIZE
585 # define LZ4_MAX_INPUT_SIZE 0x7E000000
588 uint64_t LZ4Codec::doMaxUncompressedLength() const {
589 return LZ4_MAX_INPUT_SIZE;
592 uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
593 return LZ4_compressBound(uncompressedLength) +
594 (encodeSize() ? kMaxVarintLength64 : 0);
597 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
599 if (data->isChained()) {
600 // LZ4 doesn't support streaming, so we have to coalesce
601 clone = data->cloneCoalescedAsValue();
605 auto out = IOBuf::create(maxCompressedLength(data->length()));
607 encodeVarintToIOBuf(data->length(), out.get());
611 auto input = reinterpret_cast<const char*>(data->data());
612 auto output = reinterpret_cast<char*>(out->writableTail());
613 const auto inputLength = data->length();
614 #if LZ4_VERSION_NUMBER >= 10700
615 if (highCompression_) {
616 n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
618 n = LZ4_compress_default(input, output, inputLength, out->tailroom());
621 if (highCompression_) {
622 n = LZ4_compressHC(input, output, inputLength);
624 n = LZ4_compress(input, output, inputLength);
629 CHECK_LE(n, out->capacity());
635 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
637 Optional<uint64_t> uncompressedLength) {
639 if (data->isChained()) {
640 // LZ4 doesn't support streaming, so we have to coalesce
641 clone = data->cloneCoalescedAsValue();
645 folly::io::Cursor cursor(data);
646 uint64_t actualUncompressedLength;
648 actualUncompressedLength = decodeVarintFromCursor(cursor);
649 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
650 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
654 DCHECK(uncompressedLength.hasValue());
655 DCHECK(*uncompressedLength <= maxUncompressedLength());
656 actualUncompressedLength = *uncompressedLength;
659 auto sp = StringPiece{cursor.peekBytes()};
660 auto out = IOBuf::create(actualUncompressedLength);
661 int n = LZ4_decompress_safe(
663 reinterpret_cast<char*>(out->writableTail()),
665 actualUncompressedLength);
667 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
668 throw std::runtime_error(to<std::string>(
669 "LZ4 decompression returned invalid value ", n));
671 out->append(actualUncompressedLength);
675 #if LZ4_VERSION_NUMBER >= 10301
677 class LZ4FrameCodec final : public Codec {
679 static std::unique_ptr<Codec> create(int level, CodecType type);
680 explicit LZ4FrameCodec(int level, CodecType type);
681 ~LZ4FrameCodec() override;
683 std::vector<std::string> validPrefixes() const override;
684 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
688 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
690 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
691 std::unique_ptr<IOBuf> doUncompress(
693 Optional<uint64_t> uncompressedLength) override;
695 // Reset the dctx_ if it is dirty or null.
699 LZ4F_decompressionContext_t dctx_{nullptr};
703 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
706 return std::make_unique<LZ4FrameCodec>(level, type);
709 static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
711 std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
712 return {prefixToStringLE(kLZ4FrameMagicLE)};
715 bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
716 return dataStartsWithLE(data, kLZ4FrameMagicLE);
719 uint64_t LZ4FrameCodec::doMaxCompressedLength(
720 uint64_t uncompressedLength) const {
721 LZ4F_preferences_t prefs{};
722 prefs.compressionLevel = level_;
723 prefs.frameInfo.contentSize = uncompressedLength;
724 return LZ4F_compressFrameBound(uncompressedLength, &prefs);
727 static size_t lz4FrameThrowOnError(size_t code) {
728 if (LZ4F_isError(code)) {
729 throw std::runtime_error(
730 to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
735 void LZ4FrameCodec::resetDCtx() {
736 if (dctx_ && !dirty_) {
740 LZ4F_freeDecompressionContext(dctx_);
742 lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
746 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
747 DCHECK(type == CodecType::LZ4_FRAME);
749 case COMPRESSION_LEVEL_FASTEST:
750 case COMPRESSION_LEVEL_DEFAULT:
753 case COMPRESSION_LEVEL_BEST:
762 LZ4FrameCodec::~LZ4FrameCodec() {
764 LZ4F_freeDecompressionContext(dctx_);
768 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
769 // LZ4 Frame compression doesn't support streaming so we have to coalesce
771 if (data->isChained()) {
772 clone = data->cloneCoalescedAsValue();
776 const auto uncompressedLength = data->length();
777 LZ4F_preferences_t prefs{};
778 prefs.compressionLevel = level_;
779 prefs.frameInfo.contentSize = uncompressedLength;
781 auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
782 const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
788 buf->append(written);
792 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
794 Optional<uint64_t> uncompressedLength) {
795 // Reset the dctx if any errors have occurred
798 ByteRange in = *data->begin();
800 if (data->isChained()) {
801 clone = data->cloneCoalescedAsValue();
802 in = clone.coalesce();
805 // Select decompression options
806 LZ4F_decompressOptions_t options;
807 options.stableDst = 1;
808 // Select blockSize and growthSize for the IOBufQueue
809 IOBufQueue queue(IOBufQueue::cacheChainLength());
810 auto blockSize = uint64_t{64} << 10;
811 auto growthSize = uint64_t{4} << 20;
812 if (uncompressedLength) {
813 // Allocate uncompressedLength in one chunk (up to 64 MB)
814 const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20);
815 queue.preallocate(allocateSize, allocateSize);
816 blockSize = std::min(*uncompressedLength, blockSize);
817 growthSize = std::min(*uncompressedLength, growthSize);
819 // Reduce growthSize for small data
820 const auto guessUncompressedLen =
821 4 * std::max<uint64_t>(blockSize, in.size());
822 growthSize = std::min(guessUncompressedLen, growthSize);
824 // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
827 // Decompress until the frame is over
830 // Allocate enough space to decompress at least a block
833 std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
835 size_t inSize = in.size();
836 code = lz4FrameThrowOnError(
837 LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
838 if (in.empty() && outSize == 0 && code != 0) {
839 // We passed no input, no output was produced, and the frame isn't over
840 // No more forward progress is possible
841 throw std::runtime_error("LZ4Frame error: Incomplete frame");
843 in.uncheckedAdvance(inSize);
844 queue.postallocate(outSize);
846 // At this point the decompression context can be reused
848 if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
849 throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
854 #endif // LZ4_VERSION_NUMBER >= 10301
855 #endif // FOLLY_HAVE_LIBLZ4
857 #if FOLLY_HAVE_LIBSNAPPY
864 * Implementation of snappy::Source that reads from a IOBuf chain.
866 class IOBufSnappySource final : public snappy::Source {
868 explicit IOBufSnappySource(const IOBuf* data);
869 size_t Available() const override;
870 const char* Peek(size_t* len) override;
871 void Skip(size_t n) override;
877 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
878 : available_(data->computeChainDataLength()),
882 size_t IOBufSnappySource::Available() const {
886 const char* IOBufSnappySource::Peek(size_t* len) {
887 auto sp = StringPiece{cursor_.peekBytes()};
892 void IOBufSnappySource::Skip(size_t n) {
893 CHECK_LE(n, available_);
898 class SnappyCodec final : public Codec {
900 static std::unique_ptr<Codec> create(int level, CodecType type);
901 explicit SnappyCodec(int level, CodecType type);
904 uint64_t doMaxUncompressedLength() const override;
905 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
906 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
907 std::unique_ptr<IOBuf> doUncompress(
909 Optional<uint64_t> uncompressedLength) override;
912 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
913 return std::make_unique<SnappyCodec>(level, type);
916 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
917 DCHECK(type == CodecType::SNAPPY);
919 case COMPRESSION_LEVEL_FASTEST:
920 case COMPRESSION_LEVEL_DEFAULT:
921 case COMPRESSION_LEVEL_BEST:
925 throw std::invalid_argument(to<std::string>(
926 "SnappyCodec: invalid level: ", level));
930 uint64_t SnappyCodec::doMaxUncompressedLength() const {
931 // snappy.h uses uint32_t for lengths, so there's that.
932 return std::numeric_limits<uint32_t>::max();
935 uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
936 return snappy::MaxCompressedLength(uncompressedLength);
939 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
940 IOBufSnappySource source(data);
941 auto out = IOBuf::create(maxCompressedLength(source.Available()));
943 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
944 out->writableTail()));
946 size_t n = snappy::Compress(&source, &sink);
948 CHECK_LE(n, out->capacity());
953 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
955 Optional<uint64_t> uncompressedLength) {
956 uint32_t actualUncompressedLength = 0;
959 IOBufSnappySource source(data);
960 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
961 throw std::runtime_error("snappy::GetUncompressedLength failed");
963 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
964 throw std::runtime_error("snappy: invalid uncompressed length");
968 auto out = IOBuf::create(actualUncompressedLength);
971 IOBufSnappySource source(data);
972 if (!snappy::RawUncompress(&source,
973 reinterpret_cast<char*>(out->writableTail()))) {
974 throw std::runtime_error("snappy::RawUncompress failed");
978 out->append(actualUncompressedLength);
982 #endif // FOLLY_HAVE_LIBSNAPPY
984 #if FOLLY_HAVE_LIBLZMA
989 class LZMA2StreamCodec final : public StreamCodec {
991 static std::unique_ptr<Codec> createCodec(int level, CodecType type);
992 static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
993 explicit LZMA2StreamCodec(int level, CodecType type);
994 ~LZMA2StreamCodec() override;
996 std::vector<std::string> validPrefixes() const override;
997 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1001 bool doNeedsDataLength() const override;
1002 uint64_t doMaxUncompressedLength() const override;
1003 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1005 bool encodeSize() const {
1006 return type() == CodecType::LZMA2_VARINT_SIZE;
1009 void doResetStream() override;
1010 bool doCompressStream(
1012 MutableByteRange& output,
1013 StreamCodec::FlushOp flushOp) override;
1014 bool doUncompressStream(
1016 MutableByteRange& output,
1017 StreamCodec::FlushOp flushOp) override;
1019 void resetCStream();
1020 void resetDStream();
1022 size_t decodeVarint(ByteRange& input);
1023 bool flushVarintBuffer(MutableByteRange& output);
1024 void resetVarintBuffer();
1026 Optional<lzma_stream> cstream_{};
1027 Optional<lzma_stream> dstream_{};
1029 std::array<uint8_t, kMaxVarintLength64> varintBuffer_;
1030 ByteRange varintToEncode_;
1031 size_t varintBufferPos_{0};
1034 bool needReset_{true};
1035 bool needDecodeSize_{false};
1038 static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1039 static constexpr unsigned kLZMA2MagicBytes = 6;
1041 std::vector<std::string> LZMA2StreamCodec::validPrefixes() const {
1042 if (type() == CodecType::LZMA2_VARINT_SIZE) {
1045 return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
1048 bool LZMA2StreamCodec::doNeedsDataLength() const {
1049 return encodeSize();
1052 bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1054 if (type() == CodecType::LZMA2_VARINT_SIZE) {
1057 // Returns false for all inputs less than 8 bytes.
1058 // This is okay, because no valid LZMA2 streams are less than 8 bytes.
1059 return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
1062 std::unique_ptr<Codec> LZMA2StreamCodec::createCodec(
1065 return make_unique<LZMA2StreamCodec>(level, type);
1068 std::unique_ptr<StreamCodec> LZMA2StreamCodec::createStream(
1071 return make_unique<LZMA2StreamCodec>(level, type);
1074 LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type)
1075 : StreamCodec(type) {
1076 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
1078 case COMPRESSION_LEVEL_FASTEST:
1081 case COMPRESSION_LEVEL_DEFAULT:
1082 level = LZMA_PRESET_DEFAULT;
1084 case COMPRESSION_LEVEL_BEST:
1088 if (level < 0 || level > 9) {
1089 throw std::invalid_argument(
1090 to<std::string>("LZMA2Codec: invalid level: ", level));
1095 LZMA2StreamCodec::~LZMA2StreamCodec() {
1097 lzma_end(cstream_.get_pointer());
1101 lzma_end(dstream_.get_pointer());
1106 uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const {
1107 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1108 return uint64_t(1) << 63;
1111 uint64_t LZMA2StreamCodec::doMaxCompressedLength(
1112 uint64_t uncompressedLength) const {
1113 return lzma_stream_buffer_bound(uncompressedLength) +
1114 (encodeSize() ? kMaxVarintLength64 : 0);
1117 void LZMA2StreamCodec::doResetStream() {
1121 void LZMA2StreamCodec::resetCStream() {
1123 cstream_.assign(LZMA_STREAM_INIT);
1126 lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE);
1127 if (rc != LZMA_OK) {
1128 throw std::runtime_error(folly::to<std::string>(
1129 "LZMA2StreamCodec: lzma_easy_encoder error: ", rc));
1133 void LZMA2StreamCodec::resetDStream() {
1135 dstream_.assign(LZMA_STREAM_INIT);
1137 lzma_ret const rc = lzma_auto_decoder(
1138 dstream_.get_pointer(), std::numeric_limits<uint64_t>::max(), 0);
1139 if (rc != LZMA_OK) {
1140 throw std::runtime_error(folly::to<std::string>(
1141 "LZMA2StreamCodec: lzma_auto_decoder error: ", rc));
1145 static lzma_ret lzmaThrowOnError(lzma_ret const rc) {
1148 case LZMA_STREAM_END:
1149 case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice
1152 throw std::runtime_error(
1153 to<std::string>("LZMA2StreamCodec: error: ", rc));
1157 static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) {
1159 case StreamCodec::FlushOp::NONE:
1161 case StreamCodec::FlushOp::FLUSH:
1162 return LZMA_SYNC_FLUSH;
1163 case StreamCodec::FlushOp::END:
1166 throw std::invalid_argument("LZMA2StreamCodec: Invalid flush");
1171 * Flushes the varint buffer.
1172 * Advances output by the number of bytes written.
1173 * Returns true when flushing is complete.
1175 bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) {
1176 if (varintToEncode_.empty()) {
1179 const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size());
1180 if (numBytesToCopy > 0) {
1181 memcpy(output.data(), varintToEncode_.data(), numBytesToCopy);
1183 varintToEncode_.advance(numBytesToCopy);
1184 output.advance(numBytesToCopy);
1185 return varintToEncode_.empty();
1188 bool LZMA2StreamCodec::doCompressStream(
1190 MutableByteRange& output,
1191 StreamCodec::FlushOp flushOp) {
1195 varintBufferPos_ = 0;
1196 size_t const varintSize =
1197 encodeVarint(*uncompressedLength(), varintBuffer_.data());
1198 varintToEncode_ = {varintBuffer_.data(), varintSize};
1203 if (!flushVarintBuffer(output)) {
1207 cstream_->next_in = const_cast<uint8_t*>(input.data());
1208 cstream_->avail_in = input.size();
1209 cstream_->next_out = output.data();
1210 cstream_->avail_out = output.size();
1212 input.uncheckedAdvance(input.size() - cstream_->avail_in);
1213 output.uncheckedAdvance(output.size() - cstream_->avail_out);
1215 lzma_ret const rc = lzmaThrowOnError(
1216 lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp)));
1218 case StreamCodec::FlushOp::NONE:
1220 case StreamCodec::FlushOp::FLUSH:
1221 return cstream_->avail_in == 0 && cstream_->avail_out != 0;
1222 case StreamCodec::FlushOp::END:
1223 return rc == LZMA_STREAM_END;
1225 throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp");
1230 * Attempts to decode a varint from input.
1231 * The function advances input by the number of bytes read.
1233 * If there are too many bytes and the varint is not valid, throw a
1235 * Returns the decoded size or 0 if more bytes are needed.
1237 size_t LZMA2StreamCodec::decodeVarint(ByteRange& input) {
1238 if (input.empty()) {
1241 size_t const numBytesToCopy =
1242 std::min(kMaxVarintLength64 - varintBufferPos_, input.size());
1243 memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy);
1245 size_t const rangeSize = varintBufferPos_ + numBytesToCopy;
1246 ByteRange range{varintBuffer_.data(), rangeSize};
1247 auto const ret = tryDecodeVarint(range);
1249 if (ret.hasValue()) {
1250 size_t const varintSize = rangeSize - range.size();
1251 input.advance(varintSize - varintBufferPos_);
1253 } else if (ret.error() == DecodeVarintError::TooManyBytes) {
1254 throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1257 input.advance(numBytesToCopy);
1258 varintBufferPos_ += numBytesToCopy;
1263 bool LZMA2StreamCodec::doUncompressStream(
1265 MutableByteRange& output,
1266 StreamCodec::FlushOp flushOp) {
1270 needDecodeSize_ = encodeSize();
1273 varintBufferPos_ = 0;
1277 if (needDecodeSize_) {
1278 // Try decoding the varint. If the input does not contain the entire varint,
1279 // buffer the input. If the varint can not be decoded, fail.
1280 size_t const size = decodeVarint(input);
1284 if (uncompressedLength() && *uncompressedLength() != size) {
1285 throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1287 needDecodeSize_ = false;
1290 dstream_->next_in = const_cast<uint8_t*>(input.data());
1291 dstream_->avail_in = input.size();
1292 dstream_->next_out = output.data();
1293 dstream_->avail_out = output.size();
1295 input.advance(input.size() - dstream_->avail_in);
1296 output.advance(output.size() - dstream_->avail_out);
1301 case StreamCodec::FlushOp::NONE:
1302 case StreamCodec::FlushOp::FLUSH:
1303 rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN));
1305 case StreamCodec::FlushOp::END:
1306 rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH));
1309 throw std::invalid_argument("LZMA2StreamCodec: invalid flush");
1311 return rc == LZMA_STREAM_END;
1313 #endif // FOLLY_HAVE_LIBLZMA
1315 #ifdef FOLLY_HAVE_LIBZSTD
1318 void zstdFreeCStream(ZSTD_CStream* zcs) {
1319 ZSTD_freeCStream(zcs);
1322 void zstdFreeDStream(ZSTD_DStream* zds) {
1323 ZSTD_freeDStream(zds);
1330 class ZSTDStreamCodec final : public StreamCodec {
1332 static std::unique_ptr<Codec> createCodec(int level, CodecType);
1333 static std::unique_ptr<StreamCodec> createStream(int level, CodecType);
1334 explicit ZSTDStreamCodec(int level, CodecType type);
1336 std::vector<std::string> validPrefixes() const override;
1337 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1341 bool doNeedsUncompressedLength() const override;
1342 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1343 Optional<uint64_t> doGetUncompressedLength(
1345 Optional<uint64_t> uncompressedLength) const override;
1347 void doResetStream() override;
1348 bool doCompressStream(
1350 MutableByteRange& output,
1351 StreamCodec::FlushOp flushOp) override;
1352 bool doUncompressStream(
1354 MutableByteRange& output,
1355 StreamCodec::FlushOp flushOp) override;
1357 void resetCStream();
1358 void resetDStream();
1360 bool tryBlockCompress(ByteRange& input, MutableByteRange& output) const;
1361 bool tryBlockUncompress(ByteRange& input, MutableByteRange& output) const;
1364 bool needReset_{true};
1367 folly::static_function_deleter<ZSTD_CStream, &zstdFreeCStream>>
1371 folly::static_function_deleter<ZSTD_DStream, &zstdFreeDStream>>
1375 static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
1377 std::vector<std::string> ZSTDStreamCodec::validPrefixes() const {
1378 return {prefixToStringLE(kZSTDMagicLE)};
1381 bool ZSTDStreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1383 return dataStartsWithLE(data, kZSTDMagicLE);
1386 std::unique_ptr<Codec> ZSTDStreamCodec::createCodec(int level, CodecType type) {
1387 return make_unique<ZSTDStreamCodec>(level, type);
1390 std::unique_ptr<StreamCodec> ZSTDStreamCodec::createStream(
1393 return make_unique<ZSTDStreamCodec>(level, type);
1396 ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
1397 : StreamCodec(type) {
1398 DCHECK(type == CodecType::ZSTD);
1400 case COMPRESSION_LEVEL_FASTEST:
1403 case COMPRESSION_LEVEL_DEFAULT:
1406 case COMPRESSION_LEVEL_BEST:
1410 if (level < 1 || level > ZSTD_maxCLevel()) {
1411 throw std::invalid_argument(
1412 to<std::string>("ZSTD: invalid level: ", level));
1417 bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
1421 uint64_t ZSTDStreamCodec::doMaxCompressedLength(
1422 uint64_t uncompressedLength) const {
1423 return ZSTD_compressBound(uncompressedLength);
1426 void zstdThrowIfError(size_t rc) {
1427 if (!ZSTD_isError(rc)) {
1430 throw std::runtime_error(
1431 to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
1434 Optional<uint64_t> ZSTDStreamCodec::doGetUncompressedLength(
1436 Optional<uint64_t> uncompressedLength) const {
1437 // Read decompressed size from frame if available in first IOBuf.
1438 auto const decompressedSize =
1439 ZSTD_getDecompressedSize(data->data(), data->length());
1440 if (decompressedSize != 0) {
1441 if (uncompressedLength && *uncompressedLength != decompressedSize) {
1442 throw std::runtime_error("ZSTD: invalid uncompressed length");
1444 uncompressedLength = decompressedSize;
1446 return uncompressedLength;
1449 void ZSTDStreamCodec::doResetStream() {
1453 bool ZSTDStreamCodec::tryBlockCompress(
1455 MutableByteRange& output) const {
1457 // We need to know that we have enough output space to use block compression
1458 if (output.size() < ZSTD_compressBound(input.size())) {
1461 size_t const length = ZSTD_compress(
1462 output.data(), output.size(), input.data(), input.size(), level_);
1463 zstdThrowIfError(length);
1464 input.uncheckedAdvance(input.size());
1465 output.uncheckedAdvance(length);
1469 void ZSTDStreamCodec::resetCStream() {
1471 cstream_.reset(ZSTD_createCStream());
1473 throw std::bad_alloc{};
1476 // Advanced API usage works for all supported versions of zstd.
1477 // Required to set contentSizeFlag.
1478 auto params = ZSTD_getParams(level_, uncompressedLength().value_or(0), 0);
1479 params.fParams.contentSizeFlag = uncompressedLength().hasValue();
1480 zstdThrowIfError(ZSTD_initCStream_advanced(
1481 cstream_.get(), nullptr, 0, params, uncompressedLength().value_or(0)));
1484 bool ZSTDStreamCodec::doCompressStream(
1486 MutableByteRange& output,
1487 StreamCodec::FlushOp flushOp) {
1489 // If we are given all the input in one chunk try to use block compression
1490 if (flushOp == StreamCodec::FlushOp::END &&
1491 tryBlockCompress(input, output)) {
1497 ZSTD_inBuffer in = {input.data(), input.size(), 0};
1498 ZSTD_outBuffer out = {output.data(), output.size(), 0};
1500 input.uncheckedAdvance(in.pos);
1501 output.uncheckedAdvance(out.pos);
1503 if (flushOp == StreamCodec::FlushOp::NONE || !input.empty()) {
1504 zstdThrowIfError(ZSTD_compressStream(cstream_.get(), &out, &in));
1506 if (in.pos == in.size && flushOp != StreamCodec::FlushOp::NONE) {
1509 case StreamCodec::FlushOp::FLUSH:
1510 rc = ZSTD_flushStream(cstream_.get(), &out);
1512 case StreamCodec::FlushOp::END:
1513 rc = ZSTD_endStream(cstream_.get(), &out);
1516 throw std::invalid_argument("ZSTD: invalid FlushOp");
1518 zstdThrowIfError(rc);
1526 bool ZSTDStreamCodec::tryBlockUncompress(
1528 MutableByteRange& output) const {
1530 #if ZSTD_VERSION_NUMBER < 10104
1531 // We require ZSTD_findFrameCompressedSize() to perform this optimization.
1534 // We need to know the uncompressed length and have enough output space.
1535 if (!uncompressedLength() || output.size() < *uncompressedLength()) {
1538 size_t const compressedLength =
1539 ZSTD_findFrameCompressedSize(input.data(), input.size());
1540 zstdThrowIfError(compressedLength);
1541 size_t const length = ZSTD_decompress(
1542 output.data(), *uncompressedLength(), input.data(), compressedLength);
1543 zstdThrowIfError(length);
1544 if (length != *uncompressedLength()) {
1545 throw std::runtime_error("ZSTDStreamCodec: Incorrect uncompressed length");
1547 input.uncheckedAdvance(compressedLength);
1548 output.uncheckedAdvance(length);
1553 void ZSTDStreamCodec::resetDStream() {
1555 dstream_.reset(ZSTD_createDStream());
1557 throw std::bad_alloc{};
1560 zstdThrowIfError(ZSTD_initDStream(dstream_.get()));
1563 bool ZSTDStreamCodec::doUncompressStream(
1565 MutableByteRange& output,
1566 StreamCodec::FlushOp flushOp) {
1568 // If we are given all the input in one chunk try to use block uncompression
1569 if (flushOp == StreamCodec::FlushOp::END &&
1570 tryBlockUncompress(input, output)) {
1576 ZSTD_inBuffer in = {input.data(), input.size(), 0};
1577 ZSTD_outBuffer out = {output.data(), output.size(), 0};
1579 input.uncheckedAdvance(in.pos);
1580 output.uncheckedAdvance(out.pos);
1582 size_t const rc = ZSTD_decompressStream(dstream_.get(), &out, &in);
1583 zstdThrowIfError(rc);
1587 #endif // FOLLY_HAVE_LIBZSTD
1589 #if FOLLY_HAVE_LIBBZ2
1591 class Bzip2Codec final : public Codec {
1593 static std::unique_ptr<Codec> create(int level, CodecType type);
1594 explicit Bzip2Codec(int level, CodecType type);
1596 std::vector<std::string> validPrefixes() const override;
1597 bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
1601 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1602 std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
1603 std::unique_ptr<IOBuf> doUncompress(
1605 Optional<uint64_t> uncompressedLength) override;
1610 /* static */ std::unique_ptr<Codec> Bzip2Codec::create(
1613 return std::make_unique<Bzip2Codec>(level, type);
1616 Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
1617 DCHECK(type == CodecType::BZIP2);
1619 case COMPRESSION_LEVEL_FASTEST:
1622 case COMPRESSION_LEVEL_DEFAULT:
1625 case COMPRESSION_LEVEL_BEST:
1629 if (level < 1 || level > 9) {
1630 throw std::invalid_argument(
1631 to<std::string>("Bzip2: invalid level: ", level));
1636 static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1637 static uint64_t constexpr kBzip2MagicBytes = 3;
1639 std::vector<std::string> Bzip2Codec::validPrefixes() const {
1640 return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
1643 bool Bzip2Codec::canUncompress(IOBuf const* data, Optional<uint64_t>) const {
1644 return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
1647 uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1648 // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
1649 // To guarantee that the compressed data will fit in its buffer, allocate an
1650 // output buffer of size 1% larger than the uncompressed data, plus six
1651 // hundred extra bytes.
1652 return uncompressedLength + uncompressedLength / 100 + 600;
1655 static bz_stream createBzStream() {
1657 stream.bzalloc = nullptr;
1658 stream.bzfree = nullptr;
1659 stream.opaque = nullptr;
1660 stream.next_in = stream.next_out = nullptr;
1661 stream.avail_in = stream.avail_out = 0;
1665 // Throws on error condition, otherwise returns the code.
1666 static int bzCheck(int const rc) {
1675 throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
1679 static std::unique_ptr<IOBuf> addOutputBuffer(
1681 uint64_t const bufferLength) {
1682 DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
1683 DCHECK_EQ(stream->avail_out, 0);
1685 auto buf = IOBuf::create(bufferLength);
1686 buf->append(buf->capacity());
1688 stream->next_out = reinterpret_cast<char*>(buf->writableData());
1689 stream->avail_out = buf->length();
1694 std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
1695 bz_stream stream = createBzStream();
1696 bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
1698 bzCheck(BZ2_bzCompressEnd(&stream));
1701 uint64_t const uncompressedLength = data->computeChainDataLength();
1702 uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
1703 uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1704 uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
1706 auto out = addOutputBuffer(
1708 maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
1709 : kDefaultBufferLength);
1711 for (auto range : *data) {
1712 while (!range.empty()) {
1713 auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1715 const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1716 stream.avail_in = inSize;
1718 if (stream.avail_out == 0) {
1719 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1722 bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
1723 range.uncheckedAdvance(inSize - stream.avail_in);
1727 if (stream.avail_out == 0) {
1728 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1730 } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
1732 out->prev()->trimEnd(stream.avail_out);
1737 std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
1739 Optional<uint64_t> uncompressedLength) {
1740 bz_stream stream = createBzStream();
1741 bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
1743 bzCheck(BZ2_bzDecompressEnd(&stream));
1746 uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1747 uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
1748 uint64_t const kDefaultBufferLength =
1749 computeBufferLength(data->computeChainDataLength(), kBlockSize);
1751 auto out = addOutputBuffer(
1753 ((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength)
1754 ? *uncompressedLength
1755 : kDefaultBufferLength));
1758 for (auto range : *data) {
1759 while (!range.empty()) {
1760 auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1762 const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1763 stream.avail_in = inSize;
1765 if (stream.avail_out == 0) {
1766 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1769 rc = bzCheck(BZ2_bzDecompress(&stream));
1770 range.uncheckedAdvance(inSize - stream.avail_in);
1773 while (rc != BZ_STREAM_END) {
1774 if (stream.avail_out == 0) {
1775 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1777 size_t const outputSize = stream.avail_out;
1778 rc = bzCheck(BZ2_bzDecompress(&stream));
1779 if (outputSize == stream.avail_out) {
1780 throw std::runtime_error("Bzip2Codec: Truncated input");
1784 out->prev()->trimEnd(stream.avail_out);
1786 uint64_t const totalOut =
1787 (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
1788 if (uncompressedLength && uncompressedLength != totalOut) {
1789 throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
1795 #endif // FOLLY_HAVE_LIBBZ2
1799 zlib::Options getZlibOptions(CodecType type) {
1800 DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB);
1801 return type == CodecType::GZIP ? zlib::defaultGzipOptions()
1802 : zlib::defaultZlibOptions();
1805 std::unique_ptr<Codec> getZlibCodec(int level, CodecType type) {
1806 return zlib::getCodec(getZlibOptions(type), level);
1809 std::unique_ptr<StreamCodec> getZlibStreamCodec(int level, CodecType type) {
1810 return zlib::getStreamCodec(getZlibOptions(type), level);
1813 #endif // FOLLY_HAVE_LIBZ
1816 * Automatic decompression
1818 class AutomaticCodec final : public Codec {
1820 static std::unique_ptr<Codec> create(
1821 std::vector<std::unique_ptr<Codec>> customCodecs);
1822 explicit AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs);
1824 std::vector<std::string> validPrefixes() const override;
1825 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1829 bool doNeedsUncompressedLength() const override;
1830 uint64_t doMaxUncompressedLength() const override;
1832 uint64_t doMaxCompressedLength(uint64_t) const override {
1833 throw std::runtime_error(
1834 "AutomaticCodec error: maxCompressedLength() not supported.");
1836 std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1837 throw std::runtime_error("AutomaticCodec error: compress() not supported.");
1839 std::unique_ptr<IOBuf> doUncompress(
1841 Optional<uint64_t> uncompressedLength) override;
1843 void addCodecIfSupported(CodecType type);
1845 // Throws iff the codecs aren't compatible (very slow)
1846 void checkCompatibleCodecs() const;
1848 std::vector<std::unique_ptr<Codec>> codecs_;
1849 bool needsUncompressedLength_;
1850 uint64_t maxUncompressedLength_;
1853 std::vector<std::string> AutomaticCodec::validPrefixes() const {
1854 std::unordered_set<std::string> prefixes;
1855 for (const auto& codec : codecs_) {
1856 const auto codecPrefixes = codec->validPrefixes();
1857 prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
1859 return std::vector<std::string>{prefixes.begin(), prefixes.end()};
1862 bool AutomaticCodec::canUncompress(
1864 Optional<uint64_t> uncompressedLength) const {
1868 [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
1869 return codec->canUncompress(data, uncompressedLength);
1873 void AutomaticCodec::addCodecIfSupported(CodecType type) {
1874 const bool present = std::any_of(
1877 [&type](std::unique_ptr<Codec> const& codec) {
1878 return codec->type() == type;
1880 if (hasCodec(type) && !present) {
1881 codecs_.push_back(getCodec(type));
1885 /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
1886 std::vector<std::unique_ptr<Codec>> customCodecs) {
1887 return std::make_unique<AutomaticCodec>(std::move(customCodecs));
1890 AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
1891 : Codec(CodecType::USER_DEFINED), codecs_(std::move(customCodecs)) {
1892 // Fastest -> slowest
1893 addCodecIfSupported(CodecType::LZ4_FRAME);
1894 addCodecIfSupported(CodecType::ZSTD);
1895 addCodecIfSupported(CodecType::ZLIB);
1896 addCodecIfSupported(CodecType::GZIP);
1897 addCodecIfSupported(CodecType::LZMA2);
1898 addCodecIfSupported(CodecType::BZIP2);
1900 checkCompatibleCodecs();
1902 // Check that none of the codes are are null
1903 DCHECK(std::none_of(
1904 codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1905 return codec == nullptr;
1908 needsUncompressedLength_ = std::any_of(
1909 codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
1910 return codec->needsUncompressedLength();
1913 const auto it = std::max_element(
1916 [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
1917 return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
1919 DCHECK(it != codecs_.end());
1920 maxUncompressedLength_ = (*it)->maxUncompressedLength();
1923 void AutomaticCodec::checkCompatibleCodecs() const {
1924 // Keep track of all the possible headers.
1925 std::unordered_set<std::string> headers;
1926 // The empty header is not allowed.
1929 // Construct a set of headers and check that none of the headers occur twice.
1930 // Eliminate edge cases.
1931 for (auto&& codec : codecs_) {
1932 const auto codecHeaders = codec->validPrefixes();
1933 // Codecs without any valid headers are not allowed.
1934 if (codecHeaders.empty()) {
1935 throw std::invalid_argument{
1936 "AutomaticCodec: validPrefixes() must not be empty."};
1938 // Insert all the headers for the current codec.
1939 const size_t beforeSize = headers.size();
1940 headers.insert(codecHeaders.begin(), codecHeaders.end());
1941 // Codecs are not compatible if any header occurred twice.
1942 if (beforeSize + codecHeaders.size() != headers.size()) {
1943 throw std::invalid_argument{
1944 "AutomaticCodec: Two valid prefixes collide."};
1948 // Check if any strict non-empty prefix of any header is a header.
1949 for (const auto& header : headers) {
1950 for (size_t i = 1; i < header.size(); ++i) {
1951 if (headers.count(header.substr(0, i))) {
1952 throw std::invalid_argument{
1953 "AutomaticCodec: One valid prefix is a prefix of another valid "
1960 bool AutomaticCodec::doNeedsUncompressedLength() const {
1961 return needsUncompressedLength_;
1964 uint64_t AutomaticCodec::doMaxUncompressedLength() const {
1965 return maxUncompressedLength_;
1968 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
1970 Optional<uint64_t> uncompressedLength) {
1971 for (auto&& codec : codecs_) {
1972 if (codec->canUncompress(data, uncompressedLength)) {
1973 return codec->uncompress(data, uncompressedLength);
1976 throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
1979 using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
1980 using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
1983 StreamCodecFactory stream;
1987 codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
1989 {NoCompressionCodec::create, nullptr},
1991 #if FOLLY_HAVE_LIBLZ4
1992 {LZ4Codec::create, nullptr},
1997 #if FOLLY_HAVE_LIBSNAPPY
1998 {SnappyCodec::create, nullptr},
2004 {getZlibCodec, getZlibStreamCodec},
2009 #if FOLLY_HAVE_LIBLZ4
2010 {LZ4Codec::create, nullptr},
2015 #if FOLLY_HAVE_LIBLZMA
2016 {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2017 {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2023 #if FOLLY_HAVE_LIBZSTD
2024 {ZSTDStreamCodec::createCodec, ZSTDStreamCodec::createStream},
2030 {getZlibCodec, getZlibStreamCodec},
2035 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
2036 {LZ4FrameCodec::create, nullptr},
2041 #if FOLLY_HAVE_LIBBZ2
2042 {Bzip2Codec::create, nullptr},
2048 Factory const& getFactory(CodecType type) {
2049 size_t const idx = static_cast<size_t>(type);
2050 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2051 throw std::invalid_argument(
2052 to<std::string>("Compression type ", idx, " invalid"));
2054 return codecFactories[idx];
2058 bool hasCodec(CodecType type) {
2059 return getFactory(type).codec != nullptr;
2062 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2063 auto const factory = getFactory(type).codec;
2065 throw std::invalid_argument(
2066 to<std::string>("Compression type ", type, " not supported"));
2068 auto codec = (*factory)(level, type);
2069 DCHECK(codec->type() == type);
2073 bool hasStreamCodec(CodecType type) {
2074 return getFactory(type).stream != nullptr;
2077 std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
2078 auto const factory = getFactory(type).stream;
2080 throw std::invalid_argument(
2081 to<std::string>("Compression type ", type, " not supported"));
2083 auto codec = (*factory)(level, type);
2084 DCHECK(codec->type() == type);
2088 std::unique_ptr<Codec> getAutoUncompressionCodec(
2089 std::vector<std::unique_ptr<Codec>> customCodecs) {
2090 return AutomaticCodec::create(std::move(customCodecs));
2093 } // namespace folly