2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/io/Compression.h>
24 #include <glog/logging.h>
26 #if FOLLY_HAVE_LIBSNAPPY
28 #include <snappy-sinksource.h>
35 #if FOLLY_HAVE_LIBLZMA
39 #if FOLLY_HAVE_LIBZSTD
43 #include <folly/Conv.h>
44 #include <folly/Memory.h>
45 #include <folly/Portability.h>
46 #include <folly/ScopeGuard.h>
47 #include <folly/Varint.h>
48 #include <folly/io/Cursor.h>
50 namespace folly { namespace io {
// Base-class constructor: stores the CodecType this codec instance implements.
52 Codec::Codec(CodecType type) : type_(type) { }
54 // Ensure consistent behavior in the nullptr case
// Compresses an IOBuf chain after checking its total data length.
// NOTE(review): the condition guarding the empty-IOBuf return below is not
// visible in this listing; presumably it is a zero-length check, which would
// be the "nullptr case" mentioned above -- confirm. `data` is dereferenced
// unconditionally, so callers must pass a non-null pointer.
// Throws std::runtime_error if the length exceeds maxUncompressedLength().
55 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
56 uint64_t len = data->computeChainDataLength();
58 return IOBuf::create(0);
60 if (len > maxUncompressedLength()) {
61 throw std::runtime_error("Codec: uncompressed length too large");
// The codec-specific implementation does the real work.
64 return doCompress(data);
// String overload of compress(): applies the same maximum-length check as the
// IOBuf overload and then forwards to doCompressString().
// Throws std::runtime_error if data.size() exceeds maxUncompressedLength().
67 std::string Codec::compress(const StringPiece data) {
68 const uint64_t len = data.size();
72 if (len > maxUncompressedLength()) {
73 throw std::runtime_error("Codec: uncompressed length too large");
76 return doCompressString(data);
// Decompresses an IOBuf chain, validating uncompressedLength first:
//  - UNKNOWN_UNCOMPRESSED_LENGTH is rejected (std::invalid_argument) when the
//    codec reports needsUncompressedLength();
//  - a known length above maxUncompressedLength() throws std::runtime_error.
79 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
80 uint64_t uncompressedLength) {
81 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
82 if (needsUncompressedLength()) {
83 throw std::invalid_argument("Codec: uncompressed length required");
85 } else if (uncompressedLength > maxUncompressedLength()) {
86 throw std::runtime_error("Codec: uncompressed length too large");
// Early-out path that yields an empty IOBuf; a known, non-zero expected
// length is an error on this path. NOTE(review): the guarding condition is
// not visible in this listing -- presumably an empty-input check; confirm.
90 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
91 uncompressedLength != 0) {
92 throw std::runtime_error("Codec: invalid uncompressed length");
94 return IOBuf::create(0);
// The codec-specific implementation does the real work.
97 return doUncompress(data, uncompressedLength);
// String overload of uncompress(): performs the same uncompressedLength
// validation as the IOBuf overload, then forwards to doUncompressString().
// Throws std::invalid_argument when the length is required but unknown, and
// std::runtime_error when it is too large or inconsistent.
100 std::string Codec::uncompress(
101 const StringPiece data,
102 uint64_t uncompressedLength) {
103 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
104 if (needsUncompressedLength()) {
105 throw std::invalid_argument("Codec: uncompressed length required");
107 } else if (uncompressedLength > maxUncompressedLength()) {
108 throw std::runtime_error("Codec: uncompressed length too large");
// A known, non-zero expected length is rejected on this path.
// NOTE(review): the enclosing condition and the value returned on this path
// are not visible in this listing -- presumably an empty-input early-out.
112 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
113 uncompressedLength != 0) {
114 throw std::runtime_error("Codec: invalid uncompressed length");
119 return doUncompressString(data, uncompressedLength);
// Public query; forwards to the overridable doNeedsUncompressedLength() hook.
122 bool Codec::needsUncompressedLength() const {
123 return doNeedsUncompressedLength();
// Public query; forwards to the overridable doMaxUncompressedLength() hook.
126 uint64_t Codec::maxUncompressedLength() const {
127 return doMaxUncompressedLength();
// Base-class implementation of the "needs uncompressed length" hook.
// NOTE(review): the body is not visible in this listing; the returned default
// should be confirmed against the full file.
130 bool Codec::doNeedsUncompressedLength() const {
// Base-class default: codecs that do not override this hook report
// UNLIMITED_UNCOMPRESSED_LENGTH as their maximum input size.
134 uint64_t Codec::doMaxUncompressedLength() const {
135 return UNLIMITED_UNCOMPRESSED_LENGTH;
// Default string-based compression built on top of doCompress(): wraps the
// input bytes in an IOBuf (WRAP_BUFFER), compresses it, and appends every
// element of the resulting chain to a single output string.
138 std::string Codec::doCompressString(const StringPiece data) {
139 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
140 auto outputBuffer = doCompress(&inputBuffer);
// Reserve the full chain length up front so the appends below need not grow
// the string repeatedly.
142 output.reserve(outputBuffer->computeChainDataLength());
143 for (auto range : *outputBuffer) {
144 output.append(reinterpret_cast<const char*>(range.data()), range.size());
// Default string-based decompression built on top of doUncompress(): wraps
// the input bytes in an IOBuf (WRAP_BUFFER), decompresses it, and appends
// every element of the resulting chain to a single output string.
149 std::string Codec::doUncompressString(
150 const StringPiece data,
151 uint64_t uncompressedLength) {
152 const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
153 auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
// Reserve the full chain length up front so the appends below need not grow
// the string repeatedly.
155 output.reserve(outputBuffer->computeChainDataLength());
156 for (auto range : *outputBuffer) {
157 output.append(reinterpret_cast<const char*>(range.data()), range.size());
// Pass-through codec for CodecType::NO_COMPRESSION: its doCompress() and
// doUncompress() definitions return a clone of the input chain.
167 class NoCompressionCodec final : public Codec {
169 static std::unique_ptr<Codec> create(int level, CodecType type);
170 explicit NoCompressionCodec(int level, CodecType type);
173 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
174 std::unique_ptr<IOBuf> doUncompress(
176 uint64_t uncompressedLength) override;
// Factory with the (level, type) signature used by the codecFactories table.
179 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
180 return make_unique<NoCompressionCodec>(level, type);
// Validates the constructor arguments: the type must be NO_COMPRESSION
// (debug-only DCHECK) and an unsupported level raises std::invalid_argument.
// NOTE(review): the switch header and the handling of the three symbolic
// levels listed below are not visible in this listing.
183 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
185 DCHECK(type == CodecType::NO_COMPRESSION);
187 case COMPRESSION_LEVEL_DEFAULT:
188 case COMPRESSION_LEVEL_FASTEST:
189 case COMPRESSION_LEVEL_BEST:
193 throw std::invalid_argument(to<std::string>(
194 "NoCompressionCodec: invalid level ", level));
// "Compression" is a pass-through: the result is a clone of the input chain.
198 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
200 return data->clone();
// Pass-through decompression: returns a clone of the input chain.
// If the caller supplied an expected length it must equal the chain's actual
// data length; otherwise std::runtime_error is thrown.
203 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
205 uint64_t uncompressedLength) {
206 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
207 data->computeChainDataLength() != uncompressedLength) {
208 throw std::runtime_error(to<std::string>(
209 "NoCompressionCodec: invalid uncompressed length"));
211 return data->clone();
214 #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
// Writes `val` as a varint at the tail of `out` and extends out's length by
// the number of bytes encodeVarint() reports.
// Precondition: `out` has at least kMaxVarintLength64 bytes of tailroom; this
// is only verified by a DCHECK.
218 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
219 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
220 out->append(encodeVarint(val, out->writableTail()));
// Decodes a varint by reading one byte at a time from `cursor`, OR-ing the
// low 7 bits of each byte into the result at increasing shifts (0, 7, ..., 63).
// Throws std::invalid_argument ("Too big") for over-long encodings.
// NOTE(review): the loop's termination test and the return statement are not
// visible in this listing.
223 inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
226 for (int shift = 0; shift <= 63; shift += 7) {
227 b = cursor.read<int8_t>();
228 val |= static_cast<uint64_t>(b & 0x7f) << shift;
234 throw std::invalid_argument("Invalid varint value. Too big.");
241 #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
243 #if FOLLY_HAVE_LIBLZ4
// LZ4 codec serving CodecType::LZ4 and CodecType::LZ4_VARINT_SIZE.
// For the VARINT_SIZE flavour (encodeSize() == true) the uncompressed length
// is written into / read from the stream as a varint, so callers need not
// supply it when decompressing.
248 class LZ4Codec final : public Codec {
250 static std::unique_ptr<Codec> create(int level, CodecType type);
251 explicit LZ4Codec(int level, CodecType type);
254 bool doNeedsUncompressedLength() const override;
255 uint64_t doMaxUncompressedLength() const override;
// True when the stream format carries the uncompressed size.
257 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
259 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
260 std::unique_ptr<IOBuf> doUncompress(
262 uint64_t uncompressedLength) override;
// Selects the high-compression LZ4 entry points in doCompress().
264 bool highCompression_;
// Factory with the (level, type) signature used by the codec factory table.
267 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
268 return make_unique<LZ4Codec>(level, type);
// Validates the codec type (debug-only DCHECK) and the compression level.
// After the symbolic levels are handled, only levels 1 and 2 are accepted
// (std::invalid_argument otherwise); level 2 enables high compression.
// NOTE(review): the assignments made for the symbolic levels are not visible
// in this listing.
271 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
272 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
275 case COMPRESSION_LEVEL_FASTEST:
276 case COMPRESSION_LEVEL_DEFAULT:
279 case COMPRESSION_LEVEL_BEST:
283 if (level < 1 || level > 2) {
284 throw std::invalid_argument(to<std::string>(
285 "LZ4Codec: invalid level: ", level));
287 highCompression_ = (level > 1);
// The caller must supply the uncompressed length unless the stream format
// itself encodes the size (encodeSize()).
290 bool LZ4Codec::doNeedsUncompressedLength() const {
291 return !encodeSize();
294 // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
295 // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
297 #ifndef LZ4_MAX_INPUT_SIZE
298 # define LZ4_MAX_INPUT_SIZE 0x7E000000
// LZ4 input is capped at LZ4_MAX_INPUT_SIZE bytes.
301 uint64_t LZ4Codec::doMaxUncompressedLength() const {
302 return LZ4_MAX_INPUT_SIZE;
// Compresses `data` with LZ4 in a single call.
// The output buffer is sized for the worst case (LZ4_compressBound) plus room
// for a varint length prefix when encodeSize() is true.
// NOTE(review): several lines of this function (the coalesce call, the
// condition around the varint write, the final append/return) are not
// visible in this listing.
305 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
306 std::unique_ptr<IOBuf> clone;
307 if (data->isChained()) {
308 // LZ4 doesn't support streaming, so we have to coalesce
309 clone = data->clone();
314 uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
315 auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
// Length prefix: the uncompressed size as a varint.
317 encodeVarintToIOBuf(data->length(), out.get());
321 auto input = reinterpret_cast<const char*>(data->data());
322 auto output = reinterpret_cast<char*>(out->writableTail());
323 const auto inputLength = data->length();
// For LZ4_VERSION_NUMBER >= 10700 the calls below also pass the destination
// capacity (out->tailroom()); the calls in the other branch do not.
324 #if LZ4_VERSION_NUMBER >= 10700
325 if (highCompression_) {
326 n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
328 n = LZ4_compress_default(input, output, inputLength, out->tailroom());
331 if (highCompression_) {
332 n = LZ4_compressHC(input, output, inputLength);
334 n = LZ4_compress(input, output, inputLength);
// Sanity check: the reported compressed size must fit in the output buffer.
339 CHECK_LE(n, out->capacity());
// Decompresses an LZ4 block into a single IOBuf of the expected size.
// The expected size comes either from the varint prefix in the stream or from
// the caller; any mismatch between the two, or between the expected size and
// what LZ4 actually produced, raises std::runtime_error.
// NOTE(review): the if/else on encodeSize() that selects between the two
// length sources, and the coalesce call, are not visible in this listing.
345 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
347 uint64_t uncompressedLength) {
348 std::unique_ptr<IOBuf> clone;
349 if (data->isChained()) {
350 // LZ4 doesn't support streaming, so we have to coalesce
351 clone = data->clone();
356 folly::io::Cursor cursor(data);
357 uint64_t actualUncompressedLength;
// Length taken from the stream; a caller-supplied length must agree with it.
359 actualUncompressedLength = decodeVarintFromCursor(cursor);
360 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
361 uncompressedLength != actualUncompressedLength) {
362 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
// Length taken from the caller; it must be known and within the LZ4 limit.
365 actualUncompressedLength = uncompressedLength;
366 if (actualUncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH ||
367 actualUncompressedLength > maxUncompressedLength()) {
368 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
// Remaining bytes at the cursor are the compressed payload.
372 auto sp = StringPiece{cursor.peekBytes()};
373 auto out = IOBuf::create(actualUncompressedLength);
374 int n = LZ4_decompress_safe(
376 reinterpret_cast<char*>(out->writableTail()),
378 actualUncompressedLength);
// A negative result or a size different from the expected one is an error.
380 if (n < 0 || uint64_t(n) != actualUncompressedLength) {
381 throw std::runtime_error(to<std::string>(
382 "LZ4 decompression returned invalid value ", n));
384 out->append(actualUncompressedLength);
388 #endif // FOLLY_HAVE_LIBLZ4
390 #if FOLLY_HAVE_LIBSNAPPY
397 * Implementation of snappy::Source that reads from an IOBuf chain.
// Adapter that lets snappy consume an IOBuf chain through the snappy::Source
// interface (Available / Peek / Skip).
399 class IOBufSnappySource final : public snappy::Source {
401 explicit IOBufSnappySource(const IOBuf* data);
402 size_t Available() const override;
403 const char* Peek(size_t* len) override;
404 void Skip(size_t n) override;
// Initializes available_ with the total data length of the chain.
// NOTE(review): the rest of the initializer list is not visible here.
410 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
411 : available_(data->computeChainDataLength()),
// snappy::Source override reporting the bytes left to read.
// NOTE(review): body not visible in this listing; presumably returns
// available_ -- confirm.
415 size_t IOBufSnappySource::Available() const {
// snappy::Source override: looks at the contiguous bytes at the cursor's
// current position without consuming them.
// NOTE(review): the lines that set *len and return the pointer are not
// visible in this listing.
419 const char* IOBufSnappySource::Peek(size_t* len) {
420 auto sp = StringPiece{cursor_.peekBytes()};
// snappy::Source override: consumes n bytes. Skipping more than remains is a
// fatal error (CHECK).
// NOTE(review): the lines that advance the cursor and update available_ are
// not visible in this listing.
425 void IOBufSnappySource::Skip(size_t n) {
426 CHECK_LE(n, available_);
// Snappy codec for CodecType::SNAPPY. The uncompressed length is read from
// the compressed data itself (see doUncompress()).
431 class SnappyCodec final : public Codec {
433 static std::unique_ptr<Codec> create(int level, CodecType type);
434 explicit SnappyCodec(int level, CodecType type);
437 uint64_t doMaxUncompressedLength() const override;
438 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
439 std::unique_ptr<IOBuf> doUncompress(
441 uint64_t uncompressedLength) override;
// Factory with the (level, type) signature used by the codecFactories table.
444 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
445 return make_unique<SnappyCodec>(level, type);
// Validates the codec type (debug-only DCHECK) and the compression level;
// an unsupported level raises std::invalid_argument.
// NOTE(review): the switch header and the handling of the three symbolic
// levels listed below are not visible in this listing.
448 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
449 DCHECK(type == CodecType::SNAPPY);
451 case COMPRESSION_LEVEL_FASTEST:
452 case COMPRESSION_LEVEL_DEFAULT:
453 case COMPRESSION_LEVEL_BEST:
457 throw std::invalid_argument(to<std::string>(
458 "SnappyCodec: invalid level: ", level));
// Maximum input size for snappy: the largest value a uint32_t can hold.
462 uint64_t SnappyCodec::doMaxUncompressedLength() const {
463 // snappy.h uses uint32_t for lengths, so there's that.
464 return std::numeric_limits<uint32_t>::max();
// Compresses the chain with snappy::Compress, reading through an
// IOBufSnappySource and writing into one buffer sized by
// snappy::MaxCompressedLength for the whole input.
// NOTE(review): the declaration receiving the IOBuf::create() result and the
// final append/return are not visible in this listing.
467 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
468 IOBufSnappySource source(data);
470 IOBuf::create(snappy::MaxCompressedLength(source.Available()));
// The sink performs no bounds checking (Unchecked...), hence the worst-case
// sizing above.
472 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
473 out->writableTail()));
475 size_t n = snappy::Compress(&source, &sink);
// Sanity check: the bytes written must fit in the output buffer.
477 CHECK_LE(n, out->capacity());
// Decompresses snappy data in two passes over the input: the first reads the
// uncompressed length from the stream, the second decompresses into a buffer
// of exactly that size. Throws std::runtime_error if snappy rejects the data
// or if a caller-supplied length disagrees with the one in the stream.
482 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
483 uint64_t uncompressedLength) {
484 uint32_t actualUncompressedLength = 0;
// Pass 1: a dedicated source object is used just to read the length.
487 IOBufSnappySource source(data);
488 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
489 throw std::runtime_error("snappy::GetUncompressedLength failed");
491 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
492 uncompressedLength != actualUncompressedLength) {
493 throw std::runtime_error("snappy: invalid uncompressed length");
497 auto out = IOBuf::create(actualUncompressedLength);
// Pass 2: a fresh source object is constructed from `data` to decompress.
500 IOBufSnappySource source(data);
501 if (!snappy::RawUncompress(&source,
502 reinterpret_cast<char*>(out->writableTail()))) {
503 throw std::runtime_error("snappy::RawUncompress failed");
507 out->append(actualUncompressedLength);
511 #endif // FOLLY_HAVE_LIBSNAPPY
// zlib-based codec serving both CodecType::ZLIB and CodecType::GZIP; the two
// differ only in the windowBits value passed to zlib.
517 class ZlibCodec final : public Codec {
519 static std::unique_ptr<Codec> create(int level, CodecType type);
520 explicit ZlibCodec(int level, CodecType type);
523 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
524 std::unique_ptr<IOBuf> doUncompress(
526 uint64_t uncompressedLength) override;
// Helpers that manage the chain of output buffers handed to zlib.
528 std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
529 bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
// Factory with the (level, type) signature used by the codec factory table.
534 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
535 return make_unique<ZlibCodec>(level, type);
// Validates the codec type (debug-only DCHECK) and maps the symbolic levels
// to zlib levels. After mapping, the level must be Z_DEFAULT_COMPRESSION or
// within 0..9; anything else raises std::invalid_argument.
// NOTE(review): the values assigned for FASTEST and BEST are not visible in
// this listing.
538 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
539 DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
541 case COMPRESSION_LEVEL_FASTEST:
544 case COMPRESSION_LEVEL_DEFAULT:
545 level = Z_DEFAULT_COMPRESSION;
547 case COMPRESSION_LEVEL_BEST:
551 if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
552 throw std::invalid_argument(to<std::string>(
553 "ZlibCodec: invalid level: ", level));
// Creates a new output IOBuf via IOBuf::create(length) and points the zlib
// stream's output window (next_out / avail_out) at it. May only be called
// when the previous output window is fully used (CHECK on avail_out == 0).
// NOTE(review): the line that sets the buffer's length before avail_out is
// read, and the return statement, are not visible in this listing.
558 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
560 CHECK_EQ(stream->avail_out, 0);
562 auto buf = IOBuf::create(length);
565 stream->next_out = buf->writableData();
566 stream->avail_out = buf->length();
// Performs one inflate() step, first chaining a new output buffer onto `head`
// if the current output window is full. doUncompress() stores the bool
// result in its streamEnd flag.
// NOTE(review): the switch on `rc` is not visible in this listing; the
// visible throw and CHECK(false) lines are its error paths.
571 bool ZlibCodec::doInflate(z_stream* stream,
573 uint32_t bufferLength) {
574 if (stream->avail_out == 0) {
575 head->prependChain(addOutputBuffer(stream, bufferLength));
578 int rc = inflate(stream, Z_NO_FLUSH);
589 throw std::runtime_error(to<std::string>(
590 "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
592 CHECK(false) << rc << ": " << stream->msg;
// Compresses an IOBuf chain with zlib's deflate, producing a chain of output
// buffers. Input is fed range by range, in steps of at most
// maxSingleStepLength; output buffers are added whenever the current one
// fills up.
// NOTE(review): a number of lines (the z_stream declaration, the scope guard
// around deflateEnd, some loop headers, the return) are not visible in this
// listing.
598 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
// Null allocator hooks.
600 stream.zalloc = nullptr;
601 stream.zfree = nullptr;
602 stream.opaque = nullptr;
604 // Using deflateInit2() to support gzip. "The windowBits parameter is the
605 // base two logarithm of the maximum window size (...) The default value is
606 // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
607 // around the compressed data instead of a zlib wrapper. The gzip header
608 // will have no file name, no extra data, no comment, no modification time
609 // (set to zero), no header crc, and the operating system will be set to 255
611 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
612 // All other parameters (method, memLevel, strategy) get default values from
614 int rc = deflateInit2(&stream,
621 throw std::runtime_error(to<std::string>(
622 "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
// Start with empty input/output windows and zeroed counters.
625 stream.next_in = stream.next_out = nullptr;
626 stream.avail_in = stream.avail_out = 0;
627 stream.total_in = stream.total_out = 0;
// `success` is consulted by the deflateEnd() result check below.
629 bool success = false;
632 rc = deflateEnd(&stream);
633 // If we're here because of an exception, it's okay if some data
635 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
636 << rc << ": " << stream.msg;
639 uint64_t uncompressedLength = data->computeChainDataLength();
640 uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
642 // Max 64MiB in one go
643 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
644 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: big enough for the whole result when the bound is at
// most 64MiB, otherwise a default-sized buffer that will be extended.
646 auto out = addOutputBuffer(
648 (maxCompressedLength <= maxSingleStepLength ?
649 maxCompressedLength :
650 defaultBufferLength));
652 for (auto& range : *data) {
653 uint64_t remaining = range.size();
654 uint64_t written = 0;
// Feed at most maxSingleStepLength bytes of this range per step.
656 uint32_t step = (remaining > maxSingleStepLength ?
657 maxSingleStepLength : remaining);
658 stream.next_in = const_cast<uint8_t*>(range.data() + written);
659 stream.avail_in = step;
663 while (stream.avail_in != 0) {
664 if (stream.avail_out == 0) {
665 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
668 rc = deflate(&stream, Z_NO_FLUSH);
670 CHECK_EQ(rc, Z_OK) << stream.msg;
// Flush: keep calling deflate(Z_FINISH) while it returns Z_OK, adding output
// buffers as needed.
676 if (stream.avail_out == 0) {
677 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
680 rc = deflate(&stream, Z_FINISH);
681 } while (rc == Z_OK);
683 CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
// Drop the unused tail of the last output buffer in the chain.
685 out->prev()->trimEnd(stream.avail_out);
687 success = true; // we survived
// Decompresses an IOBuf chain with zlib's inflate, producing a chain of
// output buffers. Throws std::runtime_error on init failure, on input that
// continues after the end of the compressed stream, and when a
// caller-supplied length does not match the bytes actually produced.
// NOTE(review): a number of lines (the z_stream declaration, the scope guard
// around inflateEnd, some conditions and the return) are not visible in this
// listing.
692 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
693 uint64_t uncompressedLength) {
// Null allocator hooks.
695 stream.zalloc = nullptr;
696 stream.zfree = nullptr;
697 stream.opaque = nullptr;
699 // "The windowBits parameter is the base two logarithm of the maximum window
700 // size (...) The default value is 15 (...) add 16 to decode only the gzip
701 // format (the zlib format will return a Z_DATA_ERROR)."
702 int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
703 int rc = inflateInit2(&stream, windowBits);
705 throw std::runtime_error(to<std::string>(
706 "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
// Start with empty input/output windows and zeroed counters.
709 stream.next_in = stream.next_out = nullptr;
710 stream.avail_in = stream.avail_out = 0;
711 stream.total_in = stream.total_out = 0;
// `success` is consulted by the inflateEnd() result check below.
713 bool success = false;
716 rc = inflateEnd(&stream);
717 // If we're here because of an exception, it's okay if some data
719 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
720 << rc << ": " << stream.msg;
723 // Max 64MiB in one go
724 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
725 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: sized from the caller-supplied length when it is known
// and at most 64MiB, otherwise a default-sized buffer.
727 auto out = addOutputBuffer(
729 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
730 uncompressedLength <= maxSingleStepLength) ?
732 defaultBufferLength));
734 bool streamEnd = false;
735 for (auto& range : *data) {
740 stream.next_in = const_cast<uint8_t*>(range.data());
741 stream.avail_in = range.size();
743 while (stream.avail_in != 0) {
745 throw std::runtime_error(to<std::string>(
746 "ZlibCodec: junk after end of data"));
749 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Additional inflate call after all input ranges have been supplied.
754 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused tail of the last output buffer in the chain.
757 out->prev()->trimEnd(stream.avail_out);
759 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
760 uncompressedLength != stream.total_out) {
761 throw std::runtime_error(to<std::string>(
762 "ZlibCodec: invalid uncompressed length"));
765 success = true; // we survived
770 #endif // FOLLY_HAVE_LIBZ
772 #if FOLLY_HAVE_LIBLZMA
// liblzma-based codec serving CodecType::LZMA2 and LZMA2_VARINT_SIZE.
// For the VARINT_SIZE flavour (encodeSize() == true) the uncompressed length
// is stored in the stream as a varint prefix, so callers need not supply it
// when decompressing.
777 class LZMA2Codec final : public Codec {
779 static std::unique_ptr<Codec> create(int level, CodecType type);
780 explicit LZMA2Codec(int level, CodecType type);
783 bool doNeedsUncompressedLength() const override;
784 uint64_t doMaxUncompressedLength() const override;
// True when the stream format carries the uncompressed size.
786 bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; }
788 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
789 std::unique_ptr<IOBuf> doUncompress(
791 uint64_t uncompressedLength) override;
// Helpers that manage the chain of output buffers handed to liblzma.
793 std::unique_ptr<IOBuf> addOutputBuffer(lzma_stream* stream, size_t length);
794 bool doInflate(lzma_stream* stream, IOBuf* head, size_t bufferLength);
// Factory with the (level, type) signature used by the codec factory table.
799 std::unique_ptr<Codec> LZMA2Codec::create(int level, CodecType type) {
800 return make_unique<LZMA2Codec>(level, type);
// Validates the codec type (debug-only DCHECK) and maps the symbolic levels
// to liblzma presets. After mapping, the level must lie within 0..9;
// anything else raises std::invalid_argument.
// NOTE(review): the values assigned for FASTEST and BEST are not visible in
// this listing.
803 LZMA2Codec::LZMA2Codec(int level, CodecType type) : Codec(type) {
804 DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
806 case COMPRESSION_LEVEL_FASTEST:
809 case COMPRESSION_LEVEL_DEFAULT:
810 level = LZMA_PRESET_DEFAULT;
812 case COMPRESSION_LEVEL_BEST:
816 if (level < 0 || level > 9) {
817 throw std::invalid_argument(to<std::string>(
818 "LZMA2Codec: invalid level: ", level));
// The caller must supply the uncompressed length unless the stream format
// itself encodes the size (encodeSize()).
823 bool LZMA2Codec::doNeedsUncompressedLength() const {
824 return !encodeSize();
// Maximum input size for LZMA2: 2^63 bytes, per the liblzma note quoted below.
827 uint64_t LZMA2Codec::doMaxUncompressedLength() const {
828 // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
829 return uint64_t(1) << 63;
// Creates a new output IOBuf via IOBuf::create(length) and points the lzma
// stream's output window (next_out / avail_out) at it. May only be called
// when the previous output window is fully used (CHECK on avail_out == 0).
// NOTE(review): the parameter list, the line that sets the buffer's length
// before avail_out is read, and the return are not visible in this listing.
832 std::unique_ptr<IOBuf> LZMA2Codec::addOutputBuffer(
836 CHECK_EQ(stream->avail_out, 0);
838 auto buf = IOBuf::create(length);
841 stream->next_out = buf->writableData();
842 stream->avail_out = buf->length();
// Compresses an IOBuf chain with liblzma's easy encoder (no integrity check:
// LZMA_CHECK_NONE), producing a chain of output buffers. For the VARINT_SIZE
// flavour a small buffer holding the uncompressed length is placed at the
// head of the result. Throws std::runtime_error on any liblzma failure.
// NOTE(review): some conditions, loop headers and the return statement are
// not visible in this listing.
847 std::unique_ptr<IOBuf> LZMA2Codec::doCompress(const IOBuf* data) {
849 lzma_stream stream = LZMA_STREAM_INIT;
851 rc = lzma_easy_encoder(&stream, level_, LZMA_CHECK_NONE);
853 throw std::runtime_error(folly::to<std::string>(
854 "LZMA2Codec: lzma_easy_encoder error: ", rc));
// Release the encoder on every exit path.
857 SCOPE_EXIT { lzma_end(&stream); };
859 uint64_t uncompressedLength = data->computeChainDataLength();
860 uint64_t maxCompressedLength = lzma_stream_buffer_bound(uncompressedLength);
862 // Max 64MiB in one go
863 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
864 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: big enough for the whole result when the bound is at
// most 64MiB, otherwise a default-sized buffer that will be extended.
866 auto out = addOutputBuffer(
868 (maxCompressedLength <= maxSingleStepLength ?
869 maxCompressedLength :
870 defaultBufferLength));
// Varint length prefix becomes the new head of the chain, with the compressed
// output chained after it.
873 auto size = IOBuf::createCombined(kMaxVarintLength64);
874 encodeVarintToIOBuf(uncompressedLength, size.get());
875 size->appendChain(std::move(out));
876 out = std::move(size);
879 for (auto& range : *data) {
884 stream.next_in = const_cast<uint8_t*>(range.data());
885 stream.avail_in = range.size();
887 while (stream.avail_in != 0) {
888 if (stream.avail_out == 0) {
889 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
892 rc = lzma_code(&stream, LZMA_RUN);
895 throw std::runtime_error(folly::to<std::string>(
896 "LZMA2Codec: lzma_code error: ", rc));
// Flush: keep calling lzma_code(LZMA_FINISH) while it returns LZMA_OK, adding
// output buffers as needed.
902 if (stream.avail_out == 0) {
903 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
906 rc = lzma_code(&stream, LZMA_FINISH);
907 } while (rc == LZMA_OK);
909 if (rc != LZMA_STREAM_END) {
910 throw std::runtime_error(folly::to<std::string>(
911 "LZMA2Codec: lzma_code ended with error: ", rc));
// Drop the unused tail of the last output buffer in the chain.
914 out->prev()->trimEnd(stream.avail_out);
// Performs one lzma_code(LZMA_RUN) step, first chaining a new output buffer
// onto `head` if the current output window is full. doUncompress() stores
// the bool result in its streamEnd flag. Unexpected liblzma return codes
// raise std::runtime_error.
// NOTE(review): the switch header and the values returned from each case are
// not visible in this listing.
919 bool LZMA2Codec::doInflate(lzma_stream* stream,
921 size_t bufferLength) {
922 if (stream->avail_out == 0) {
923 head->prependChain(addOutputBuffer(stream, bufferLength));
926 lzma_ret rc = lzma_code(stream, LZMA_RUN);
931 case LZMA_STREAM_END:
934 throw std::runtime_error(to<std::string>(
935 "LZMA2Codec: lzma_code error: ", rc));
// Decompresses an IOBuf chain with liblzma's auto decoder, producing a chain
// of output buffers. The expected length comes either from the varint prefix
// in the stream or from the caller; the bytes actually produced must match
// it. Throws std::runtime_error on decoder failure, trailing input after the
// end of the stream, or a length mismatch.
// NOTE(review): the if/else on encodeSize() that selects between the two
// length sources, some conditions and the return are not visible in this
// listing.
941 std::unique_ptr<IOBuf> LZMA2Codec::doUncompress(const IOBuf* data,
942 uint64_t uncompressedLength) {
944 lzma_stream stream = LZMA_STREAM_INIT;
// The second argument (liblzma's memory limit) is set to the uint64_t maximum.
946 rc = lzma_auto_decoder(&stream, std::numeric_limits<uint64_t>::max(), 0);
948 throw std::runtime_error(folly::to<std::string>(
949 "LZMA2Codec: lzma_auto_decoder error: ", rc));
// Release the decoder on every exit path.
952 SCOPE_EXIT { lzma_end(&stream); };
954 // Max 64MiB in one go
955 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
956 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
958 folly::io::Cursor cursor(data);
959 uint64_t actualUncompressedLength;
// Length taken from the stream; a caller-supplied length must agree with it.
961 actualUncompressedLength = decodeVarintFromCursor(cursor);
962 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
963 uncompressedLength != actualUncompressedLength) {
964 throw std::runtime_error("LZMA2Codec: invalid uncompressed length");
// Length taken from the caller; it is expected to be known at this point
// (debug-only check).
967 actualUncompressedLength = uncompressedLength;
968 DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH);
// First output buffer: the full expected size when it is at most 64MiB,
// otherwise a default-sized buffer that will be extended.
971 auto out = addOutputBuffer(
973 (actualUncompressedLength <= maxSingleStepLength ?
974 actualUncompressedLength :
975 defaultBufferLength));
977 bool streamEnd = false;
// Walk the remaining input one contiguous span at a time.
978 auto buf = cursor.peekBytes();
979 while (!buf.empty()) {
980 stream.next_in = const_cast<uint8_t*>(buf.data());
981 stream.avail_in = buf.size();
983 while (stream.avail_in != 0) {
985 throw std::runtime_error(to<std::string>(
986 "LZMA2Codec: junk after end of data"));
989 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
992 cursor.skip(buf.size());
993 buf = cursor.peekBytes();
// Additional decode call after all input has been supplied.
997 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused tail of the last output buffer in the chain.
1000 out->prev()->trimEnd(stream.avail_out);
1002 if (actualUncompressedLength != stream.total_out) {
1003 throw std::runtime_error(to<std::string>(
1004 "LZMA2Codec: invalid uncompressed length"));
1010 #endif // FOLLY_HAVE_LIBLZMA
// Use #if (not #ifdef) so this guard agrees with the other
// FOLLY_HAVE_LIBZSTD checks in this file: a definition of 0 must disable the
// codec together with its includes and its factory-table entry.
1012 #if FOLLY_HAVE_LIBZSTD
// Zstandard codec for CodecType::ZSTD.
1017 class ZSTDCodec final : public Codec {
1019 static std::unique_ptr<Codec> create(int level, CodecType);
1020 explicit ZSTDCodec(int level, CodecType type);
1023 bool doNeedsUncompressedLength() const override;
1024 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
1025 std::unique_ptr<IOBuf> doUncompress(
1027 uint64_t uncompressedLength) override;
// Factory with the (level, type) signature used by the codec factory table.
1032 std::unique_ptr<Codec> ZSTDCodec::create(int level, CodecType type) {
1033 return make_unique<ZSTDCodec>(level, type);
// Validates the codec type (debug-only DCHECK) and the compression level.
// After the symbolic levels are mapped, the level must lie within
// 1..ZSTD_maxCLevel(); anything else raises std::invalid_argument.
// NOTE(review): the values assigned for the symbolic levels are not visible
// in this listing.
1036 ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) {
1037 DCHECK(type == CodecType::ZSTD);
1039 case COMPRESSION_LEVEL_FASTEST:
1042 case COMPRESSION_LEVEL_DEFAULT:
1045 case COMPRESSION_LEVEL_BEST:
1049 if (level < 1 || level > ZSTD_maxCLevel()) {
1050 throw std::invalid_argument(
1051 to<std::string>("ZSTD: invalid level: ", level));
// Codec hook reporting whether callers must supply the uncompressed length.
// NOTE(review): body not visible in this listing. doUncompress() handles
// UNKNOWN_UNCOMPRESSED_LENGTH, so this presumably returns false -- confirm.
1056 bool ZSTDCodec::doNeedsUncompressedLength() const {
// Converts a ZSTD error return code into a std::runtime_error carrying
// ZSTD's error name; does nothing for non-error codes.
1060 void zstdThrowIfError(size_t rc) {
1061 if (!ZSTD_isError(rc)) {
1064 throw std::runtime_error(
1065 to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
// Compresses `data` with ZSTD. An unchained input uses the one-shot
// ZSTD_compress call; a chained input goes through the streaming API
// (ZSTD_compressStream / ZSTD_endStream) into a single output buffer sized by
// ZSTD_compressBound of the total length. Any ZSTD error is rethrown as
// std::runtime_error via zstdThrowIfError().
// NOTE(review): the remaining ZSTD_compress arguments, the scope guard around
// ZSTD_freeCStream, some declarations and the returns are not visible in
// this listing.
1068 std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
1069 // Support earlier versions of the codec (working with a single IOBuf,
1070 // and using ZSTD_decompress which requires ZSTD frame to contain size,
1071 // which isn't populated by streaming API).
1072 if (!data->isChained()) {
1073 auto out = IOBuf::createCombined(ZSTD_compressBound(data->length()));
1074 const auto rc = ZSTD_compress(
1075 out->writableData(),
1080 zstdThrowIfError(rc);
// Streaming path for chained input.
1085 auto zcs = ZSTD_createCStream();
1087 ZSTD_freeCStream(zcs);
1090 auto rc = ZSTD_initCStream(zcs, level_);
1091 zstdThrowIfError(rc);
1093 Cursor cursor(data);
1094 auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
// The whole result buffer serves as the ZSTD output window.
1097 out.dst = result->writableTail();
1098 out.size = result->capacity();
// Feed each contiguous input span until ZSTD has consumed all of it.
1101 for (auto buffer = cursor.peekBytes(); !buffer.empty();) {
1103 in.src = buffer.data();
1104 in.size = buffer.size();
1105 for (in.pos = 0; in.pos != in.size;) {
1106 rc = ZSTD_compressStream(zcs, &out, &in);
1107 zstdThrowIfError(rc);
1109 cursor.skip(in.size);
1110 buffer = cursor.peekBytes();
1113 rc = ZSTD_endStream(zcs, &out);
1114 zstdThrowIfError(rc);
// out.pos is the number of bytes ZSTD wrote into the result buffer.
1117 result->append(out.pos);
// Decompresses an IOBuf chain with the ZSTD streaming API, collecting output
// in an IOBufQueue. Throws std::runtime_error on ZSTD errors, on truncated
// input, on trailing bytes after the end of the frame, and when a
// caller-supplied length does not match the bytes produced.
// NOTE(review): the scope guard around ZSTD_freeDStream, the main loop
// header, some conditions and the loop exit are not visible in this listing.
1121 std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
1123 uint64_t uncompressedLength) {
1124 auto zds = ZSTD_createDStream();
1126 ZSTD_freeDStream(zds);
1129 auto rc = ZSTD_initDStream(zds);
1130 zstdThrowIfError(rc);
1132 ZSTD_outBuffer out{};
// Size of the first output allocation: the caller's length when known;
// otherwise the size reported by ZSTD_getDecompressedSize for the first
// buffer when that is non-zero and smaller than ZSTD's default.
1135 auto outputSize = ZSTD_DStreamOutSize();
1136 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) {
1137 outputSize = uncompressedLength;
1139 auto decompressedSize =
1140 ZSTD_getDecompressedSize(data->data(), data->length());
1141 if (decompressedSize != 0 && decompressedSize < outputSize) {
1142 outputSize = decompressedSize;
1146 IOBufQueue queue(IOBufQueue::cacheChainLength());
1148 Cursor cursor(data);
// Refill the input window from the next contiguous span once the current one
// is fully consumed.
1150 if (in.pos == in.size) {
1151 auto buffer = cursor.peekBytes();
1152 in.src = buffer.data();
1153 in.size = buffer.size();
1155 cursor.skip(in.size);
// The previous decompress call's return value is still > 1 but no input
// remains.
1156 if (rc > 1 && in.size == 0) {
1157 throw std::runtime_error(to<std::string>("ZSTD: incomplete input"));
// Output window full: commit what was written and obtain a fresh one.
1160 if (out.pos == out.size) {
1162 queue.postallocate(out.pos);
1164 auto buffer = queue.preallocate(outputSize, outputSize);
1165 out.dst = buffer.first;
1166 out.size = buffer.second;
// Subsequent allocations use ZSTD's recommended output size.
1168 outputSize = ZSTD_DStreamOutSize();
1170 rc = ZSTD_decompressStream(zds, &out, &in);
1171 zstdThrowIfError(rc);
// Commit the bytes written into the final output window.
1177 queue.postallocate(out.pos);
1179 if (in.pos != in.size || !cursor.isAtEnd()) {
1180 throw std::runtime_error("ZSTD: junk after end of data");
1182 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
1183 queue.chainLength() != uncompressedLength) {
1184 throw std::runtime_error("ZSTD: invalid uncompressed length");
1187 return queue.move();
1190 #endif // FOLLY_HAVE_LIBZSTD
// Table of codec factory functions, indexed by the numeric CodecType value
// and sized by CodecType::NUM_CODEC_TYPES. A nullptr entry marks a type with
// no built-in codec (hasCodec() tests for it). Entries for optional
// libraries are selected by the FOLLY_HAVE_* guards below.
// NOTE(review): most entries and the #else/#endif lines are not visible in
// this listing.
1194 typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
1196 codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
1197 nullptr, // USER_DEFINED
1198 NoCompressionCodec::create,
1200 #if FOLLY_HAVE_LIBLZ4
1206 #if FOLLY_HAVE_LIBSNAPPY
1207 SnappyCodec::create,
1218 #if FOLLY_HAVE_LIBLZ4
1224 #if FOLLY_HAVE_LIBLZMA
1232 #if FOLLY_HAVE_LIBZSTD
// Reports whether a factory is registered for `type`.
// Throws std::invalid_argument when `type` is outside the range of known
// codec types (>= NUM_CODEC_TYPES).
1245 bool hasCodec(CodecType type) {
1246 size_t idx = static_cast<size_t>(type);
1247 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1248 throw std::invalid_argument(
1249 to<std::string>("Compression type ", idx, " invalid"));
1251 return codecFactories[idx] != nullptr;
// Creates a codec of the requested type and level through the factory table.
// Throws std::invalid_argument when `type` is out of range or when no factory
// is available for it ("not supported").
// NOTE(review): the null-factory test and the return statement are not
// visible in this listing.
1254 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
1255 size_t idx = static_cast<size_t>(type);
1256 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
1257 throw std::invalid_argument(
1258 to<std::string>("Compression type ", idx, " invalid"));
1260 auto factory = codecFactories[idx];
1262 throw std::invalid_argument(to<std::string>(
1263 "Compression type ", idx, " not supported"));
1265 auto codec = (*factory)(level, type);
// Debug-only check that the factory produced a codec of the requested type.
1266 DCHECK_EQ(static_cast<size_t>(codec->type()), idx);