/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2016 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
}
}
- auto p = cursor.peek();
+ auto sp = StringPiece{cursor.peekBytes()};
auto out = IOBuf::create(actualUncompressedLength);
- int n = LZ4_decompress_safe(reinterpret_cast<const char*>(p.first),
- reinterpret_cast<char*>(out->writableTail()),
- p.second,
- actualUncompressedLength);
+ int n = LZ4_decompress_safe(
+ sp.data(),
+ reinterpret_cast<char*>(out->writableTail()),
+ sp.size(),
+ actualUncompressedLength);
if (n < 0 || uint64_t(n) != actualUncompressedLength) {
throw std::runtime_error(to<std::string>(
}
const char* IOBufSnappySource::Peek(size_t* len) {
- auto p = cursor_.peek();
- *len = p.second;
- return reinterpret_cast<const char*>(p.first);
+ auto sp = StringPiece{cursor_.peekBytes()};
+ *len = sp.size();
+ return sp.data();
}
void IOBufSnappySource::Skip(size_t n) {
defaultBufferLength));
bool streamEnd = false;
- auto buf = cursor.peek();
- while (buf.second != 0) {
- stream.next_in = const_cast<uint8_t*>(buf.first);
- stream.avail_in = buf.second;
+ auto buf = cursor.peekBytes();
+ while (!buf.empty()) {
+ stream.next_in = const_cast<uint8_t*>(buf.data());
+ stream.avail_in = buf.size();
while (stream.avail_in != 0) {
if (streamEnd) {
streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
}
- cursor.skip(buf.second);
- buf = cursor.peek();
+ cursor.skip(buf.size());
+ buf = cursor.peekBytes();
}
while (!streamEnd) {
#ifdef FOLLY_HAVE_LIBZSTD
/**
- * ZSTD_BETA compression
+ * ZSTD compression
*/
class ZSTDCodec final : public Codec {
public:
}
ZSTDCodec::ZSTDCodec(int level, CodecType type) : Codec(type) {
- DCHECK(type == CodecType::ZSTD_BETA);
+ DCHECK(type == CodecType::ZSTD);
switch (level) {
case COMPRESSION_LEVEL_FASTEST:
level_ = 1;
}
bool ZSTDCodec::doNeedsUncompressedLength() const {
- return true;
+ return false;
+}
+
+void zstdThrowIfError(size_t rc) {
+ if (!ZSTD_isError(rc)) {
+ return;
+ }
+ throw std::runtime_error(
+ to<std::string>("ZSTD returned an error: ", ZSTD_getErrorName(rc)));
}
std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
- size_t rc;
- size_t maxCompressedLength = ZSTD_compressBound(data->length());
- auto out = IOBuf::createCombined(maxCompressedLength);
+ // Support earlier versions of the codec (working with a single IOBuf,
+ // and using ZSTD_decompress which requires ZSTD frame to contain size,
+ // which isn't populated by streaming API).
+ if (!data->isChained()) {
+ auto out = IOBuf::createCombined(ZSTD_compressBound(data->length()));
+ const auto rc = ZSTD_compress(
+ out->writableData(),
+ out->capacity(),
+ data->data(),
+ data->length(),
+ level_);
+ zstdThrowIfError(rc);
+ out->append(rc);
+ return out;
+ }
- CHECK_EQ(out->length(), 0);
+ auto zcs = ZSTD_createCStream();
+ SCOPE_EXIT {
+ ZSTD_freeCStream(zcs);
+ };
- rc = ZSTD_compress(out->writableTail(),
- out->capacity(),
- data->data(),
- data->length(),
- level_);
+ auto rc = ZSTD_initCStream(zcs, level_);
+ zstdThrowIfError(rc);
- if (ZSTD_isError(rc)) {
- throw std::runtime_error(to<std::string>(
- "ZSTD compression returned an error: ",
- ZSTD_getErrorName(rc)));
+ Cursor cursor(data);
+ auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength()));
+
+ ZSTD_outBuffer out;
+ out.dst = result->writableTail();
+ out.size = result->capacity();
+ out.pos = 0;
+
+ for (auto buffer = cursor.peekBytes(); !buffer.empty();) {
+ ZSTD_inBuffer in;
+ in.src = buffer.data();
+ in.size = buffer.size();
+ for (in.pos = 0; in.pos != in.size;) {
+ rc = ZSTD_compressStream(zcs, &out, &in);
+ zstdThrowIfError(rc);
+ }
+ cursor.skip(in.size);
+ buffer = cursor.peekBytes();
}
- out->append(rc);
- CHECK_EQ(out->length(), rc);
+ rc = ZSTD_endStream(zcs, &out);
+ zstdThrowIfError(rc);
+ CHECK_EQ(rc, 0);
- return out;
+ result->append(out.pos);
+ return result;
}
-std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(const IOBuf* data,
- uint64_t uncompressedLength) {
- size_t rc;
- auto out = IOBuf::createCombined(uncompressedLength);
+std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
+ const IOBuf* data,
+ uint64_t uncompressedLength) {
+ auto zds = ZSTD_createDStream();
+ SCOPE_EXIT {
+ ZSTD_freeDStream(zds);
+ };
- CHECK_GE(out->capacity(), uncompressedLength);
- CHECK_EQ(out->length(), 0);
+ auto rc = ZSTD_initDStream(zds);
+ zstdThrowIfError(rc);
- rc = ZSTD_decompress(
- out->writableTail(), out->capacity(), data->data(), data->length());
+ ZSTD_outBuffer out{};
+ ZSTD_inBuffer in{};
- if (ZSTD_isError(rc)) {
- throw std::runtime_error(to<std::string>(
- "ZSTD decompression returned an error: ",
- ZSTD_getErrorName(rc)));
+ auto outputSize = ZSTD_DStreamOutSize();
+ if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) {
+ outputSize = uncompressedLength;
+ } else {
+ auto decompressedSize =
+ ZSTD_getDecompressedSize(data->data(), data->length());
+ if (decompressedSize != 0 && decompressedSize < outputSize) {
+ outputSize = decompressedSize;
+ }
}
- out->append(rc);
- CHECK_EQ(out->length(), rc);
+ IOBufQueue queue(IOBufQueue::cacheChainLength());
+
+ Cursor cursor(data);
+ for (rc = 0;;) {
+ if (in.pos == in.size) {
+ auto buffer = cursor.peekBytes();
+ in.src = buffer.data();
+ in.size = buffer.size();
+ in.pos = 0;
+ cursor.skip(in.size);
+ if (rc > 1 && in.size == 0) {
+ throw std::runtime_error(to<std::string>("ZSTD: incomplete input"));
+ }
+ }
+ if (out.pos == out.size) {
+ if (out.pos != 0) {
+ queue.postallocate(out.pos);
+ }
+ auto buffer = queue.preallocate(outputSize, outputSize);
+ out.dst = buffer.first;
+ out.size = buffer.second;
+ out.pos = 0;
+ outputSize = ZSTD_DStreamOutSize();
+ }
+ rc = ZSTD_decompressStream(zds, &out, &in);
+ zstdThrowIfError(rc);
+ if (rc == 0) {
+ break;
+ }
+ }
+ if (out.pos != 0) {
+ queue.postallocate(out.pos);
+ }
+ if (in.pos != in.size || !cursor.isAtEnd()) {
+ throw std::runtime_error("ZSTD: junk after end of data");
+ }
+ if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+ queue.chainLength() != uncompressedLength) {
+ throw std::runtime_error("ZSTD: invalid uncompressed length");
+ }
- return out;
+ return queue.move();
}
#endif // FOLLY_HAVE_LIBZSTD