2 * Copyright 2013-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/compression/Compression.h>
22 #if LZ4_VERSION_NUMBER >= 10301
27 #include <glog/logging.h>
29 #if FOLLY_HAVE_LIBSNAPPY
30 #include <snappy-sinksource.h>
35 #include <folly/compression/Zlib.h>
38 #if FOLLY_HAVE_LIBLZMA
42 #if FOLLY_HAVE_LIBZSTD
43 #define ZSTD_STATIC_LINKING_ONLY
51 #include <folly/Conv.h>
52 #include <folly/Memory.h>
53 #include <folly/Portability.h>
54 #include <folly/Random.h>
55 #include <folly/ScopeGuard.h>
56 #include <folly/Varint.h>
57 #include <folly/compression/Utils.h>
58 #include <folly/io/Cursor.h>
59 #include <folly/lang/Bits.h>
60 #include <folly/stop_watch.h>
62 #include <unordered_set>
64 using folly::io::compression::detail::dataStartsWithLE;
65 using folly::io::compression::detail::prefixToStringLE;
// Initialization of the per-codec statistics counters.
// NOTE(review): the enclosing definition (presumably Codec's constructor) and
// some middle fields of each aggregate are not visible here. Each counter is
// brace-initialized and ends with a CompressionCounterKey naming the metric
// plus CompressionCounterType::SUM; the byte counters are accumulated with
// `+=` in compress()/uncompress().
77 bytesBeforeCompression_ = {type,
80 CompressionCounterKey::BYTES_BEFORE_COMPRESSION,
81 CompressionCounterType::SUM};
82 bytesAfterCompression_ = {type,
85 CompressionCounterKey::BYTES_AFTER_COMPRESSION,
86 CompressionCounterType::SUM};
87 bytesBeforeDecompression_ = {
91 CompressionCounterKey::BYTES_BEFORE_DECOMPRESSION,
92 CompressionCounterType::SUM};
93 bytesAfterDecompression_ = {
97 CompressionCounterKey::BYTES_AFTER_DECOMPRESSION,
98 CompressionCounterType::SUM};
// Call counts for compression / decompression.
99 compressions_ = {type,
102 CompressionCounterKey::COMPRESSIONS,
103 CompressionCounterType::SUM};
104 decompressions_ = {type,
107 CompressionCounterKey::DECOMPRESSIONS,
108 CompressionCounterType::SUM};
// Timing counters; these are the counters handed to Timer objects in
// compress()/uncompress().
109 compressionMilliseconds_ = {type,
112 CompressionCounterKey::COMPRESSION_MILLISECONDS,
113 CompressionCounterType::SUM};
114 decompressionMilliseconds_ = {
118 CompressionCounterKey::DECOMPRESSION_MILLISECONDS,
119 CompressionCounterType::SUM};
// Sampling rate for statistics. The compress()/uncompress() entry points draw
// `logging` from folly::Random::oneIn(kLoggingRate), so roughly one call in
// kLoggingRate is timed.
124 constexpr uint32_t kLoggingRate = 50;
// Timer: measures elapsed milliseconds and adds them to a CompressionCounter.
// NOTE(review): the class header, destructor signature and access specifiers
// are not visible here; the `+=` statement below presumably runs in the
// destructor, so a Timer accounts for the scope it lives in — confirm.
// Stores a non-owning pointer to `counter`; the counter must outlive the Timer.
128 explicit Timer(folly::detail::CompressionCounter& counter)
129 : counter_(&counter) {}
// Adds the stop_watch's elapsed time (in milliseconds) to the counter.
132 *counter_ += timer_.elapsed().count();
// Non-owning; points at the counter passed to the constructor.
136 folly::detail::CompressionCounter* counter_;
// Millisecond-resolution stop_watch; presumably started when the Timer is
// constructed (folly::stop_watch starts on construction).
137 stop_watch<std::chrono::milliseconds> timer_;
// Codec::compress(const IOBuf*): compresses a whole IOBuf chain via the
// codec-specific doCompress() hook.
// Throws std::invalid_argument if `data` is nullptr, and std::runtime_error if
// the chain's total length exceeds maxUncompressedLength().
// On sampled calls (1 in kLoggingRate) the call is timed into
// compressionMilliseconds_.
141 // Ensure consistent behavior in the nullptr case
142 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
143 if (data == nullptr) {
144 throw std::invalid_argument("Codec: data must not be nullptr");
146 const uint64_t len = data->computeChainDataLength();
147 if (len > maxUncompressedLength()) {
148 throw std::runtime_error("Codec: uncompressed length too large");
// Decide whether this call is sampled for statistics.
150 bool const logging = folly::Random::oneIn(kLoggingRate);
// Engaged only when sampled; a disengaged Optional constructs no Timer.
151 folly::Optional<Timer> const timer =
152 logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
153 auto result = doCompress(data);
// Byte accounting. NOTE(review): any `if (logging)` guard around these
// updates is on lines not shown here — confirm they are sampled too.
156 bytesBeforeCompression_ += len;
157 bytesAfterCompression_ += result->computeChainDataLength();
// Codec::compress(StringPiece): string flavor of compress(). Delegates to
// doCompressString().
// Throws std::runtime_error if data.size() exceeds maxUncompressedLength().
// Sampled calls are timed into compressionMilliseconds_.
162 std::string Codec::compress(const StringPiece data) {
163 const uint64_t len = data.size();
164 if (len > maxUncompressedLength()) {
165 throw std::runtime_error("Codec: uncompressed length too large");
// Sample roughly 1 in kLoggingRate calls for timing.
167 bool const logging = folly::Random::oneIn(kLoggingRate);
168 folly::Optional<Timer> const timer =
169 logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
170 auto result = doCompressString(data);
// Byte accounting (any sampling guard is on lines not shown here).
173 bytesBeforeCompression_ += len;
174 bytesAfterCompression_ += result.size();
// Codec::uncompress(const IOBuf*, Optional<uint64_t>): decompresses an IOBuf
// chain via doUncompress(). `uncompressedLength`, when given, is the expected
// decompressed size.
// Throws std::invalid_argument if `data` is nullptr, or if no length is given
// but needsUncompressedLength() is true. Throws std::runtime_error if the
// given length exceeds maxUncompressedLength().
179 std::unique_ptr<IOBuf> Codec::uncompress(
181 Optional<uint64_t> uncompressedLength) {
182 if (data == nullptr) {
183 throw std::invalid_argument("Codec: data must not be nullptr");
185 if (!uncompressedLength) {
186 if (needsUncompressedLength()) {
187 throw std::invalid_argument("Codec: uncompressed length required");
189 } else if (*uncompressedLength > maxUncompressedLength()) {
190 throw std::runtime_error("Codec: uncompressed length too large");
// Short-circuit branch whose condition is not shown here (presumably empty
// input): a non-zero expected length is an error, otherwise an empty IOBuf
// is returned without calling doUncompress().
194 if (uncompressedLength.value_or(0) != 0) {
195 throw std::runtime_error("Codec: invalid uncompressed length");
197 return IOBuf::create(0);
// Sample roughly 1 in kLoggingRate calls for timing.
200 bool const logging = folly::Random::oneIn(kLoggingRate);
201 folly::Optional<Timer> const timer =
202 logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
203 auto result = doUncompress(data, uncompressedLength);
// Byte accounting (any sampling guard is on lines not shown here).
206 bytesBeforeDecompression_ += data->computeChainDataLength();
207 bytesAfterDecompression_ += result->computeChainDataLength();
// Codec::uncompress(StringPiece, Optional<uint64_t>): string flavor of
// uncompress(). Delegates to doUncompressString().
// Throws std::invalid_argument if no length is given but the codec requires
// one. Throws std::runtime_error if the given length exceeds
// maxUncompressedLength().
212 std::string Codec::uncompress(
213 const StringPiece data,
214 Optional<uint64_t> uncompressedLength) {
215 if (!uncompressedLength) {
216 if (needsUncompressedLength()) {
217 throw std::invalid_argument("Codec: uncompressed length required");
219 } else if (*uncompressedLength > maxUncompressedLength()) {
220 throw std::runtime_error("Codec: uncompressed length too large");
// Short-circuit branch whose condition and early return are not shown here
// (presumably empty input): a non-zero expected length is an error.
224 if (uncompressedLength.value_or(0) != 0) {
225 throw std::runtime_error("Codec: invalid uncompressed length");
// Sample roughly 1 in kLoggingRate calls for timing.
230 bool const logging = folly::Random::oneIn(kLoggingRate);
231 folly::Optional<Timer> const timer =
232 logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
233 auto result = doUncompressString(data, uncompressedLength);
// Byte accounting (any sampling guard is on lines not shown here).
236 bytesBeforeDecompression_ += data.size();
237 bytesAfterDecompression_ += result.size();
// Public wrapper: forwards to the virtual doNeedsUncompressedLength() hook.
242 bool Codec::needsUncompressedLength() const {
243 return doNeedsUncompressedLength();
// Public wrapper: forwards to the virtual doMaxUncompressedLength() hook.
246 uint64_t Codec::maxUncompressedLength() const {
247 return doMaxUncompressedLength();
// Default hook; its return statement is not shown here.
250 bool Codec::doNeedsUncompressedLength() const {
// Default hook: reports UNLIMITED_UNCOMPRESSED_LENGTH.
254 uint64_t Codec::doMaxUncompressedLength() const {
255 return UNLIMITED_UNCOMPRESSED_LENGTH;
// Default: body not shown here (presumably no known prefixes).
258 std::vector<std::string> Codec::validPrefixes() const {
// Default: both parameters are unnamed, so this implementation cannot
// inspect the data; body not shown here.
262 bool Codec::canUncompress(const IOBuf*, Optional<uint64_t>) const {
// Default string compression. Wraps `data` in a non-owning IOBuf
// (WRAP_BUFFER), runs the IOBuf-based doCompress(), then flattens the
// resulting chain into a std::string.
266 std::string Codec::doCompressString(const StringPiece data) {
267 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
268 auto outputBuffer = doCompress(&inputBuffer);
// Reserve the full chain length up front so appends never reallocate.
270 output.reserve(outputBuffer->computeChainDataLength());
// Iterating an IOBuf yields one byte range per buffer in the chain.
271 for (auto range : *outputBuffer) {
272 output.append(reinterpret_cast<const char*>(range.data()), range.size());
// Default string decompression. Wraps `data` in a non-owning IOBuf, runs
// doUncompress() with the caller's expected length, and flattens the output
// chain into a std::string.
277 std::string Codec::doUncompressString(
278 const StringPiece data,
279 Optional<uint64_t> uncompressedLength) {
280 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
281 auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
// Reserve the full chain length up front so appends never reallocate.
283 output.reserve(outputBuffer->computeChainDataLength());
284 for (auto range : *outputBuffer) {
285 output.append(reinterpret_cast<const char*>(range.data()), range.size());
// Public wrapper: upper bound on compressed size for `uncompressedLength`
// input bytes, as reported by the codec's doMaxCompressedLength() hook.
290 uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
291 return doMaxCompressedLength(uncompressedLength);
// Determines the uncompressed length of `data`, if it can be known.
// For an empty chain, a non-zero caller-supplied length throws
// std::runtime_error; the value returned in that case is on lines not shown
// here. Otherwise the codec-specific doGetUncompressedLength() decides.
294 Optional<uint64_t> Codec::getUncompressedLength(
295 const folly::IOBuf* data,
296 Optional<uint64_t> uncompressedLength) const {
297 auto const compressedLength = data->computeChainDataLength();
298 if (compressedLength == 0) {
299 if (uncompressedLength.value_or(0) != 0) {
300 throw std::runtime_error("Invalid uncompressed length");
304 return doGetUncompressedLength(data, uncompressedLength);
// Default hook: cannot inspect the data, so it returns the caller-supplied
// length unchanged (which may be empty).
307 Optional<uint64_t> Codec::doGetUncompressedLength(
309 Optional<uint64_t> uncompressedLength) const {
310 return uncompressedLength;
// Public wrapper: whether the stream must know the total uncompressed length
// (passed to resetStream()) before compressing; forwards to the
// doNeedsDataLength() hook.
313 bool StreamCodec::needsDataLength() const {
314 return doNeedsDataLength();
// Default hook; its return statement is not shown here.
317 bool StreamCodec::doNeedsDataLength() const {
// Enforces the StreamCodec state machine. Throws std::logic_error naming both
// the current and the expected state when they differ.
321 void StreamCodec::assertStateIs(State expected) const {
322 if (state_ != expected) {
323 throw std::logic_error(folly::to<std::string>(
324 "Codec: state is ", state_, "; expected state ", expected));
// Starts a new stream: returns to State::RESET and records the optional total
// uncompressed length for it.
// NOTE(review): one statement after these assignments is not shown here —
// presumably a call to the doResetStream() hook; confirm.
328 void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
329 state_ = State::RESET;
330 uncompressedLength_ = uncompressedLength;
// Start optimistic so the first no-progress call is tolerated.
331 progressMade_ = true;
// Streaming compression step. Validates the declared length, advances the
// state machine for `flushOp`, then calls doCompressStream(), which is
// expected to consume `input` and fill `output`. Returns doCompressStream()'s
// result: true once the requested FLUSH/END has completed.
// Throws std::runtime_error on an inconsistent declared length or on two
// consecutive calls without progress. Throws std::logic_error (via
// assertStateIs) on an illegal state transition.
335 bool StreamCodec::compressStream(
337 MutableByteRange& output,
338 StreamCodec::FlushOp flushOp) {
// Ending a stream that received no input is only valid when the declared
// length is absent or zero.
339 if (state_ == State::RESET && input.empty() &&
340 flushOp == StreamCodec::FlushOp::END &&
341 uncompressedLength().value_or(0) != 0) {
342 throw std::runtime_error("Codec: invalid uncompressed length");
// Codecs that need the data length up front require it from resetStream().
345 if (!uncompressedLength() && needsDataLength()) {
346 throw std::runtime_error("Codec: uncompressed length required");
// Non-empty input contradicts a declared length of zero.
348 if (state_ == State::RESET && !input.empty() &&
349 uncompressedLength() == uint64_t(0)) {
350 throw std::runtime_error("Codec: invalid uncompressed length");
// NONE: RESET -> COMPRESS. FLUSH: RESET/COMPRESS -> COMPRESS_FLUSH.
// END: RESET/COMPRESS -> COMPRESS_END. Otherwise the stream must already be
// in the target state (a pending flush/end is being continued).
352 // Handle input state transitions
354 case StreamCodec::FlushOp::NONE:
355 if (state_ == State::RESET) {
356 state_ = State::COMPRESS;
358 assertStateIs(State::COMPRESS);
360 case StreamCodec::FlushOp::FLUSH:
361 if (state_ == State::RESET || state_ == State::COMPRESS) {
362 state_ = State::COMPRESS_FLUSH;
364 assertStateIs(State::COMPRESS_FLUSH);
366 case StreamCodec::FlushOp::END:
367 if (state_ == State::RESET || state_ == State::COMPRESS) {
368 state_ = State::COMPRESS_END;
370 assertStateIs(State::COMPRESS_END);
// Snapshot sizes so we can detect a call that made no progress.
373 size_t const inputSize = input.size();
374 size_t const outputSize = output.size();
375 bool const done = doCompressStream(input, output, flushOp);
// One stalled call is tolerated (e.g. a full output buffer); a second
// consecutive stall throws.
376 if (!done && inputSize == input.size() && outputSize == output.size()) {
377 if (!progressMade_) {
378 throw std::runtime_error("Codec: No forward progress made");
380 // Throw an exception if there is no progress again next time
381 progressMade_ = false;
383 progressMade_ = true;
// Presumably under an `if (done)` guard not shown here: a completed FLUSH
// returns to COMPRESS; the COMPRESS_END branch body is also not shown.
385 // Handle output state transitions
387 if (state_ == State::COMPRESS_FLUSH) {
388 state_ = State::COMPRESS;
389 } else if (state_ == State::COMPRESS_END) {
// Completion requires all input consumed and a FLUSH or END request.
392 // Check internal invariants
393 DCHECK(input.empty());
394 DCHECK(flushOp != StreamCodec::FlushOp::NONE);
// Streaming decompression step. Moves the stream into State::UNCOMPRESS and
// calls doUncompressStream(); returns its result (true once the stream has
// been fully decoded). Throws std::runtime_error on two consecutive calls
// without progress, and std::logic_error (via assertStateIs) when called in a
// compression state.
399 bool StreamCodec::uncompressStream(
401 MutableByteRange& output,
402 StreamCodec::FlushOp flushOp) {
// Empty input before any data: the outcome is decided by the declared length
// (the returns are on lines not shown here; presumably "done" when the length
// is absent or zero).
403 if (state_ == State::RESET && input.empty()) {
404 if (uncompressedLength().value_or(0) == 0) {
409 // Handle input state transitions
410 if (state_ == State::RESET) {
411 state_ = State::UNCOMPRESS;
413 assertStateIs(State::UNCOMPRESS);
// Snapshot sizes so we can detect a call that made no progress.
414 size_t const inputSize = input.size();
415 size_t const outputSize = output.size();
416 bool const done = doUncompressStream(input, output, flushOp);
// One stalled call is tolerated; a second consecutive stall throws.
417 if (!done && inputSize == input.size() && outputSize == output.size()) {
418 if (!progressMade_) {
419 throw std::runtime_error("Codec: no forward progress made");
421 // Throw an exception if there is no progress again next time
422 progressMade_ = false;
424 progressMade_ = true;
426 // Handle output state transitions
// Allocates a fresh IOBuf of at least `size` bytes, marks its whole capacity
// as data, and points `output` at that writable region so a stream codec can
// fill it. `output` must be empty on entry (DCHECK). The caller trims unused
// bytes afterwards; the return statement is not shown here (presumably it
// returns `buffer`).
433 static std::unique_ptr<IOBuf> addOutputBuffer(
434 MutableByteRange& output,
436 DCHECK(output.empty());
437 auto buffer = IOBuf::create(size);
// Use the full capacity, which may exceed the requested size.
438 buffer->append(buffer->capacity());
439 output = {buffer->writableData(), buffer->length()};
// One-shot compression built on the streaming API.
// Steps:
//  1. Resets the stream with the chain's total length.
//  2. Preallocates a single output buffer of maxCompressedLength() when that
//     is at most 64 MiB, otherwise 4 MiB.
//  3. Feeds each input buffer to compressStream() (the loop header is not
//     shown here), adding 4 MiB output buffers whenever output fills.
//  4. Passes FlushOp::END with the last input buffer, then trims the unused
//     tail of the last output buffer.
443 std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
444 uint64_t const uncompressedLength = data->computeChainDataLength();
445 resetStream(uncompressedLength);
446 uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
448 auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
449 auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
451 MutableByteRange output;
// Small enough outputs are compressed into one allocation.
452 auto buffer = addOutputBuffer(
454 maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
455 : kDefaultBufferLength);
457 // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
458 IOBuf const* current = data;
459 ByteRange input{current->data(), current->length()};
460 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
// Skip past exhausted/empty buffers; the chain is circular, so reaching
// `data` again means there is no more input.
463 while (input.empty() && current->next() != data) {
464 current = current->next();
465 input = {current->data(), current->length()};
467 if (current->next() == data) {
468 // This is the last input buffer so end the stream
469 flushOp = StreamCodec::FlushOp::END;
// prependChain on the head links the new buffer last in the circular chain,
// which is why the final trim below uses buffer->prev().
471 if (output.empty()) {
472 buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
474 done = compressStream(input, output, flushOp);
// After completion every input byte has been consumed on the final buffer.
476 DCHECK(input.empty());
477 DCHECK(flushOp == StreamCodec::FlushOp::END);
478 DCHECK_EQ(current->next(), data);
// Drop the unwritten tail of the last output buffer.
481 buffer->prev()->trimEnd(output.size());
/**
 * Chooses a default output-buffer size for streaming decompression when the
 * uncompressed length is unknown (or too large to allocate in one step).
 *
 * The guess is four times the larger of the codec block size and the
 * compressed input length, capped at 4 MiB.
 *
 * @param compressedLength total length of the compressed input, in bytes
 * @param blockSize        the codec's block size, in bytes
 * @return suggested output-buffer length, in bytes
 */
static uint64_t computeBufferLength(
    uint64_t const compressedLength,
    uint64_t const blockSize) {
  constexpr uint64_t kCapBytes = uint64_t{4} << 20; // 4 MiB
  uint64_t const larger =
      compressedLength > blockSize ? compressedLength : blockSize;
  uint64_t const guess = larger * 4;
  return guess < kCapBytes ? guess : kCapBytes;
}
// One-shot decompression built on the streaming API.
// Resolves the uncompressed length via getUncompressedLength() and resets the
// stream with it. Preallocates that many bytes when known and at most 64 MiB,
// otherwise a computeBufferLength() guess. Then feeds each input buffer to
// uncompressStream() (loop header not shown here), adding buffers as output
// fills.
// Throws std::runtime_error if input remains after the stream ends, or if the
// produced size differs from a known uncompressed length.
493 std::unique_ptr<IOBuf> StreamCodec::doUncompress(
495 Optional<uint64_t> uncompressedLength) {
496 auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
497 auto constexpr kBlockSize = uint64_t(128) << 10;
498 auto const defaultBufferLength =
499 computeBufferLength(data->computeChainDataLength(), kBlockSize);
// Let the codec refine (or validate) the caller-supplied length.
501 uncompressedLength = getUncompressedLength(data, uncompressedLength);
502 resetStream(uncompressedLength);
504 MutableByteRange output;
505 auto buffer = addOutputBuffer(
507 (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
508 ? *uncompressedLength
509 : defaultBufferLength));
511 // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
512 IOBuf const* current = data;
513 ByteRange input{current->data(), current->length()};
514 StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
// Advance through the circular chain past exhausted/empty buffers.
517 while (input.empty() && current->next() != data) {
518 current = current->next();
519 input = {current->data(), current->length()};
521 if (current->next() == data) {
522 // Tell the uncompressor there is no more input (it may optimize)
523 flushOp = StreamCodec::FlushOp::END;
525 if (output.empty()) {
526 buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
528 done = uncompressStream(input, output, flushOp);
// The stream ended but bytes remain: trailing garbage.
530 if (!input.empty()) {
531 throw std::runtime_error("Codec: Junk after end of data");
// Drop the unwritten tail of the last output buffer.
534 buffer->prev()->trimEnd(output.size());
535 if (uncompressedLength &&
536 *uncompressedLength != buffer->computeChainDataLength()) {
537 throw std::runtime_error("Codec: invalid uncompressed length");
// Pass-through codec (CodecType::NO_COMPRESSION): "compressed" data is the
// input itself.
548 class NoCompressionCodec final : public Codec {
550 static std::unique_ptr<Codec> create(int level, CodecType type);
551 explicit NoCompressionCodec(int level, CodecType type);
// Output size equals input size.
554 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
555 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
556 std::unique_ptr<IOBuf> doUncompress(
558 Optional<uint64_t> uncompressedLength) override;
// Factory used by the codec registry.
561 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
562 return std::make_unique<NoCompressionCodec>(level, type);
// Accepts only the generic COMPRESSION_LEVEL_DEFAULT/FASTEST/BEST levels;
// any other level reaches the throw (presumably the switch's default path)
// with std::invalid_argument.
565 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
567 DCHECK(type == CodecType::NO_COMPRESSION);
569 case COMPRESSION_LEVEL_DEFAULT:
570 case COMPRESSION_LEVEL_FASTEST:
571 case COMPRESSION_LEVEL_BEST:
575 throw std::invalid_argument(to<std::string>(
576 "NoCompressionCodec: invalid level ", level));
// No framing is added, so the bound is the input length itself.
580 uint64_t NoCompressionCodec::doMaxCompressedLength(
581 uint64_t uncompressedLength) const {
582 return uncompressedLength;
// "Compression" shares the input's buffers via IOBuf::clone().
585 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
587 return data->clone();
// Validates an expected length against the actual chain length (throws
// std::runtime_error on mismatch), then returns a clone of the input.
590 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
592 Optional<uint64_t> uncompressedLength) {
593 if (uncompressedLength &&
594 data->computeChainDataLength() != *uncompressedLength) {
595 throw std::runtime_error(
596 to<std::string>("NoCompressionCodec: invalid uncompressed length"));
598 return data->clone();
// Varint helpers used by the *_VARINT_SIZE codec variants to prefix data
// with its uncompressed length.
601 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
// Writes the varint encoding of `val` into `out`'s tailroom and extends
// `out` by the number of bytes written. The caller must guarantee at least
// kMaxVarintLength64 bytes of tailroom (DCHECK).
605 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
606 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
607 out->append(encodeVarint(val, out->writableTail()));
// Decodes an unsigned base-128 varint: each byte contributes its low 7 bits,
// least-significant group first. The early return on a terminating byte is
// on lines not shown here (presumably when the int8_t byte is non-negative,
// i.e. its high bit is clear). After 10 bytes (shift 0..63) without a
// terminator, throws std::invalid_argument.
610 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
611 for (int shift = 0; shift <= 63; shift += 7) {
612 b = cursor.read<int8_t>();
613 val |= static_cast<uint64_t>(b & 0x7f) << shift;
621 throw std::invalid_argument("Invalid varint value. Too big.");
628 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
630 #if FOLLY_HAVE_LIBLZ4
// LZ4 block-format codec (CodecType::LZ4 / LZ4_VARINT_SIZE). The
// LZ4_VARINT_SIZE variant prefixes the output with a varint of the
// uncompressed length; plain LZ4 requires the caller to supply it.
635 class LZ4Codec final : public Codec {
637 static std::unique_ptr<Codec> create(int level, CodecType type);
638 explicit LZ4Codec(int level, CodecType type);
641 bool doNeedsUncompressedLength() const override;
642 uint64_t doMaxUncompressedLength() const override;
643 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
// True when the varint length prefix is written/read.
645 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
647 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
648 std::unique_ptr<IOBuf> doUncompress(
650 Optional<uint64_t> uncompressedLength) override;
// Selects the LZ4 HC compressor in doCompress().
652 bool highCompression_;
// Factory used by the codec registry.
655 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
656 return std::make_unique<LZ4Codec>(level, type);
659 static bool lz4ConvertLevel(int level) {
662 case COMPRESSION_LEVEL_FASTEST:
663 case COMPRESSION_LEVEL_DEFAULT:
666 case COMPRESSION_LEVEL_BEST:
669 throw std::invalid_argument(
670 to<std::string>("LZ4Codec: invalid level: ", level));
// Validates/normalizes `level` via lz4ConvertLevel() (which throws
// std::invalid_argument for unsupported levels) and enables LZ4 HC when the
// converted level exceeds 1.
// NOTE(review): the `> 1` test only works if lz4ConvertLevel returns an
// integer level; with a bool return it is always false.
673 LZ4Codec::LZ4Codec(int level, CodecType type)
674 : Codec(type, lz4ConvertLevel(level)),
675 highCompression_(lz4ConvertLevel(level) > 1) {
676 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
// Plain LZ4 stores no length, so the caller must supply it; the varint
// variant carries it in the header.
679 bool LZ4Codec::doNeedsUncompressedLength() const {
680 return !encodeSize();
683 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
684 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
686 #ifndef LZ4_MAX_INPUT_SIZE
687 # define LZ4_MAX_INPUT_SIZE 0x7E000000
// LZ4's single-call API cannot take more than LZ4_MAX_INPUT_SIZE bytes.
690 uint64_t LZ4Codec::doMaxUncompressedLength() const {
691 return LZ4_MAX_INPUT_SIZE;
// LZ4's worst-case bound, plus room for the varint length prefix when used.
// LZ4_compressBound takes an int; Codec::compress() rejects inputs above
// doMaxUncompressedLength(), which keeps that conversion in range.
694 uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
695 return LZ4_compressBound(uncompressedLength) +
696 (encodeSize() ? kMaxVarintLength64 : 0);
// Compresses `data` with the LZ4 block API into a single IOBuf sized by
// maxCompressedLength(). Chained input is first coalesced into one
// contiguous buffer. For LZ4_VARINT_SIZE the varint uncompressed length is
// written first (presumably under an encodeSize() guard not shown here).
699 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
700 if (data->isChained()) {
701 // LZ4 doesn't support streaming, so we have to coalesce
702 clone = data->cloneCoalescedAsValue();
707 auto out = IOBuf::create(maxCompressedLength(data->length()));
709 encodeVarintToIOBuf(data->length(), out.get());
713 auto input = reinterpret_cast<const char*>(data->data());
714 auto output = reinterpret_cast<char*>(out->writableTail());
715 const auto inputLength = data->length();
// lz4 >= 1.7.0: bounded-output API. The final 0 is the HC level argument;
// lz4hc presumably treats values < 1 as its default level — confirm in
// lz4hc.h.
716 #if LZ4_VERSION_NUMBER >= 10700
717 if (highCompression_) {
718 n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
720 n = LZ4_compress_default(input, output, inputLength, out->tailroom());
// Older lz4: unbounded API, safe here because `out` was sized to the bound.
723 if (highCompression_) {
724 n = LZ4_compressHC(input, output, inputLength);
726 n = LZ4_compress(input, output, inputLength);
731 CHECK_LE(n, out->capacity());
// Decompresses an LZ4 block into a single IOBuf of exactly the uncompressed
// length. For LZ4_VARINT_SIZE the length is read from the varint header and
// must match any caller-supplied length; otherwise the caller's length is
// used (DCHECKed present and within maxUncompressedLength()).
// Throws std::runtime_error on a length mismatch or if LZ4 fails or produces
// a different number of bytes.
737 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
739 Optional<uint64_t> uncompressedLength) {
740 if (data->isChained()) {
741 // LZ4 doesn't support streaming, so we have to coalesce
742 clone = data->cloneCoalescedAsValue();
747 folly::io::Cursor cursor(data);
748 uint64_t actualUncompressedLength;
// NOTE(review): no visible check bounds the decoded length by
// maxUncompressedLength() before IOBuf::create() below, so a corrupt or
// hostile header can request an arbitrarily large allocation, and the value
// is later passed to LZ4_decompress_safe's int capacity parameter. Consider
// rejecting lengths above maxUncompressedLength().
750 actualUncompressedLength = decodeVarintFromCursor(cursor);
751 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
752 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
756 DCHECK(uncompressedLength.hasValue());
757 DCHECK(*uncompressedLength <= maxUncompressedLength());
758 actualUncompressedLength = *uncompressedLength;
// The remaining (contiguous) bytes after any header are the LZ4 block.
761 auto sp = StringPiece{cursor.peekBytes()};
762 auto out = IOBuf::create(actualUncompressedLength);
763 int n = LZ4_decompress_safe(
765 reinterpret_cast<char*>(out->writableTail()),
767 actualUncompressedLength);
// Negative means malformed input; a short count means a length mismatch.
769 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
770 throw std::runtime_error(to<std::string>(
771 "LZ4 decompression returned invalid value ", n));
773 out->append(actualUncompressedLength);
// LZ4 frame-format codec; requires lz4 >= 1.3.1 for the LZ4F API.
777 #if LZ4_VERSION_NUMBER >= 10301
779 class LZ4FrameCodec final : public Codec {
781 static std::unique_ptr<Codec> create(int level, CodecType type);
782 explicit LZ4FrameCodec(int level, CodecType type);
// Frees the cached decompression context.
783 ~LZ4FrameCodec() override;
// Frames are recognized by the LZ4 frame magic number.
785 std::vector<std::string> validPrefixes() const override;
786 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
790 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
792 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
793 std::unique_ptr<IOBuf> doUncompress(
795 Optional<uint64_t> uncompressedLength) override;
797 // Reset the dctx_ if it is dirty or null.
// Decompression context reused across doUncompress() calls; created lazily.
801 LZ4F_decompressionContext_t dctx_{nullptr};
// Factory used by the codec registry.
805 /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
808 return std::make_unique<LZ4FrameCodec>(level, type);
// LZ4 frame magic number (little-endian on disk).
811 static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
// The only valid prefix is the 4-byte frame magic.
813 std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
814 return {prefixToStringLE(kLZ4FrameMagicLE)};
// Sniffs the magic number; the length hint is ignored.
817 bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
818 return dataStartsWithLE(data, kLZ4FrameMagicLE);
// Worst-case frame size for `uncompressedLength` bytes. Builds the same
// preferences that doCompress() uses (level and content size) so the bound
// matches the frame header actually written.
821 uint64_t LZ4FrameCodec::doMaxCompressedLength(
822 uint64_t uncompressedLength) const {
823 LZ4F_preferences_t prefs{};
824 prefs.compressionLevel = level_;
825 prefs.frameInfo.contentSize = uncompressedLength;
826 return LZ4F_compressFrameBound(uncompressedLength, &prefs);
// Converts an LZ4F result code into std::runtime_error (using
// LZ4F_getErrorName) when it denotes an error. Non-error codes are
// presumably returned unchanged on lines not shown here (callers use the
// result as a size/hint).
829 static size_t lz4FrameThrowOnError(size_t code) {
830 if (LZ4F_isError(code)) {
831 throw std::runtime_error(
832 to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
// Ensures dctx_ is a clean decompression context. An existing context that
// is not dirty is kept (the early return is on a line not shown here).
// Otherwise any old context is freed and a new one is created; creation
// failures throw via lz4FrameThrowOnError.
837 void LZ4FrameCodec::resetDCtx() {
838 if (dctx_ && !dirty_) {
842 LZ4F_freeDecompressionContext(dctx_);
// NOTE(review): 100 is presumably LZ4F_VERSION; prefer the named constant
// from lz4frame.h.
844 lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
// Maps the generic COMPRESSION_LEVEL_* constants to an LZ4 frame compression
// level. The returned values, and the handling of other levels, are on lines
// not shown here.
848 static int lz4fConvertLevel(int level) {
850 case COMPRESSION_LEVEL_FASTEST:
851 case COMPRESSION_LEVEL_DEFAULT:
853 case COMPRESSION_LEVEL_BEST:
// Stores the converted level both in the Codec base and in level_ (used for
// LZ4F_preferences_t).
859 LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type)
860 : Codec(type, lz4fConvertLevel(level)), level_(lz4fConvertLevel(level)) {
861 DCHECK(type == CodecType::LZ4_FRAME);
// Releases the cached decompression context (presumably guarded by a null
// check on a line not shown here).
864 LZ4FrameCodec::~LZ4FrameCodec() {
866 LZ4F_freeDecompressionContext(dctx_);
// Compresses `data` into a single LZ4 frame. Chained input is coalesced
// first. The frame records the content size and uses the codec's level; the
// output buffer is sized with maxCompressedLength(), which uses the same
// preferences. Errors throw std::runtime_error via lz4FrameThrowOnError.
870 std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
871 // LZ4 Frame compression doesn't support streaming so we have to coalesce
873 if (data->isChained()) {
874 clone = data->cloneCoalescedAsValue();
878 const auto uncompressedLength = data->length();
879 LZ4F_preferences_t prefs{};
880 prefs.compressionLevel = level_;
// Writing the content size into the frame header lets decoders preallocate.
881 prefs.frameInfo.contentSize = uncompressedLength;
883 auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
884 const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
// Expose exactly the bytes LZ4F_compressFrame wrote.
890 buf->append(written);
// Decompresses one LZ4 frame into an IOBufQueue using the cached dctx_.
// Chained input is coalesced. Output space is preallocated from the expected
// length when known (one chunk of up to 64 MiB), otherwise in blocks whose
// growth size is reduced for small inputs.
// Throws std::runtime_error on LZ4F errors, on an incomplete frame, or when
// the output size differs from a supplied uncompressed length.
894 std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
896 Optional<uint64_t> uncompressedLength) {
897 // Reset the dctx if any errors have occurred
900 ByteRange in = *data->begin();
902 if (data->isChained()) {
903 clone = data->cloneCoalescedAsValue();
904 in = clone.coalesce();
907 // Select decompression options
// NOTE(review): `options` is default-initialized, so every field other than
// stableDst is indeterminate. lz4frame.h requires its reserved fields to be
// zero, and newer releases add a skipChecksums flag there. Consider
// `LZ4F_decompressOptions_t options{};`.
908 LZ4F_decompressOptions_t options;
909 options.stableDst = 1;
910 // Select blockSize and growthSize for the IOBufQueue
911 IOBufQueue queue(IOBufQueue::cacheChainLength());
912 auto blockSize = uint64_t{64} << 10;
913 auto growthSize = uint64_t{4} << 20;
914 if (uncompressedLength) {
915 // Allocate uncompressedLength in one chunk (up to 64 MB)
916 const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20);
917 queue.preallocate(allocateSize, allocateSize);
918 blockSize = std::min(*uncompressedLength, blockSize);
919 growthSize = std::min(*uncompressedLength, growthSize);
// Presumably the else-branch (length unknown): guess 4x the larger of the
// block size and the compressed size.
921 // Reduce growthSize for small data
922 const auto guessUncompressedLen =
923 4 * std::max<uint64_t>(blockSize, in.size());
924 growthSize = std::min(guessUncompressedLen, growthSize);
926 // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
929 // Decompress until the frame is over
932 // Allocate enough space to decompress at least a block
935 std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
// LZ4F_decompress updates inSize/outSize to the bytes consumed/produced;
// a return `code` of 0 means the frame is complete.
937 size_t inSize = in.size();
938 code = lz4FrameThrowOnError(
939 LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
940 if (in.empty() && outSize == 0 && code != 0) {
941 // We passed no input, no output was produced, and the frame isn't over
942 // No more forward progress is possible
943 throw std::runtime_error("LZ4Frame error: Incomplete frame");
945 in.uncheckedAdvance(inSize);
946 queue.postallocate(outSize);
948 // At this point the decompression context can be reused
950 if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
951 throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
956 #endif // LZ4_VERSION_NUMBER >= 10301
957 #endif // FOLLY_HAVE_LIBLZ4
959 #if FOLLY_HAVE_LIBSNAPPY
966 * Implementation of snappy::Source that reads from a IOBuf chain.
// Adapts an IOBuf chain to snappy's pull-based Source interface without
// coalescing. Tracks the remaining byte count in available_.
968 class IOBufSnappySource final : public snappy::Source {
970 explicit IOBufSnappySource(const IOBuf* data);
971 size_t Available() const override;
972 const char* Peek(size_t* len) override;
973 void Skip(size_t n) override;
// Starts with the full chain length available.
979 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
980 : available_(data->computeChainDataLength()),
// Body not shown here (presumably returns available_).
984 size_t IOBufSnappySource::Available() const {
// Exposes the contiguous bytes at the cursor (at most one IOBuf segment).
988 const char* IOBufSnappySource::Peek(size_t* len) {
989 auto sp = StringPiece{cursor_.peekBytes()};
// Skipping more than remains is a fatal error (CHECK).
994 void IOBufSnappySource::Skip(size_t n) {
995 CHECK_LE(n, available_);
// Snappy codec (CodecType::SNAPPY). Snappy encodes the uncompressed length
// in its own header, so no caller-supplied length is required.
1000 class SnappyCodec final : public Codec {
1002 static std::unique_ptr<Codec> create(int level, CodecType type);
1003 explicit SnappyCodec(int level, CodecType type);
1006 uint64_t doMaxUncompressedLength() const override;
1007 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1008 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1009 std::unique_ptr<IOBuf> doUncompress(
1011 Optional<uint64_t> uncompressedLength) override;
// Factory used by the codec registry.
1014 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
1015 return std::make_unique<SnappyCodec>(level, type);
// Snappy has no levels: only the generic FASTEST/DEFAULT/BEST constants are
// accepted; other levels reach the std::invalid_argument throw.
1018 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
1019 DCHECK(type == CodecType::SNAPPY);
1021 case COMPRESSION_LEVEL_FASTEST:
1022 case COMPRESSION_LEVEL_DEFAULT:
1023 case COMPRESSION_LEVEL_BEST:
1027 throw std::invalid_argument(to<std::string>(
1028 "SnappyCodec: invalid level: ", level));
1032 uint64_t SnappyCodec::doMaxUncompressedLength() const {
1033 // snappy.h uses uint32_t for lengths, so there's that.
1034 return std::numeric_limits<uint32_t>::max();
// Snappy's own worst-case bound.
1037 uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1038 return snappy::MaxCompressedLength(uncompressedLength);
// Compresses the chain directly from its buffers via IOBufSnappySource into
// one IOBuf sized by maxCompressedLength(). An unchecked sink is safe because
// the buffer is sized to snappy's worst-case bound; the CHECK re-verifies
// this after the fact.
1041 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
1042 IOBufSnappySource source(data);
1043 auto out = IOBuf::create(maxCompressedLength(source.Available()));
1045 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
1046 out->writableTail()));
1048 size_t n = snappy::Compress(&source, &sink);
1050 CHECK_LE(n, out->capacity());
// Decompresses in two passes over the chain:
//  1. Reads snappy's length header and checks it against any supplied
//     length.
//  2. Decompresses into a buffer of exactly that size.
// Throws std::runtime_error if the header is invalid, the lengths differ, or
// RawUncompress fails.
1055 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
1057 Optional<uint64_t> uncompressedLength) {
1058 uint32_t actualUncompressedLength = 0;
// First pass: header only (presumably in its own scope, since a second
// source with the same name follows).
1061 IOBufSnappySource source(data);
1062 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
1063 throw std::runtime_error("snappy::GetUncompressedLength failed");
1065 if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
1066 throw std::runtime_error("snappy: invalid uncompressed length");
1070 auto out = IOBuf::create(actualUncompressedLength);
// Second pass: a fresh source, because reading the header advanced the
// first one.
1073 IOBufSnappySource source(data);
1074 if (!snappy::RawUncompress(&source,
1075 reinterpret_cast<char*>(out->writableTail()))) {
1076 throw std::runtime_error("snappy::RawUncompress failed");
1080 out->append(actualUncompressedLength);
1084 #endif // FOLLY_HAVE_LIBSNAPPY
1086 #if FOLLY_HAVE_LIBLZMA
// Streaming LZMA2 codec (CodecType::LZMA2 / LZMA2_VARINT_SIZE) built on
// liblzma. The varint variant prefixes the stream with the uncompressed
// length.
1091 class LZMA2StreamCodec final : public StreamCodec {
1093 static std::unique_ptr<Codec> createCodec(int level, CodecType type);
1094 static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
1095 explicit LZMA2StreamCodec(int level, CodecType type);
// Releases any liblzma encoder/decoder state.
1096 ~LZMA2StreamCodec() override;
1098 std::vector<std::string> validPrefixes() const override;
1099 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1103 bool doNeedsDataLength() const override;
1104 uint64_t doMaxUncompressedLength() const override;
1105 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
// True for the varint-length-prefixed variant.
1107 bool encodeSize() const {
1108 return type() == CodecType::LZMA2_VARINT_SIZE;
1111 void doResetStream() override;
1112 bool doCompressStream(
1114 MutableByteRange& output,
1115 StreamCodec::FlushOp flushOp) override;
1116 bool doUncompressStream(
1118 MutableByteRange& output,
1119 StreamCodec::FlushOp flushOp) override;
// (Re)initialize the liblzma encoder / decoder.
1121 void resetCStream();
1122 void resetDStream();
// Varint length-prefix handling for LZMA2_VARINT_SIZE.
1124 bool decodeAndCheckVarint(ByteRange& input);
1125 bool flushVarintBuffer(MutableByteRange& output);
1126 void resetVarintBuffer();
// Lazily engaged liblzma streams for compression and decompression.
1128 Optional<lzma_stream> cstream_{};
1129 Optional<lzma_stream> dstream_{};
// Encoded length prefix and the portion still waiting to be written.
1131 std::array<uint8_t, kMaxVarintLength64> varintBuffer_;
1132 ByteRange varintToEncode_;
1133 size_t varintBufferPos_{0};
// needReset_: the underlying lzma stream must be reinitialized before use.
1136 bool needReset_{true};
1137 bool needDecodeSize_{false};
// First 6 bytes of an xz stream: FD 37 7A 58 5A 00 (0xFD "7zXZ" 0x00),
// stored as a little-endian integer.
1140 static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
1141 static constexpr unsigned kLZMA2MagicBytes = 6;
// The varint variant starts with a length prefix, so it has no fixed magic
// (its return is on a line not shown here). Plain LZMA2 is recognized by the
// 6-byte xz magic.
1143 std::vector<std::string> LZMA2StreamCodec::validPrefixes() const {
1144 if (type() == CodecType::LZMA2_VARINT_SIZE) {
1147 return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
// Only the varint variant must know the data length before compressing,
// since it writes the length first.
1150 bool LZMA2StreamCodec::doNeedsDataLength() const {
1151 return encodeSize();
// Sniffs the xz magic for plain LZMA2; the varint variant's result is on a
// line not shown here.
1154 bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1156 if (type() == CodecType::LZMA2_VARINT_SIZE) {
// NOTE(review): the comment below says 8 bytes, but the magic checked here is
// kLZMA2MagicBytes (6) long — the figure looks stale.
1159 // Returns false for all inputs less than 8 bytes.
1160 // This is okay, because no valid LZMA2 streams are less than 8 bytes.
1161 return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
// Factories for the one-shot Codec and StreamCodec interfaces.
1164 std::unique_ptr<Codec> LZMA2StreamCodec::createCodec(
1167 return make_unique<LZMA2StreamCodec>(level, type);
1170 std::unique_ptr<StreamCodec> LZMA2StreamCodec::createStream(
1173 return make_unique<LZMA2StreamCodec>(level, type);
// Maps generic levels to liblzma presets (DEFAULT -> LZMA_PRESET_DEFAULT;
// the FASTEST/BEST mappings are on lines not shown here). Then requires a
// preset in [0, 9], throwing std::invalid_argument otherwise.
1176 LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type)
1177 : StreamCodec(type) {
1178 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
1180 case COMPRESSION_LEVEL_FASTEST:
1183 case COMPRESSION_LEVEL_DEFAULT:
1184 level = LZMA_PRESET_DEFAULT;
1186 case COMPRESSION_LEVEL_BEST:
1190 if (level < 0 || level > 9) {
1191 throw std::invalid_argument(
1192 to<std::string>("LZMA2Codec: invalid level: ", level));
// Frees liblzma state for whichever streams were created (presumably guarded
// by engagement checks on lines not shown here).
1197 LZMA2StreamCodec::~LZMA2StreamCodec() {
1199 lzma_end(cstream_.get_pointer());
1203 lzma_end(dstream_.get_pointer());
1208 uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const {
1209 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
1210 return uint64_t(1) << 63;
// liblzma's single-call output bound, plus room for the varint prefix when
// encoded.
1213 uint64_t LZMA2StreamCodec::doMaxCompressedLength(
1214 uint64_t uncompressedLength) const {
1215 return lzma_stream_buffer_bound(uncompressedLength) +
1216 (encodeSize() ? kMaxVarintLength64 : 0);
// Hook called when a new stream starts; its body is not shown here
// (presumably it just marks the lzma streams for reinitialization).
1219 void LZMA2StreamCodec::doResetStream() {
// (Re)initializes the encoder. cstream_ is assigned LZMA_STREAM_INIT
// (presumably only when not yet engaged), then set up with
// lzma_easy_encoder at level_ and LZMA_CHECK_NONE, i.e. no integrity check
// is stored. Throws std::runtime_error if initialization fails.
1223 void LZMA2StreamCodec::resetCStream() {
1225 cstream_.assign(LZMA_STREAM_INIT);
1228 lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE);
1229 if (rc != LZMA_OK) {
1230 throw std::runtime_error(folly::to<std::string>(
1231 "LZMA2StreamCodec: lzma_easy_encoder error: ", rc));
// (Re)initializes the decoder with lzma_auto_decoder. A memlimit of
// UINT64_MAX means no memory limit, and flags are 0. Throws
// std::runtime_error if initialization fails.
1235 void LZMA2StreamCodec::resetDStream() {
1237 dstream_.assign(LZMA_STREAM_INIT);
1239 lzma_ret const rc = lzma_auto_decoder(
1240 dstream_.get_pointer(), std::numeric_limits<uint64_t>::max(), 0);
1241 if (rc != LZMA_OK) {
1242 throw std::runtime_error(folly::to<std::string>(
1243 "LZMA2StreamCodec: lzma_auto_decoder error: ", rc));
// Passes through non-fatal lzma_code() results. LZMA_STREAM_END and
// LZMA_BUF_ERROR are listed; LZMA_OK and the return are presumably on lines
// not shown here. Any other code throws std::runtime_error.
1247 static lzma_ret lzmaThrowOnError(lzma_ret const rc) {
1250 case LZMA_STREAM_END:
1251 case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice
1254 throw std::runtime_error(
1255 to<std::string>("LZMA2StreamCodec: error: ", rc));
// Maps StreamCodec::FlushOp to an lzma_action. FLUSH -> LZMA_SYNC_FLUSH is
// visible; the NONE/END mappings are on lines not shown here. Unknown values
// throw std::invalid_argument.
1259 static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) {
1261 case StreamCodec::FlushOp::NONE:
1263 case StreamCodec::FlushOp::FLUSH:
1264 return LZMA_SYNC_FLUSH;
1265 case StreamCodec::FlushOp::END:
1268 throw std::invalid_argument("LZMA2StreamCodec: Invalid flush");
1273 * Flushes the varint buffer.
1274 * Advances output by the number of bytes written.
1275 * Returns true when flushing is complete.
1277 bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) {
1278 if (varintToEncode_.empty()) {
1281 const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size());
1282 if (numBytesToCopy > 0) {
1283 memcpy(output.data(), varintToEncode_.data(), numBytesToCopy);
1285 varintToEncode_.advance(numBytesToCopy);
1286 output.advance(numBytesToCopy);
1287 return varintToEncode_.empty();
1290 bool LZMA2StreamCodec::doCompressStream(
1292 MutableByteRange& output,
1293 StreamCodec::FlushOp flushOp) {
1297 varintBufferPos_ = 0;
1298 size_t const varintSize =
1299 encodeVarint(*uncompressedLength(), varintBuffer_.data());
1300 varintToEncode_ = {varintBuffer_.data(), varintSize};
1305 if (!flushVarintBuffer(output)) {
1309 cstream_->next_in = const_cast<uint8_t*>(input.data());
1310 cstream_->avail_in = input.size();
1311 cstream_->next_out = output.data();
1312 cstream_->avail_out = output.size();
1314 input.uncheckedAdvance(input.size() - cstream_->avail_in);
1315 output.uncheckedAdvance(output.size() - cstream_->avail_out);
1317 lzma_ret const rc = lzmaThrowOnError(
1318 lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp)));
1320 case StreamCodec::FlushOp::NONE:
1322 case StreamCodec::FlushOp::FLUSH:
1323 return cstream_->avail_in == 0 && cstream_->avail_out != 0;
1324 case StreamCodec::FlushOp::END:
1325 return rc == LZMA_STREAM_END;
1327 throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp");
1332 * Attempts to decode a varint from input.
1333 * The function advances input by the number of bytes read.
1335 * If there are too many bytes and the varint is not valid, throw a
1338 * If the uncompressed length was provided and a decoded varint does not match
1339 * the provided length, throw a runtime_error.
1341 * Returns true if the varint was successfully decoded and matches the
1342 * uncompressed length if provided, and false if more bytes are needed.
1344 bool LZMA2StreamCodec::decodeAndCheckVarint(ByteRange& input) {
1345 if (input.empty()) {
1348 size_t const numBytesToCopy =
1349 std::min(kMaxVarintLength64 - varintBufferPos_, input.size());
1350 memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy);
1352 size_t const rangeSize = varintBufferPos_ + numBytesToCopy;
1353 ByteRange range{varintBuffer_.data(), rangeSize};
1354 auto const ret = tryDecodeVarint(range);
1356 if (ret.hasValue()) {
1357 size_t const varintSize = rangeSize - range.size();
1358 input.advance(varintSize - varintBufferPos_);
1359 if (uncompressedLength() && *uncompressedLength() != ret.value()) {
1360 throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1363 } else if (ret.error() == DecodeVarintError::TooManyBytes) {
1364 throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
1367 input.advance(numBytesToCopy);
1368 varintBufferPos_ += numBytesToCopy;
1373 bool LZMA2StreamCodec::doUncompressStream(
1375 MutableByteRange& output,
1376 StreamCodec::FlushOp flushOp) {
1380 needDecodeSize_ = encodeSize();
1383 varintBufferPos_ = 0;
1387 if (needDecodeSize_) {
1388 // Try decoding the varint. If the input does not contain the entire varint,
1389 // buffer the input. If the varint can not be decoded, fail.
1390 if (!decodeAndCheckVarint(input)) {
1393 needDecodeSize_ = false;
1396 dstream_->next_in = const_cast<uint8_t*>(input.data());
1397 dstream_->avail_in = input.size();
1398 dstream_->next_out = output.data();
1399 dstream_->avail_out = output.size();
1401 input.advance(input.size() - dstream_->avail_in);
1402 output.advance(output.size() - dstream_->avail_out);
1407 case StreamCodec::FlushOp::NONE:
1408 case StreamCodec::FlushOp::FLUSH:
1409 rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN));
1411 case StreamCodec::FlushOp::END:
1412 rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH));
1415 throw std::invalid_argument("LZMA2StreamCodec: invalid flush");
1417 return rc == LZMA_STREAM_END;
1419 #endif // FOLLY_HAVE_LIBLZMA
1421 #ifdef FOLLY_HAVE_LIBZSTD
1424 void zstdFreeCStream(ZSTD_CStream* zcs) {
1425 ZSTD_freeCStream(zcs);
1428 void zstdFreeDStream(ZSTD_DStream* zds) {
1429 ZSTD_freeDStream(zds);
1436 class ZSTDStreamCodec final : public StreamCodec {
1438 static std::unique_ptr<Codec> createCodec(int level, CodecType);
1439 static std::unique_ptr<StreamCodec> createStream(int level, CodecType);
1440 explicit ZSTDStreamCodec(int level, CodecType type);
1442 std::vector<std::string> validPrefixes() const override;
1443 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1447 bool doNeedsUncompressedLength() const override;
1448 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1449 Optional<uint64_t> doGetUncompressedLength(
1451 Optional<uint64_t> uncompressedLength) const override;
1453 void doResetStream() override;
1454 bool doCompressStream(
1456 MutableByteRange& output,
1457 StreamCodec::FlushOp flushOp) override;
1458 bool doUncompressStream(
1460 MutableByteRange& output,
1461 StreamCodec::FlushOp flushOp) override;
1463 void resetCStream();
1464 void resetDStream();
1466 bool tryBlockCompress(ByteRange& input, MutableByteRange& output) const;
1467 bool tryBlockUncompress(ByteRange& input, MutableByteRange& output) const;
1470 bool needReset_{true};
1473 folly::static_function_deleter<ZSTD_CStream, &zstdFreeCStream>>
1477 folly::static_function_deleter<ZSTD_DStream, &zstdFreeDStream>>
1481 static constexpr uint32_t kZSTDMagicLE = 0xFD2FB528;
1483 std::vector<std::string> ZSTDStreamCodec::validPrefixes() const {
1484 return {prefixToStringLE(kZSTDMagicLE)};
1487 bool ZSTDStreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
1489 return dataStartsWithLE(data, kZSTDMagicLE);
1492 std::unique_ptr<Codec> ZSTDStreamCodec::createCodec(int level, CodecType type) {
1493 return make_unique<ZSTDStreamCodec>(level, type);
1496 std::unique_ptr<StreamCodec> ZSTDStreamCodec::createStream(
1499 return make_unique<ZSTDStreamCodec>(level, type);
1502 static int zstdConvertLevel(int level) {
1504 case COMPRESSION_LEVEL_FASTEST:
1506 case COMPRESSION_LEVEL_DEFAULT:
1508 case COMPRESSION_LEVEL_BEST:
1511 if (level < 1 || level > ZSTD_maxCLevel()) {
1512 throw std::invalid_argument(
1513 to<std::string>("ZSTD: invalid level: ", level));
1518 ZSTDStreamCodec::ZSTDStreamCodec(int level, CodecType type)
1519 : StreamCodec(type, zstdConvertLevel(level)),
1520 level_(zstdConvertLevel(level)) {
1521 DCHECK(type == CodecType::ZSTD);
1524 bool ZSTDStreamCodec::doNeedsUncompressedLength() const {
1528 uint64_t ZSTDStreamCodec::doMaxCompressedLength(
1529 uint64_t uncompressedLength) const {
1530 return ZSTD_compressBound(uncompressedLength);
1533 void zstdThrowIfError(size_t rc) {
1534 if (!ZSTD_isError(rc)) {
1537 throw std::runtime_error(
1538 to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
1541 Optional<uint64_t> ZSTDStreamCodec::doGetUncompressedLength(
1543 Optional<uint64_t> uncompressedLength) const {
1544 // Read decompressed size from frame if available in first IOBuf.
1545 auto const decompressedSize =
1546 ZSTD_getDecompressedSize(data->data(), data->length());
1547 if (decompressedSize != 0) {
1548 if (uncompressedLength && *uncompressedLength != decompressedSize) {
1549 throw std::runtime_error("ZSTD: invalid uncompressed length");
1551 uncompressedLength = decompressedSize;
1553 return uncompressedLength;
1556 void ZSTDStreamCodec::doResetStream() {
1560 bool ZSTDStreamCodec::tryBlockCompress(
1562 MutableByteRange& output) const {
1564 // We need to know that we have enough output space to use block compression
1565 if (output.size() < ZSTD_compressBound(input.size())) {
1568 size_t const length = ZSTD_compress(
1569 output.data(), output.size(), input.data(), input.size(), level_);
1570 zstdThrowIfError(length);
1571 input.uncheckedAdvance(input.size());
1572 output.uncheckedAdvance(length);
1576 void ZSTDStreamCodec::resetCStream() {
1578 cstream_.reset(ZSTD_createCStream());
1580 throw std::bad_alloc{};
1583 // As of 1.3.2 ZSTD_initCStream_advanced() interprets content size 0 as
1584 // unknown if contentSizeFlag == 0, but this behavior is deprecated, and will
1585 // be removed in the future. Starting with version 1.3.2 start passing the
1586 // correct value, ZSTD_CONTENTSIZE_UNKNOWN.
1587 #if ZSTD_VERSION_NUMBER >= 10302
1588 constexpr uint64_t kZstdUnknownContentSize = ZSTD_CONTENTSIZE_UNKNOWN;
1590 constexpr uint64_t kZstdUnknownContentSize = 0;
1592 // Advanced API usage works for all supported versions of zstd.
1593 // Required to set contentSizeFlag.
1594 auto params = ZSTD_getParams(level_, uncompressedLength().value_or(0), 0);
1595 params.fParams.contentSizeFlag = uncompressedLength().hasValue();
1596 zstdThrowIfError(ZSTD_initCStream_advanced(
1601 uncompressedLength().value_or(kZstdUnknownContentSize)));
1604 bool ZSTDStreamCodec::doCompressStream(
1606 MutableByteRange& output,
1607 StreamCodec::FlushOp flushOp) {
1609 // If we are given all the input in one chunk try to use block compression
1610 if (flushOp == StreamCodec::FlushOp::END &&
1611 tryBlockCompress(input, output)) {
1617 ZSTD_inBuffer in = {input.data(), input.size(), 0};
1618 ZSTD_outBuffer out = {output.data(), output.size(), 0};
1620 input.uncheckedAdvance(in.pos);
1621 output.uncheckedAdvance(out.pos);
1623 if (flushOp == StreamCodec::FlushOp::NONE || !input.empty()) {
1624 zstdThrowIfError(ZSTD_compressStream(cstream_.get(), &out, &in));
1626 if (in.pos == in.size && flushOp != StreamCodec::FlushOp::NONE) {
1629 case StreamCodec::FlushOp::FLUSH:
1630 rc = ZSTD_flushStream(cstream_.get(), &out);
1632 case StreamCodec::FlushOp::END:
1633 rc = ZSTD_endStream(cstream_.get(), &out);
1636 throw std::invalid_argument("ZSTD: invalid FlushOp");
1638 zstdThrowIfError(rc);
1646 bool ZSTDStreamCodec::tryBlockUncompress(
1648 MutableByteRange& output) const {
1650 #if ZSTD_VERSION_NUMBER < 10104
1651 // We require ZSTD_findFrameCompressedSize() to perform this optimization.
1654 // We need to know the uncompressed length and have enough output space.
1655 if (!uncompressedLength() || output.size() < *uncompressedLength()) {
1658 size_t const compressedLength =
1659 ZSTD_findFrameCompressedSize(input.data(), input.size());
1660 zstdThrowIfError(compressedLength);
1661 size_t const length = ZSTD_decompress(
1662 output.data(), *uncompressedLength(), input.data(), compressedLength);
1663 zstdThrowIfError(length);
1664 if (length != *uncompressedLength()) {
1665 throw std::runtime_error("ZSTDStreamCodec: Incorrect uncompressed length");
1667 input.uncheckedAdvance(compressedLength);
1668 output.uncheckedAdvance(length);
1673 void ZSTDStreamCodec::resetDStream() {
1675 dstream_.reset(ZSTD_createDStream());
1677 throw std::bad_alloc{};
1680 zstdThrowIfError(ZSTD_initDStream(dstream_.get()));
1683 bool ZSTDStreamCodec::doUncompressStream(
1685 MutableByteRange& output,
1686 StreamCodec::FlushOp flushOp) {
1688 // If we are given all the input in one chunk try to use block uncompression
1689 if (flushOp == StreamCodec::FlushOp::END &&
1690 tryBlockUncompress(input, output)) {
1696 ZSTD_inBuffer in = {input.data(), input.size(), 0};
1697 ZSTD_outBuffer out = {output.data(), output.size(), 0};
1699 input.uncheckedAdvance(in.pos);
1700 output.uncheckedAdvance(out.pos);
1702 size_t const rc = ZSTD_decompressStream(dstream_.get(), &out, &in);
1703 zstdThrowIfError(rc);
1707 #endif // FOLLY_HAVE_LIBZSTD
1709 #if FOLLY_HAVE_LIBBZ2
1711 class Bzip2Codec final : public Codec {
1713 static std::unique_ptr<Codec> create(int level, CodecType type);
1714 explicit Bzip2Codec(int level, CodecType type);
1716 std::vector<std::string> validPrefixes() const override;
1717 bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
1721 uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
1722 std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
1723 std::unique_ptr<IOBuf> doUncompress(
1725 Optional<uint64_t> uncompressedLength) override;
1730 /* static */ std::unique_ptr<Codec> Bzip2Codec::create(
1733 return std::make_unique<Bzip2Codec>(level, type);
1736 Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
1737 DCHECK(type == CodecType::BZIP2);
1739 case COMPRESSION_LEVEL_FASTEST:
1742 case COMPRESSION_LEVEL_DEFAULT:
1745 case COMPRESSION_LEVEL_BEST:
1749 if (level < 1 || level > 9) {
1750 throw std::invalid_argument(
1751 to<std::string>("Bzip2: invalid level: ", level));
1756 static uint32_t constexpr kBzip2MagicLE = 0x685a42;
1757 static uint64_t constexpr kBzip2MagicBytes = 3;
1759 std::vector<std::string> Bzip2Codec::validPrefixes() const {
1760 return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
1763 bool Bzip2Codec::canUncompress(IOBuf const* data, Optional<uint64_t>) const {
1764 return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
1767 uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
1768 // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
1769 // To guarantee that the compressed data will fit in its buffer, allocate an
1770 // output buffer of size 1% larger than the uncompressed data, plus six
1771 // hundred extra bytes.
1772 return uncompressedLength + uncompressedLength / 100 + 600;
1775 static bz_stream createBzStream() {
1777 stream.bzalloc = nullptr;
1778 stream.bzfree = nullptr;
1779 stream.opaque = nullptr;
1780 stream.next_in = stream.next_out = nullptr;
1781 stream.avail_in = stream.avail_out = 0;
1785 // Throws on error condition, otherwise returns the code.
1786 static int bzCheck(int const rc) {
1795 throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
1799 static std::unique_ptr<IOBuf> addOutputBuffer(
1801 uint64_t const bufferLength) {
1802 DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
1803 DCHECK_EQ(stream->avail_out, 0);
1805 auto buf = IOBuf::create(bufferLength);
1806 buf->append(buf->capacity());
1808 stream->next_out = reinterpret_cast<char*>(buf->writableData());
1809 stream->avail_out = buf->length();
1814 std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
1815 bz_stream stream = createBzStream();
1816 bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
1818 bzCheck(BZ2_bzCompressEnd(&stream));
1821 uint64_t const uncompressedLength = data->computeChainDataLength();
1822 uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
1823 uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1824 uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
1826 auto out = addOutputBuffer(
1828 maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
1829 : kDefaultBufferLength);
1831 for (auto range : *data) {
1832 while (!range.empty()) {
1833 auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1835 const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1836 stream.avail_in = inSize;
1838 if (stream.avail_out == 0) {
1839 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1842 bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
1843 range.uncheckedAdvance(inSize - stream.avail_in);
1847 if (stream.avail_out == 0) {
1848 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1850 } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
1852 out->prev()->trimEnd(stream.avail_out);
1857 std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
1859 Optional<uint64_t> uncompressedLength) {
1860 bz_stream stream = createBzStream();
1861 bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
1863 bzCheck(BZ2_bzDecompressEnd(&stream));
1866 uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
1867 uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
1868 uint64_t const kDefaultBufferLength =
1869 computeBufferLength(data->computeChainDataLength(), kBlockSize);
1871 auto out = addOutputBuffer(
1873 ((uncompressedLength && *uncompressedLength <= kMaxSingleStepLength)
1874 ? *uncompressedLength
1875 : kDefaultBufferLength));
1878 for (auto range : *data) {
1879 while (!range.empty()) {
1880 auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
1882 const_cast<char*>(reinterpret_cast<char const*>(range.data()));
1883 stream.avail_in = inSize;
1885 if (stream.avail_out == 0) {
1886 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1889 rc = bzCheck(BZ2_bzDecompress(&stream));
1890 range.uncheckedAdvance(inSize - stream.avail_in);
1893 while (rc != BZ_STREAM_END) {
1894 if (stream.avail_out == 0) {
1895 out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
1897 size_t const outputSize = stream.avail_out;
1898 rc = bzCheck(BZ2_bzDecompress(&stream));
1899 if (outputSize == stream.avail_out) {
1900 throw std::runtime_error("Bzip2Codec: Truncated input");
1904 out->prev()->trimEnd(stream.avail_out);
1906 uint64_t const totalOut =
1907 (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
1908 if (uncompressedLength && uncompressedLength != totalOut) {
1909 throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
1915 #endif // FOLLY_HAVE_LIBBZ2
1919 zlib::Options getZlibOptions(CodecType type) {
1920 DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB);
1921 return type == CodecType::GZIP ? zlib::defaultGzipOptions()
1922 : zlib::defaultZlibOptions();
1925 std::unique_ptr<Codec> getZlibCodec(int level, CodecType type) {
1926 return zlib::getCodec(getZlibOptions(type), level);
1929 std::unique_ptr<StreamCodec> getZlibStreamCodec(int level, CodecType type) {
1930 return zlib::getStreamCodec(getZlibOptions(type), level);
1933 #endif // FOLLY_HAVE_LIBZ
1936 * Automatic decompression
1938 class AutomaticCodec final : public Codec {
1940 static std::unique_ptr<Codec> create(
1941 std::vector<std::unique_ptr<Codec>> customCodecs,
1942 std::unique_ptr<Codec> terminalCodec);
1943 explicit AutomaticCodec(
1944 std::vector<std::unique_ptr<Codec>> customCodecs,
1945 std::unique_ptr<Codec> terminalCodec);
1947 std::vector<std::string> validPrefixes() const override;
1948 bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
1952 bool doNeedsUncompressedLength() const override;
1953 uint64_t doMaxUncompressedLength() const override;
1955 uint64_t doMaxCompressedLength(uint64_t) const override {
1956 throw std::runtime_error(
1957 "AutomaticCodec error: maxCompressedLength() not supported.");
1959 std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
1960 throw std::runtime_error("AutomaticCodec error: compress() not supported.");
1962 std::unique_ptr<IOBuf> doUncompress(
1964 Optional<uint64_t> uncompressedLength) override;
1966 void addCodecIfSupported(CodecType type);
1968 // Throws iff the codecs aren't compatible (very slow)
1969 void checkCompatibleCodecs() const;
1971 std::vector<std::unique_ptr<Codec>> codecs_;
1972 std::unique_ptr<Codec> terminalCodec_;
1973 bool needsUncompressedLength_;
1974 uint64_t maxUncompressedLength_;
1977 std::vector<std::string> AutomaticCodec::validPrefixes() const {
1978 std::unordered_set<std::string> prefixes;
1979 for (const auto& codec : codecs_) {
1980 const auto codecPrefixes = codec->validPrefixes();
1981 prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
1983 return std::vector<std::string>{prefixes.begin(), prefixes.end()};
1986 bool AutomaticCodec::canUncompress(
1988 Optional<uint64_t> uncompressedLength) const {
1992 [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
1993 return codec->canUncompress(data, uncompressedLength);
1997 void AutomaticCodec::addCodecIfSupported(CodecType type) {
1998 const bool present = std::any_of(
2001 [&type](std::unique_ptr<Codec> const& codec) {
2002 return codec->type() == type;
2004 bool const isTerminalType = terminalCodec_ && terminalCodec_->type() == type;
2005 if (hasCodec(type) && !present && !isTerminalType) {
2006 codecs_.push_back(getCodec(type));
2010 /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
2011 std::vector<std::unique_ptr<Codec>> customCodecs,
2012 std::unique_ptr<Codec> terminalCodec) {
2013 return std::make_unique<AutomaticCodec>(
2014 std::move(customCodecs), std::move(terminalCodec));
2017 AutomaticCodec::AutomaticCodec(
2018 std::vector<std::unique_ptr<Codec>> customCodecs,
2019 std::unique_ptr<Codec> terminalCodec)
2020 : Codec(CodecType::USER_DEFINED, folly::none, "auto"),
2021 codecs_(std::move(customCodecs)),
2022 terminalCodec_(std::move(terminalCodec)) {
2023 // Fastest -> slowest
2024 std::array<CodecType, 6> defaultTypes{{
2025 CodecType::LZ4_FRAME,
2033 for (auto type : defaultTypes) {
2034 addCodecIfSupported(type);
2038 checkCompatibleCodecs();
2041 // Check that none of the codecs are null
2042 DCHECK(std::none_of(
2043 codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
2044 return codec == nullptr;
2047 // Check that the terminal codec's type is not duplicated (with the exception
2048 // of USER_DEFINED).
2049 if (terminalCodec_) {
2050 DCHECK(std::none_of(
2053 [&](std::unique_ptr<Codec> const& codec) {
2054 return codec->type() != CodecType::USER_DEFINED &&
2055 codec->type() == terminalCodec_->type();
2059 bool const terminalNeedsUncompressedLength =
2060 terminalCodec_ && terminalCodec_->needsUncompressedLength();
2061 needsUncompressedLength_ = std::any_of(
2064 [](std::unique_ptr<Codec> const& codec) {
2065 return codec->needsUncompressedLength();
2067 terminalNeedsUncompressedLength;
2069 const auto it = std::max_element(
2072 [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
2073 return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
2075 DCHECK(it != codecs_.end());
2076 auto const terminalMaxUncompressedLength =
2077 terminalCodec_ ? terminalCodec_->maxUncompressedLength() : 0;
2078 maxUncompressedLength_ =
2079 std::max((*it)->maxUncompressedLength(), terminalMaxUncompressedLength);
2082 void AutomaticCodec::checkCompatibleCodecs() const {
2083 // Keep track of all the possible headers.
2084 std::unordered_set<std::string> headers;
2085 // The empty header is not allowed.
2088 // Construct a set of headers and check that none of the headers occur twice.
2089 // Eliminate edge cases.
2090 for (auto&& codec : codecs_) {
2091 const auto codecHeaders = codec->validPrefixes();
2092 // Codecs without any valid headers are not allowed.
2093 if (codecHeaders.empty()) {
2094 throw std::invalid_argument{
2095 "AutomaticCodec: validPrefixes() must not be empty."};
2097 // Insert all the headers for the current codec.
2098 const size_t beforeSize = headers.size();
2099 headers.insert(codecHeaders.begin(), codecHeaders.end());
2100 // Codecs are not compatible if any header occurred twice.
2101 if (beforeSize + codecHeaders.size() != headers.size()) {
2102 throw std::invalid_argument{
2103 "AutomaticCodec: Two valid prefixes collide."};
2107 // Check if any strict non-empty prefix of any header is a header.
2108 for (const auto& header : headers) {
2109 for (size_t i = 1; i < header.size(); ++i) {
2110 if (headers.count(header.substr(0, i))) {
2111 throw std::invalid_argument{
2112 "AutomaticCodec: One valid prefix is a prefix of another valid "
2119 bool AutomaticCodec::doNeedsUncompressedLength() const {
2120 return needsUncompressedLength_;
2123 uint64_t AutomaticCodec::doMaxUncompressedLength() const {
2124 return maxUncompressedLength_;
2127 std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
2129 Optional<uint64_t> uncompressedLength) {
2131 for (auto&& codec : codecs_) {
2132 if (codec->canUncompress(data, uncompressedLength)) {
2133 return codec->uncompress(data, uncompressedLength);
2136 } catch (std::exception const& e) {
2137 if (!terminalCodec_) {
2142 // Try terminal codec
2143 if (terminalCodec_) {
2144 return terminalCodec_->uncompress(data, uncompressedLength);
2147 throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
2150 using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
2151 using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
2154 StreamCodecFactory stream;
2158 codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
2160 {NoCompressionCodec::create, nullptr},
2162 #if FOLLY_HAVE_LIBLZ4
2163 {LZ4Codec::create, nullptr},
2168 #if FOLLY_HAVE_LIBSNAPPY
2169 {SnappyCodec::create, nullptr},
2175 {getZlibCodec, getZlibStreamCodec},
2180 #if FOLLY_HAVE_LIBLZ4
2181 {LZ4Codec::create, nullptr},
2186 #if FOLLY_HAVE_LIBLZMA
2187 {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2188 {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
2194 #if FOLLY_HAVE_LIBZSTD
2195 {ZSTDStreamCodec::createCodec, ZSTDStreamCodec::createStream},
2201 {getZlibCodec, getZlibStreamCodec},
2206 #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
2207 {LZ4FrameCodec::create, nullptr},
2212 #if FOLLY_HAVE_LIBBZ2
2213 {Bzip2Codec::create, nullptr},
2219 Factory const& getFactory(CodecType type) {
2220 size_t const idx = static_cast<size_t>(type);
2221 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
2222 throw std::invalid_argument(
2223 to<std::string>("Compression type ", idx, " invalid"));
2225 return codecFactories[idx];
2229 bool hasCodec(CodecType type) {
2230 return getFactory(type).codec != nullptr;
2233 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
2234 auto const factory = getFactory(type).codec;
2236 throw std::invalid_argument(
2237 to<std::string>("Compression type ", type, " not supported"));
2239 auto codec = (*factory)(level, type);
2240 DCHECK(codec->type() == type);
2244 bool hasStreamCodec(CodecType type) {
2245 return getFactory(type).stream != nullptr;
2248 std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
2249 auto const factory = getFactory(type).stream;
2251 throw std::invalid_argument(
2252 to<std::string>("Compression type ", type, " not supported"));
2254 auto codec = (*factory)(level, type);
2255 DCHECK(codec->type() == type);
2259 std::unique_ptr<Codec> getAutoUncompressionCodec(
2260 std::vector<std::unique_ptr<Codec>> customCodecs,
2261 std::unique_ptr<Codec> terminalCodec) {
2262 return AutomaticCodec::create(
2263 std::move(customCodecs), std::move(terminalCodec));
2266 } // namespace folly