/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
uint64_t len = data->computeChainDataLength();
if (len == 0) {
return IOBuf::create(0);
- } else if (len > maxUncompressedLength()) {
+ }
+ if (len > maxUncompressedLength()) {
throw std::runtime_error("Codec: uncompressed length too large");
}
return doCompress(data);
}
+std::string Codec::compress(const StringPiece data) {
+ const uint64_t len = data.size();
+ if (len == 0) {
+ return "";
+ }
+ if (len > maxUncompressedLength()) {
+ throw std::runtime_error("Codec: uncompressed length too large");
+ }
+
+ return doCompressString(data);
+}
+
std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
uint64_t uncompressedLength) {
if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
return doUncompress(data, uncompressedLength);
}
+std::string Codec::uncompress(
+ const StringPiece data,
+ uint64_t uncompressedLength) {
+ if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
+ if (needsUncompressedLength()) {
+ throw std::invalid_argument("Codec: uncompressed length required");
+ }
+ } else if (uncompressedLength > maxUncompressedLength()) {
+ throw std::runtime_error("Codec: uncompressed length too large");
+ }
+
+ if (data.empty()) {
+ if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+ uncompressedLength != 0) {
+ throw std::runtime_error("Codec: invalid uncompressed length");
+ }
+ return "";
+ }
+
+ return doUncompressString(data, uncompressedLength);
+}
+
bool Codec::needsUncompressedLength() const {
return doNeedsUncompressedLength();
}
return UNLIMITED_UNCOMPRESSED_LENGTH;
}
+std::string Codec::doCompressString(const StringPiece data) {
+ const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
+ auto outputBuffer = doCompress(&inputBuffer);
+ std::string output;
+ output.reserve(outputBuffer->computeChainDataLength());
+ for (auto range : *outputBuffer) {
+ output.append(reinterpret_cast<const char*>(range.data()), range.size());
+ }
+ return output;
+}
+
+std::string Codec::doUncompressString(
+ const StringPiece data,
+ uint64_t uncompressedLength) {
+ const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
+ auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
+ std::string output;
+ output.reserve(outputBuffer->computeChainDataLength());
+ for (auto range : *outputBuffer) {
+ output.append(reinterpret_cast<const char*>(range.data()), range.size());
+ }
+ return output;
+}
+
namespace {
/**
}
std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
- std::unique_ptr<IOBuf> clone;
+ IOBuf clone;
if (data->isChained()) {
// LZ4 doesn't support streaming, so we have to coalesce
- clone = data->clone();
- clone->coalesce();
- data = clone.get();
+ clone = data->cloneCoalescedAsValue();
+ data = &clone;
}
uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
}
int n;
+ auto input = reinterpret_cast<const char*>(data->data());
+ auto output = reinterpret_cast<char*>(out->writableTail());
+ const auto inputLength = data->length();
+#if LZ4_VERSION_NUMBER >= 10700
if (highCompression_) {
- n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()),
- reinterpret_cast<char*>(out->writableTail()),
- data->length());
+ n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
} else {
- n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
- reinterpret_cast<char*>(out->writableTail()),
- data->length());
+ n = LZ4_compress_default(input, output, inputLength, out->tailroom());
}
+#else
+ if (highCompression_) {
+ n = LZ4_compressHC(input, output, inputLength);
+ } else {
+ n = LZ4_compress(input, output, inputLength);
+ }
+#endif
CHECK_GE(n, 0);
CHECK_LE(n, out->capacity());
std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
const IOBuf* data,
uint64_t uncompressedLength) {
- std::unique_ptr<IOBuf> clone;
+ IOBuf clone;
if (data->isChained()) {
// LZ4 doesn't support streaming, so we have to coalesce
- clone = data->clone();
- clone->coalesce();
- data = clone.get();
+ clone = data->cloneCoalescedAsValue();
+ data = &clone;
}
folly::io::Cursor cursor(data);
}
}
- auto p = cursor.peek();
+ auto sp = StringPiece{cursor.peekBytes()};
auto out = IOBuf::create(actualUncompressedLength);
- int n = LZ4_decompress_safe(reinterpret_cast<const char*>(p.first),
- reinterpret_cast<char*>(out->writableTail()),
- p.second,
- actualUncompressedLength);
+ int n = LZ4_decompress_safe(
+ sp.data(),
+ reinterpret_cast<char*>(out->writableTail()),
+ sp.size(),
+ actualUncompressedLength);
if (n < 0 || uint64_t(n) != actualUncompressedLength) {
throw std::runtime_error(to<std::string>(
}
const char* IOBufSnappySource::Peek(size_t* len) {
- auto p = cursor_.peek();
- *len = p.second;
- return reinterpret_cast<const char*>(p.first);
+ auto sp = StringPiece{cursor_.peekBytes()};
+ *len = sp.size();
+ return sp.data();
}
void IOBufSnappySource::Skip(size_t n) {
bool success = false;
SCOPE_EXIT {
- int rc = deflateEnd(&stream);
+ rc = deflateEnd(&stream);
// If we're here because of an exception, it's okay if some data
// got dropped.
CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
bool success = false;
SCOPE_EXIT {
- int rc = inflateEnd(&stream);
+ rc = inflateEnd(&stream);
// If we're here because of an exception, it's okay if some data
// got dropped.
CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
defaultBufferLength));
bool streamEnd = false;
- auto buf = cursor.peek();
- while (buf.second != 0) {
- stream.next_in = const_cast<uint8_t*>(buf.first);
- stream.avail_in = buf.second;
+ auto buf = cursor.peekBytes();
+ while (!buf.empty()) {
+ stream.next_in = const_cast<uint8_t*>(buf.data());
+ stream.avail_in = buf.size();
while (stream.avail_in != 0) {
if (streamEnd) {
streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
}
- cursor.skip(buf.second);
- buf = cursor.peek();
+ cursor.skip(buf.size());
+ buf = cursor.peekBytes();
}
while (!streamEnd) {
#ifdef FOLLY_HAVE_LIBZSTD
/**
- * ZSTD_BETA compression
+ * ZSTD compression
*/
class ZSTDCodec final : public Codec {
public:
const IOBuf* data,
uint64_t uncompressedLength) override;
- int level_{1};
+ int level_;
};
std::unique_ptr<Codec> ZSTDCodec::create(int level, CodecType type) {
}
ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) {
- DCHECK(type == CodecType::ZSTD_BETA);
+ DCHECK(type == CodecType::ZSTD);
switch (level) {
case COMPRESSION_LEVEL_FASTEST:
- level_ = 1;
+ level = 1;
break;
case COMPRESSION_LEVEL_DEFAULT:
- level_ = 1;
+ level = 1;
break;
case COMPRESSION_LEVEL_BEST:
- level_ = 19;
+ level = 19;
break;
}
+ if (level < 1 || level > ZSTD_maxCLevel()) {
+ throw std::invalid_argument(
+ to<std::string>("ZSTD: invalid level: ", level));
+ }
+ level_ = level;
}
bool ZSTDCodec::doNeedsUncompressedLength() const {
- return true;
+ return false;
+}
+
+void zstdThrowIfError(size_t rc) {
+ if (!ZSTD_isError(rc)) {
+ return;
+ }
+ throw std::runtime_error(
+ to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
}
std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
- size_t rc;
- size_t maxCompressedLength = ZSTD_compressBound(data->length());
- auto out = IOBuf::createCombined(maxCompressedLength);
+ // Support earlier versions of the codec (working with a single IOBuf,
+ // and using ZSTD_decompress which requires ZSTD frame to contain size,
+ // which isn't populated by streaming API).
+ if (!data->isChained()) {
+ auto out = IOBuf::createCombined(ZSTD_compressBound(data->length()));
+ const auto rc = ZSTD_compress(
+ out->writableData(),
+ out->capacity(),
+ data->data(),
+ data->length(),
+ level_);
+ zstdThrowIfError(rc);
+ out->append(rc);
+ return out;
+ }
- CHECK_EQ(out->length(), 0);
+ auto zcs = ZSTD_createCStream();
+ SCOPE_EXIT {
+ ZSTD_freeCStream(zcs);
+ };
- rc = ZSTD_compress(out->writableTail(),
- out->capacity(),
- data->data(),
- data->length(),
- level_);
+ auto rc = ZSTD_initCStream(zcs, level_);
+ zstdThrowIfError(rc);
- if (ZSTD_isError(rc)) {
- throw std::runtime_error(to<std::string>(
- "ZSTD compression returned an error: ",
- ZSTD_getErrorName(rc)));
+ Cursor cursor(data);
+ auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
+
+ ZSTD_outBuffer out;
+ out.dst = result->writableTail();
+ out.size = result->capacity();
+ out.pos = 0;
+
+ for (auto buffer = cursor.peekBytes(); !buffer.empty();) {
+ ZSTD_inBuffer in;
+ in.src = buffer.data();
+ in.size = buffer.size();
+ for (in.pos = 0; in.pos != in.size;) {
+ rc = ZSTD_compressStream(zcs, &out, &in);
+ zstdThrowIfError(rc);
+ }
+ cursor.skip(in.size);
+ buffer = cursor.peekBytes();
}
- out->append(rc);
- CHECK_EQ(out->length(), rc);
+ rc = ZSTD_endStream(zcs, &out);
+ zstdThrowIfError(rc);
+ CHECK_EQ(rc, 0);
- return out;
+ result->append(out.pos);
+ return result;
}
-std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(const IOBuf* data,
- uint64_t uncompressedLength) {
- size_t rc;
- auto out = IOBuf::createCombined(uncompressedLength);
+std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
+ const IOBuf* data,
+ uint64_t uncompressedLength) {
+ auto zds = ZSTD_createDStream();
+ SCOPE_EXIT {
+ ZSTD_freeDStream(zds);
+ };
- CHECK_GE(out->capacity(), uncompressedLength);
- CHECK_EQ(out->length(), 0);
+ auto rc = ZSTD_initDStream(zds);
+ zstdThrowIfError(rc);
- rc = ZSTD_decompress(
- out->writableTail(), out->capacity(), data->data(), data->length());
+ ZSTD_outBuffer out{};
+ ZSTD_inBuffer in{};
- if (ZSTD_isError(rc)) {
- throw std::runtime_error(to<std::string>(
- "ZSTD decompression returned an error: ",
- ZSTD_getErrorName(rc)));
+ auto outputSize = ZSTD_DStreamOutSize();
+ if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) {
+ outputSize = uncompressedLength;
+ } else {
+ auto decompressedSize =
+ ZSTD_getDecompressedSize(data->data(), data->length());
+ if (decompressedSize != 0 && decompressedSize < outputSize) {
+ outputSize = decompressedSize;
+ }
}
- out->append(rc);
- CHECK_EQ(out->length(), rc);
+ IOBufQueue queue(IOBufQueue::cacheChainLength());
+
+ Cursor cursor(data);
+ for (rc = 0;;) {
+ if (in.pos == in.size) {
+ auto buffer = cursor.peekBytes();
+ in.src = buffer.data();
+ in.size = buffer.size();
+ in.pos = 0;
+ cursor.skip(in.size);
+ if (rc > 1 && in.size == 0) {
+ throw std::runtime_error(to<std::string>("ZSTD: incomplete input"));
+ }
+ }
+ if (out.pos == out.size) {
+ if (out.pos != 0) {
+ queue.postallocate(out.pos);
+ }
+ auto buffer = queue.preallocate(outputSize, outputSize);
+ out.dst = buffer.first;
+ out.size = buffer.second;
+ out.pos = 0;
+ outputSize = ZSTD_DStreamOutSize();
+ }
+ rc = ZSTD_decompressStream(zds, &out, &in);
+ zstdThrowIfError(rc);
+ if (rc == 0) {
+ break;
+ }
+ }
+ if (out.pos != 0) {
+ queue.postallocate(out.pos);
+ }
+ if (in.pos != in.size || !cursor.isAtEnd()) {
+ throw std::runtime_error("ZSTD: junk after end of data");
+ }
+ if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+ queue.chainLength() != uncompressedLength) {
+ throw std::runtime_error("ZSTD: invalid uncompressed length");
+ }
- return out;
+ return queue.move();
}
#endif // FOLLY_HAVE_LIBZSTD
} // namespace
-std::unique_ptr<Codec> getCodec(CodecType type, int level) {
- typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
-
- static CodecFactory codecFactories[
- static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
- nullptr, // USER_DEFINED
- NoCompressionCodec::create,
+typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
+static CodecFactory
+ codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
+ nullptr, // USER_DEFINED
+ NoCompressionCodec::create,
#if FOLLY_HAVE_LIBLZ4
- LZ4Codec::create,
+ LZ4Codec::create,
#else
- nullptr,
+ nullptr,
#endif
#if FOLLY_HAVE_LIBSNAPPY
- SnappyCodec::create,
+ SnappyCodec::create,
#else
- nullptr,
+ nullptr,
#endif
#if FOLLY_HAVE_LIBZ
- ZlibCodec::create,
+ ZlibCodec::create,
#else
- nullptr,
+ nullptr,
#endif
#if FOLLY_HAVE_LIBLZ4
- LZ4Codec::create,
+ LZ4Codec::create,
#else
- nullptr,
+ nullptr,
#endif
#if FOLLY_HAVE_LIBLZMA
- LZMA2Codec::create,
- LZMA2Codec::create,
+ LZMA2Codec::create,
+ LZMA2Codec::create,
#else
- nullptr,
- nullptr,
+ nullptr,
+ nullptr,
#endif
#if FOLLY_HAVE_LIBZSTD
- ZSTDCodec::create,
+ ZSTDCodec::create,
#else
- nullptr,
+ nullptr,
#endif
#if FOLLY_HAVE_LIBZ
- ZlibCodec::create,
+ ZlibCodec::create,
#else
- nullptr,
+ nullptr,
#endif
- };
+};
+bool hasCodec(CodecType type) {
size_t idx = static_cast<size_t>(type);
if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
- throw std::invalid_argument(to<std::string>(
- "Compression type ", idx, " not supported"));
+ throw std::invalid_argument(
+ to<std::string>("Compression type ", idx, " invalid"));
+ }
+ return codecFactories[idx] != nullptr;
+}
+
+std::unique_ptr<Codec> getCodec(CodecType type, int level) {
+ size_t idx = static_cast<size_t>(type);
+ if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
+ throw std::invalid_argument(
+ to<std::string>("Compression type ", idx, " invalid"));
}
auto factory = codecFactories[idx];
if (!factory) {