2 * Copyright 2013 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include "folly/io/Compression.h"
21 #include <glog/logging.h>
23 #include <snappy-sinksource.h>
26 #include "folly/Conv.h"
27 #include "folly/Memory.h"
28 #include "folly/Portability.h"
29 #include "folly/ScopeGuard.h"
30 #include "folly/Varint.h"
31 #include "folly/io/Cursor.h"
33 namespace folly { namespace io {
// Base-class constructor: stores the codec's type tag in type_.
35 Codec::Codec(CodecType type) : type_(type) { }
// Public compression entry point. Input for which data->empty() is false is
// forwarded to the doCompress() hook; otherwise the result is
// IOBuf::create(0) and the hook is not called.
// NOTE(review): `data` is dereferenced unconditionally, so a null pointer is
// not handled here despite the wording of the comment below -- confirm intent.
37 // Ensure consistent behavior in the nullptr case
38 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
39 return !data->empty() ? doCompress(data) : IOBuf::create(0);
// Public decompression entry point: validates uncompressedLength, then
// forwards to the doUncompress() hook.
//
// Throws std::invalid_argument if the length is UNKNOWN_UNCOMPRESSED_LENGTH
// while needsUncompressedLength() reports that one is required, and
// std::runtime_error if a supplied length exceeds maxUncompressedLength().
42 std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
43 uint64_t uncompressedLength) {
44 if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
45 if (needsUncompressedLength()) {
46 throw std::invalid_argument("Codec: uncompressed length required");
48 } else if (uncompressedLength > maxUncompressedLength()) {
49 throw std::runtime_error("Codec: uncompressed length too large");
// On this path only an unknown or zero length is acceptable, and the result
// is IOBuf::create(0) rather than a call into the hook.
// NOTE(review): presumably this is the empty-input case -- confirm the guard.
53 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
54 uncompressedLength != 0) {
55 throw std::runtime_error("Codec: invalid uncompressed length");
57 return IOBuf::create(0);
// Everything else is handled by the codec-specific implementation.
60 return doUncompress(data, uncompressedLength);
// Reports whether uncompress() must be given an explicit uncompressed length
// (uncompress() throws std::invalid_argument when this is true and the length
// is unknown). Delegates to the doNeedsUncompressedLength() hook.
63 bool Codec::needsUncompressedLength() const {
64 return doNeedsUncompressedLength();
// Largest uncompressed length that uncompress() accepts before throwing
// "uncompressed length too large". Delegates to the
// doMaxUncompressedLength() hook.
67 uint64_t Codec::maxUncompressedLength() const {
68 return doMaxUncompressedLength();
// Base-class implementation of the hook behind needsUncompressedLength().
// LZ4Codec overrides it.
// NOTE(review): confirm the value returned by this base implementation.
71 bool Codec::doNeedsUncompressedLength() const {
// Base-class implementation of the hook behind maxUncompressedLength():
// the largest uint64_t value minus one.
// NOTE(review): presumably the maximum itself is reserved for
// UNKNOWN_UNCOMPRESSED_LENGTH -- confirm against the header.
75 uint64_t Codec::doMaxUncompressedLength() const {
76 return std::numeric_limits<uint64_t>::max() - 1;
// Codec for CodecType::NO_COMPRESSION. Both hooks return data->clone()
// (see the out-of-class definitions).
84 class NoCompressionCodec FOLLY_FINAL : public Codec {
// Factory with the (level, type) signature stored in gCodecFactories.
86 static std::unique_ptr<Codec> create(int level, CodecType type);
// Validates level and type; see the constructor definition.
87 explicit NoCompressionCodec(int level, CodecType type);
// Codec hook overrides.
90 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
91 std::unique_ptr<IOBuf> doUncompress(
93 uint64_t uncompressedLength) FOLLY_OVERRIDE;
// Factory: builds a NoCompressionCodec with make_unique and returns it as a
// std::unique_ptr<Codec>. Any exception from the constructor propagates.
96 std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
97 return make_unique<NoCompressionCodec>(level, type);
// Constructor: checks (debug builds only, via DCHECK) that the type is
// NO_COMPRESSION, then validates `level`.
// Throws std::invalid_argument for a level that is not accepted.
// NOTE(review): the three COMPRESSION_LEVEL_* constants are named as cases;
// confirm which raw numeric levels are also accepted.
100 NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
102 DCHECK(type == CodecType::NO_COMPRESSION);
104 case COMPRESSION_LEVEL_DEFAULT:
105 case COMPRESSION_LEVEL_FASTEST:
106 case COMPRESSION_LEVEL_BEST:
// Rejection path: the offending level is included in the message.
110 throw std::invalid_argument(to<std::string>(
111 "NoCompressionCodec: invalid level ", level));
// "Compression" for the no-op codec: the result is data->clone().
115 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
117 return data->clone();
// "Decompression" for the no-op codec: the result is data->clone().
// When the caller supplied a length, it must equal
// data->computeChainDataLength(); otherwise std::runtime_error is thrown.
120 std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
122 uint64_t uncompressedLength) {
123 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
124 data->computeChainDataLength() != uncompressedLength) {
125 throw std::runtime_error(to<std::string>(
126 "NoCompressionCodec: invalid uncompressed length"));
128 return data->clone();
// Codec backed by LZ4, used for CodecType::LZ4 and CodecType::LZ4_VARINT_SIZE.
134 class LZ4Codec FOLLY_FINAL : public Codec {
// Factory with the (level, type) signature used by the codec factory typedef.
136 static std::unique_ptr<Codec> create(int level, CodecType type);
137 explicit LZ4Codec(int level, CodecType type);
// Overrides of the Codec length hooks.
140 bool doNeedsUncompressedLength() const FOLLY_OVERRIDE;
141 uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
// True for LZ4_VARINT_SIZE: doCompress() then writes data->length() as a
// varint ahead of the compressed bytes and doUncompress() reads it back.
143 bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
145 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
146 std::unique_ptr<IOBuf> doUncompress(
148 uint64_t uncompressedLength) FOLLY_OVERRIDE;
// Selects LZ4_compressHC over LZ4_compress in doCompress().
150 bool highCompression_;
// Factory: builds an LZ4Codec with make_unique and returns it as a
// std::unique_ptr<Codec>. Any exception from the constructor propagates.
153 std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
154 return make_unique<LZ4Codec>(level, type);
// Constructor: checks (debug builds only, via DCHECK) that the type is one of
// the two LZ4 variants, translates the symbolic COMPRESSION_LEVEL_* values,
// and rejects any resulting level outside {1, 2} with std::invalid_argument.
// NOTE(review): confirm which concrete level each symbolic constant maps to.
157 LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
158 DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
161 case COMPRESSION_LEVEL_FASTEST:
162 case COMPRESSION_LEVEL_DEFAULT:
165 case COMPRESSION_LEVEL_BEST:
// Only levels 1 and 2 are valid for this codec.
169 if (level < 1 || level > 2) {
170 throw std::invalid_argument(to<std::string>(
171 "LZ4Codec: invalid level: ", level));
// Level 2 selects the high-compression path.
173 highCompression_ = (level > 1);
// The caller must supply the uncompressed length unless this codec variant
// stores it in the stream itself (encodeSize()).
176 bool LZ4Codec::doNeedsUncompressedLength() const {
177 return !encodeSize();
// Upper bound on the uncompressed size handled by this codec: 1.8 * 2^30
// bytes, chosen below the ~1.9GB limit quoted from lz4.h in the comment below.
// The product is computed as a double and converted to uint64_t on return.
180 uint64_t LZ4Codec::doMaxUncompressedLength() const {
181 // From lz4.h: "Max supported value is ~1.9GB"; I wish we had something
183 return 1.8 * (uint64_t(1) << 30);
// Writes `val` as a varint at the tail of `out` and extends out's length by
// the value encodeVarint() returns (used here as the byte count written).
// Precondition: out has at least kMaxVarintLength64 bytes of tailroom; this
// is enforced only by a DCHECK.
188 void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
189 DCHECK_GE(out->tailroom(), kMaxVarintLength64);
190 out->append(encodeVarint(val, out->writableTail()));
// Decodes one varint starting at the cursor's current position and advances
// the cursor past it. Only the span returned by cursor.peek() is examined, so
// the whole varint has to lie inside that span (see the comment below).
193 uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
194 // Must have enough room in *this* buffer.
195 auto p = cursor.peek();
196 folly::ByteRange range(p.first, p.second);
197 uint64_t val = decodeVarint(range);
// decodeVarint() is expected to have advanced `range`; the distance from the
// original start is the number of bytes consumed.
198 cursor.skip(range.data() - p.first);
// Compresses `data` with LZ4 into a single newly created IOBuf. For the
// LZ4_VARINT_SIZE variant the input length is written first as a varint.
204 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
// A chained input is cloned so it can be flattened without touching the
// caller's buffer (per the comment below, LZ4 needs contiguous input).
205 std::unique_ptr<IOBuf> clone;
206 if (data->isChained()) {
207 // LZ4 doesn't support streaming, so we have to coalesce
208 clone = data->clone();
// Reserve room for the optional varint header plus LZ4's worst-case output.
213 uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
214 auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
// NOTE(review): presumably executed only when encodeSize() is true -- confirm.
216 encodeVarintToIOBuf(data->length(), out.get());
// highCompression_ picks the HC compressor; otherwise the regular one is used.
220 if (highCompression_) {
221 n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()),
222 reinterpret_cast<char*>(out->writableTail()),
225 n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
226 reinterpret_cast<char*>(out->writableTail()),
// Sanity check: the compressor must not report more bytes than the buffer holds.
231 CHECK_LE(n, out->capacity());
// Decompresses LZ4 data into a single newly created IOBuf of exactly the
// expected uncompressed size. Throws std::runtime_error when a stored length
// disagrees with the caller-supplied one or when LZ4 reports an invalid result.
237 std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
239 uint64_t uncompressedLength) {
// A chained input is cloned so it can be flattened without touching the
// caller's buffer.
240 std::unique_ptr<IOBuf> clone;
241 if (data->isChained()) {
242 // LZ4 doesn't support streaming, so we have to coalesce
243 clone = data->clone();
248 folly::io::Cursor cursor(data);
249 uint64_t actualUncompressedLength;
// Length taken from the varint header; if the caller also supplied a length,
// the two must agree.
// NOTE(review): presumably this branch is taken when encodeSize() is true.
251 actualUncompressedLength = decodeVarintFromCursor(cursor);
252 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
253 uncompressedLength != actualUncompressedLength) {
254 throw std::runtime_error("LZ4Codec: invalid uncompressed length");
// Otherwise the caller-supplied length is used and must be known.
257 actualUncompressedLength = uncompressedLength;
258 DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH);
261 auto out = IOBuf::create(actualUncompressedLength);
262 auto p = cursor.peek();
// NOTE(review): this call is given the expected output size but no bound on
// the input length (p.second is not passed), so the amount of input read is
// not limited here -- consider a size-checked LZ4 decompression entry point.
263 int n = LZ4_uncompress(reinterpret_cast<const char*>(p.first),
264 reinterpret_cast<char*>(out->writableTail()),
265 actualUncompressedLength);
// Failure path: the value returned by LZ4 is included in the message.
267 throw std::runtime_error(to<std::string>(
268 "LZ4 decompression returned invalid value ", n));
// Mark the decompressed bytes as valid data in the output buffer.
270 out->append(actualUncompressedLength);
279 * Implementation of snappy::Source that reads from an IOBuf chain.
// Adapter that lets snappy read its input from an IOBuf.
281 class IOBufSnappySource FOLLY_FINAL : public snappy::Source {
// `data` is the buffer to read from; its total chain length is computed once
// at construction.
283 explicit IOBufSnappySource(const IOBuf* data);
// snappy::Source overrides.
284 size_t Available() const FOLLY_OVERRIDE;
285 const char* Peek(size_t* len) FOLLY_OVERRIDE;
286 void Skip(size_t n) FOLLY_OVERRIDE;
// Constructor: initialises available_ with the total number of data bytes in
// the chain (data->computeChainDataLength()).
292 IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
293 : available_(data->computeChainDataLength()),
// snappy::Source override reporting how much input is left.
// NOTE(review): presumably returns available_ -- confirm.
297 size_t IOBufSnappySource::Available() const {
// snappy::Source override: returns a pointer to the start of the span that
// cursor_.peek() reports at the current position, without consuming it.
// NOTE(review): *len is presumably set to that span's size -- confirm.
301 const char* IOBufSnappySource::Peek(size_t* len) {
302 auto p = cursor_.peek();
304 return reinterpret_cast<const char*>(p.first);
// snappy::Source override: consumes n bytes of input. Asking for more than
// available_ is a fatal error (CHECK).
307 void IOBufSnappySource::Skip(size_t n) {
308 CHECK_LE(n, available_);
// Codec backed by snappy, used for CodecType::SNAPPY.
313 class SnappyCodec FOLLY_FINAL : public Codec {
// Factory with the (level, type) signature used by the codec factory typedef.
315 static std::unique_ptr<Codec> create(int level, CodecType type);
316 explicit SnappyCodec(int level, CodecType type);
// Codec hook overrides.
319 uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
320 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
321 std::unique_ptr<IOBuf> doUncompress(
323 uint64_t uncompressedLength) FOLLY_OVERRIDE;
// Factory: builds a SnappyCodec with make_unique and returns it as a
// std::unique_ptr<Codec>. Any exception from the constructor propagates.
326 std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
327 return make_unique<SnappyCodec>(level, type);
// Constructor: checks (debug builds only, via DCHECK) that the type is SNAPPY
// and validates `level`, throwing std::invalid_argument when it is rejected.
// NOTE(review): the three COMPRESSION_LEVEL_* constants are named as cases;
// confirm which raw numeric levels are also accepted.
330 SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
331 DCHECK(type == CodecType::SNAPPY);
333 case COMPRESSION_LEVEL_FASTEST:
334 case COMPRESSION_LEVEL_DEFAULT:
335 case COMPRESSION_LEVEL_BEST:
// Rejection path: the offending level is included in the message.
339 throw std::invalid_argument(to<std::string>(
340 "SnappyCodec: invalid level: ", level));
// Upper bound on the uncompressed size handled by this codec: the largest
// uint32_t value, for the reason given in the comment below.
344 uint64_t SnappyCodec::doMaxUncompressedLength() const {
345 // snappy.h uses uint32_t for lengths, so there's that.
346 return std::numeric_limits<uint32_t>::max();
// Compresses `data` with snappy into a single newly created IOBuf.
349 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
350 IOBufSnappySource source(data);
// Output buffer sized for snappy's worst case for this much input.
352 IOBuf::create(snappy::MaxCompressedLength(source.Available()));
// The sink writes straight into the output buffer's tail and, as the class
// name says, does no bounds checking; the sizing above is what keeps it safe.
354 snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
355 out->writableTail()));
357 size_t n = snappy::Compress(&source, &sink);
// Sanity check: snappy must not report more bytes than the buffer holds.
359 CHECK_LE(n, out->capacity());
// Decompresses snappy data into a single newly created IOBuf.
// Throws std::runtime_error if the length header cannot be read, if it
// disagrees with a caller-supplied length, or if decompression fails.
364 std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
365 uint64_t uncompressedLength) {
366 uint32_t actualUncompressedLength = 0;
// First pass: a dedicated source is used only to read the length that snappy
// stored in the stream.
369 IOBufSnappySource source(data);
370 if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
371 throw std::runtime_error("snappy::GetUncompressedLength failed");
373 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
374 uncompressedLength != actualUncompressedLength) {
375 throw std::runtime_error("snappy: invalid uncompressed length");
379 auto out = IOBuf::create(actualUncompressedLength);
// Second pass: a fresh source over the same data is handed to RawUncompress,
// which writes directly into the output buffer's tail.
382 IOBufSnappySource source(data);
383 if (!snappy::RawUncompress(&source,
384 reinterpret_cast<char*>(out->writableTail()))) {
385 throw std::runtime_error("snappy::RawUncompress failed");
// Mark the decompressed bytes as valid data in the output buffer.
389 out->append(actualUncompressedLength);
// Codec backed by zlib (deflate/inflate), used for CodecType::ZLIB.
396 class ZlibCodec FOLLY_FINAL : public Codec {
// Factory with the (level, type) signature used by the codec factory typedef.
398 static std::unique_ptr<Codec> create(int level, CodecType type);
399 explicit ZlibCodec(int level, CodecType type);
// Codec hook overrides.
402 std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
403 std::unique_ptr<IOBuf> doUncompress(
405 uint64_t uncompressedLength) FOLLY_OVERRIDE;
// Creates a new output buffer of `length` and points the stream's output at it.
407 std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
// Runs one inflate() step, growing the output chain at `head` when needed;
// the result is used by doUncompress() as its end-of-stream flag.
408 bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
// Factory: builds a ZlibCodec with make_unique and returns it as a
// std::unique_ptr<Codec>. Any exception from the constructor propagates.
413 std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
414 return make_unique<ZlibCodec>(level, type);
// Constructor: checks (debug builds only, via DCHECK) that the type is ZLIB,
// translates the symbolic COMPRESSION_LEVEL_* values into zlib levels, and
// rejects anything that is neither Z_DEFAULT_COMPRESSION nor within 0..9
// with std::invalid_argument.
// NOTE(review): confirm the zlib levels chosen for FASTEST and BEST.
417 ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
418 DCHECK(type == CodecType::ZLIB);
420 case COMPRESSION_LEVEL_FASTEST:
423 case COMPRESSION_LEVEL_DEFAULT:
424 level = Z_DEFAULT_COMPRESSION;
426 case COMPRESSION_LEVEL_BEST:
// Accept zlib's default sentinel or an explicit level in the range 0..9.
430 if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
431 throw std::invalid_argument(to<std::string>(
432 "ZlibCodec: invalid level: ", level));
// Creates a fresh output buffer and points the z_stream's output fields at it.
// Must only be called when the current output space is exhausted
// (avail_out == 0); anything else is a fatal error (CHECK).
437 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
439 CHECK_EQ(stream->avail_out, 0);
441 auto buf = IOBuf::create(length);
// zlib writes into the buffer's data area; the space offered is buf->length().
// NOTE(review): this relies on buf's length having been set to `length`
// before this point -- confirm.
444 stream->next_out = buf->writableData();
445 stream->avail_out = buf->length();
// Performs one inflate() call with Z_NO_FLUSH. If the stream has no output
// space left, a new buffer of bufferLength is first linked into the chain
// via head->prependChain().
// Throws std::runtime_error ("ZlibCodec: inflate error") for error return
// codes; a return code the code does not expect is fatal (CHECK(false)).
// NOTE(review): the bool result is used by the caller as "stream ended";
// confirm which zlib return codes map to true and false.
450 bool ZlibCodec::doInflate(z_stream* stream,
452 uint32_t bufferLength) {
453 if (stream->avail_out == 0) {
454 head->prependChain(addOutputBuffer(stream, bufferLength));
457 int rc = inflate(stream, Z_NO_FLUSH);
468 throw std::runtime_error(to<std::string>(
469 "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
471 CHECK(false) << rc << ": " << stream->msg;
// Compresses `data` with zlib's deflate and returns the output as an IOBuf
// chain. Throws std::runtime_error if deflateInit fails; unexpected deflate
// return codes are fatal (CHECK).
478 std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
// Null allocator hooks: per the zlib manual this selects zlib's default
// allocation routines.
480 stream.zalloc = nullptr;
481 stream.zfree = nullptr;
482 stream.opaque = nullptr;
484 int rc = deflateInit(&stream, level_);
486 throw std::runtime_error(to<std::string>(
487 "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
// Start with no input and no output attached.
490 stream.next_in = stream.next_out = nullptr;
491 stream.avail_in = stream.avail_out = 0;
492 stream.total_in = stream.total_out = 0;
// `success` lets the cleanup below tell a normal finish from an early exit:
// Z_DATA_ERROR from deflateEnd() is tolerated only while success is false.
494 bool success = false;
497 int rc = deflateEnd(&stream);
498 // If we're here because of an exception, it's okay if some data
500 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
501 << rc << ": " << stream.msg;
// Worst-case compressed size for the whole input chain.
504 uint64_t uncompressedLength = data->computeChainDataLength();
505 uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
507 // Max 64MiB in one go
508 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
509 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: the full worst-case size when that is at most 64MiB,
// otherwise a 4MiB chunk (further chunks are added on demand below).
511 auto out = addOutputBuffer(
513 (maxCompressedLength <= maxSingleStepLength ?
514 maxCompressedLength :
515 defaultBufferLength));
// Feed every range of the input to deflate(), adding a 4MiB output buffer
// each time the current one fills up.
517 for (auto& range : *data) {
522 stream.next_in = const_cast<uint8_t*>(range.data());
523 stream.avail_in = range.size();
525 while (stream.avail_in != 0) {
526 if (stream.avail_out == 0) {
527 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
530 rc = deflate(&stream, Z_NO_FLUSH);
532 CHECK_EQ(rc, Z_OK) << stream.msg;
// Finish the stream: keep calling deflate(Z_FINISH) while it returns Z_OK,
// adding output buffers as needed; it must end with Z_STREAM_END.
537 if (stream.avail_out == 0) {
538 out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
541 rc = deflate(&stream, Z_FINISH);
542 } while (rc == Z_OK);
544 CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
// Drop the unused space at the end of the buffer zlib was last writing to
// (out->prev() is expected to be the most recently added buffer).
546 out->prev()->trimEnd(stream.avail_out);
548 success = true; // we survived
// Decompresses zlib data with inflate and returns the output as an IOBuf
// chain. Throws std::runtime_error if inflateInit fails, if input remains
// after the compressed stream has ended ("junk after end of data"), or if
// the produced size differs from a caller-supplied uncompressedLength.
// Errors raised by doInflate() propagate.
553 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
554 uint64_t uncompressedLength) {
// Null allocator hooks: per the zlib manual this selects zlib's default
// allocation routines.
556 stream.zalloc = nullptr;
557 stream.zfree = nullptr;
558 stream.opaque = nullptr;
560 int rc = inflateInit(&stream);
562 throw std::runtime_error(to<std::string>(
563 "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
// Start with no input and no output attached.
566 stream.next_in = stream.next_out = nullptr;
567 stream.avail_in = stream.avail_out = 0;
568 stream.total_in = stream.total_out = 0;
// `success` lets the cleanup below tell a normal finish from an early exit:
// Z_DATA_ERROR from inflateEnd() is tolerated only while success is false.
570 bool success = false;
573 int rc = inflateEnd(&stream);
574 // If we're here because of an exception, it's okay if some data
576 CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
577 << rc << ": " << stream.msg;
580 // Max 64MiB in one go
581 constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB
582 constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB
// First output buffer: sized from the caller's length when it is known and
// at most 64MiB, otherwise a 4MiB chunk.
584 auto out = addOutputBuffer(
586 ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
587 uncompressedLength <= maxSingleStepLength) ?
589 defaultBufferLength));
// Feed every range of the input to doInflate(); streamEnd records its result.
591 bool streamEnd = false;
592 for (auto& range : *data) {
597 stream.next_in = const_cast<uint8_t*>(range.data());
598 stream.avail_in = range.size();
600 while (stream.avail_in != 0) {
// NOTE(review): presumably guarded by streamEnd, i.e. input is still left
// although the compressed stream already ended -- confirm.
602 throw std::runtime_error(to<std::string>(
603 "ZlibCodec: junk after end of data"));
606 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// NOTE(review): presumably repeated until the stream reports its end, to
// drain output still buffered inside zlib -- confirm the loop condition.
611 streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
// Drop the unused space at the end of the buffer zlib was last writing to
// (out->prev() is expected to be the most recently added buffer).
614 out->prev()->trimEnd(stream.avail_out);
// A caller-supplied length must match what inflate actually produced.
616 if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
617 uncompressedLength != stream.total_out) {
618 throw std::runtime_error(to<std::string>(
619 "ZlibCodec: invalid uncompressed length"));
622 success = true; // we survived
// Pointer to a codec factory function taking (level, type); this is the
// signature of the static create() functions of the codec classes.
627 typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
// Factory table with one slot per CodecType value (NUM_CODEC_TYPES entries),
// indexed by static_cast<size_t>(type) in getCodec(). The slot for
// USER_DEFINED holds nullptr.
629 CodecFactory gCodecFactories[
630 static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
631 nullptr, // USER_DEFINED
632 NoCompressionCodec::create,
// Creates a codec of the requested type at the given compression level by
// looking up its factory in gCodecFactories.
// Throws std::invalid_argument ("Compression type N not supported") when the
// type's index is outside the table; a second throw with the same message
// follows the table lookup. Exceptions from the factory itself propagate.
641 std::unique_ptr<Codec> getCodec(CodecType type, int level) {
642 size_t idx = static_cast<size_t>(type);
643 if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
644 throw std::invalid_argument(to<std::string>(
645 "Compression type ", idx, " not supported"));
647 auto factory = gCodecFactories[idx];
// NOTE(review): presumably reached when the table slot is nullptr -- confirm.
649 throw std::invalid_argument(to<std::string>(
650 "Compression type ", idx, " not supported"));
652 auto codec = (*factory)(level, type);
// Debug-only check that the factory produced a codec of the requested type.
653 DCHECK_EQ(static_cast<size_t>(codec->type()), idx);