-std::unique_ptr<IOBuf> ZSTDCodec::doCompress(const IOBuf* data) {
- // Support earlier versions of the codec (working with a single IOBuf,
- // and using ZSTD_decompress which requires ZSTD frame to contain size,
- // which isn't populated by streaming API).
- if (!data->isChained()) {
- auto out = IOBuf::createCombined(ZSTD_compressBound(data->length()));
- const auto rc = ZSTD_compress(
- out->writableData(),
- out->capacity(),
- data->data(),
- data->length(),
- level_);
- zstdThrowIfError(rc);
- out->append(rc);
- return out;
- }
-
- auto zcs = ZSTD_createCStream();
- SCOPE_EXIT {
- ZSTD_freeCStream(zcs);
- };
-
- auto rc = ZSTD_initCStream(zcs, level_);
- zstdThrowIfError(rc);
-
- Cursor cursor(data);
- auto result =
- IOBuf::createCombined(maxCompressedLength(cursor.totalLength()));
-
- ZSTD_outBuffer out;
- out.dst = result->writableTail();
- out.size = result->capacity();
- out.pos = 0;
-
- for (auto buffer = cursor.peekBytes(); !buffer.empty();) {
- ZSTD_inBuffer in;
- in.src = buffer.data();
- in.size = buffer.size();
- for (in.pos = 0; in.pos != in.size;) {
- rc = ZSTD_compressStream(zcs, &out, &in);
- zstdThrowIfError(rc);
+Optional<uint64_t> ZSTDStreamCodec::doGetUncompressedLength(
+ IOBuf const* data,
+ Optional<uint64_t> uncompressedLength) const {
+ // Read decompressed size from frame if available in first IOBuf.
+ auto const decompressedSize =
+ ZSTD_getDecompressedSize(data->data(), data->length());
+ if (decompressedSize != 0) {
+ if (uncompressedLength && *uncompressedLength != decompressedSize) {
+ throw std::runtime_error("ZSTD: invalid uncompressed length");