From: Nick Terrell Date: Wed, 24 May 2017 21:59:41 +0000 (-0700) Subject: Add streaming API X-Git-Tag: v2017.05.29.00~9 X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=commitdiff_plain;h=74560278227a11b3080a756d2fffb154caf9e2f1 Add streaming API Summary: Adds a C-style streaming API to `folly/io/Compression.h`, with a zlib-esque interface. Stacked diffs will add streaming support to zstd, zlib, gzip, lzma, lz4_frame, and automatic codecs. This interface is targeting advanced users who are building higher level interfaces. They can use this as a common base so they don't have to reimplement the same code for every codec. Reviewed By: yfeldblum Differential Revision: D5026332 fbshipit-source-id: e3abf1767b493c2fef153b895858a3a81b67d989 --- diff --git a/folly/io/Compression.cpp b/folly/io/Compression.cpp index 4c02802e..2c47093b 100644 --- a/folly/io/Compression.cpp +++ b/folly/io/Compression.cpp @@ -176,6 +176,242 @@ std::string Codec::doUncompressString( return output; } +uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const { + if (uncompressedLength == 0) { + return 0; + } + return doMaxCompressedLength(uncompressedLength); +} + +Optional Codec::getUncompressedLength( + const folly::IOBuf* data, + Optional uncompressedLength) const { + auto const compressedLength = data->computeChainDataLength(); + if (uncompressedLength == uint64_t(0) || compressedLength == 0) { + if (uncompressedLength.value_or(0) != 0 || compressedLength != 0) { + throw std::runtime_error("Invalid uncompressed length"); + } + return 0; + } + return doGetUncompressedLength(data, uncompressedLength); +} + +Optional Codec::doGetUncompressedLength( + const folly::IOBuf*, + Optional uncompressedLength) const { + return uncompressedLength; +} + +bool StreamCodec::needsDataLength() const { + return doNeedsDataLength(); +} + +bool StreamCodec::doNeedsDataLength() const { + return false; +} + +void StreamCodec::assertStateIs(State expected) const { + if (state_ != expected) { + throw std::logic_error(folly::to( + "Codec: state is ", state_, "; expected state ", expected)); + } +} + +void StreamCodec::resetStream(Optional uncompressedLength) { + state_ = State::RESET; + uncompressedLength_ = uncompressedLength; + doResetStream(); +} + +bool StreamCodec::compressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (state_ == State::RESET && input.empty()) { + if (flushOp == StreamCodec::FlushOp::NONE) { + return false; + } + if (flushOp == StreamCodec::FlushOp::END && + uncompressedLength().value_or(0) != 0) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + return true; + } + if (state_ == State::RESET && !input.empty() && + uncompressedLength() == uint64_t(0)) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + // Handle input state transitions + switch (flushOp) { + case StreamCodec::FlushOp::NONE: + if (state_ == State::RESET) { + state_ = State::COMPRESS; + } + assertStateIs(State::COMPRESS); + break; + case StreamCodec::FlushOp::FLUSH: + if (state_ == State::RESET || state_ == State::COMPRESS) { + state_ = State::COMPRESS_FLUSH; + } + assertStateIs(State::COMPRESS_FLUSH); + break; + case StreamCodec::FlushOp::END: + if (state_ == State::RESET || state_ == State::COMPRESS) { + state_ = State::COMPRESS_END; + } + assertStateIs(State::COMPRESS_END); + break; + } + bool const done = doCompressStream(input, output, flushOp); + // Handle output state transitions + if (done) { + if (state_ == State::COMPRESS_FLUSH) { + state_ = State::COMPRESS; + } else if (state_ == State::COMPRESS_END) { + state_ = State::END; + } + // Check internal invariants + DCHECK(input.empty()); + DCHECK(flushOp != StreamCodec::FlushOp::NONE); + } + return done; +} + +bool StreamCodec::uncompressStream( + ByteRange& input, + MutableByteRange& output, + StreamCodec::FlushOp flushOp) { + if (state_ == State::RESET && input.empty()) { + if (uncompressedLength().value_or(0) == 0) { + return true; + } + return false; + } + // Handle input state transitions + if (state_ == State::RESET) { + state_ = State::UNCOMPRESS; + } + assertStateIs(State::UNCOMPRESS); + bool const done = doUncompressStream(input, output, flushOp); + // Handle output state transitions + if (done) { + state_ = State::END; + } + return done; +} + +static std::unique_ptr addOutputBuffer( + MutableByteRange& output, + uint64_t size) { + DCHECK(output.empty()); + auto buffer = IOBuf::create(size); + buffer->append(buffer->capacity()); + output = {buffer->writableData(), buffer->length()}; + return buffer; +} + +std::unique_ptr StreamCodec::doCompress(IOBuf const* data) { + uint64_t const uncompressedLength = data->computeChainDataLength(); + resetStream(uncompressedLength); + uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength); + + auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB + auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB + + MutableByteRange output; + auto buffer = addOutputBuffer( + output, + maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen + : kDefaultBufferLength); + + // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer + IOBuf const* current = data; + ByteRange input{current->data(), current->length()}; + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE; + for (;;) { + while (input.empty() && current->next() != data) { + current = current->next(); + input = {current->data(), current->length()}; + } + if (current->next() == data) { + // This is the last input buffer so end the stream + flushOp = StreamCodec::FlushOp::END; + } + if (output.empty()) { + buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength)); + } + bool const done = compressStream(input, output, flushOp); + if (done) { + DCHECK(input.empty()); + DCHECK(flushOp == StreamCodec::FlushOp::END); + DCHECK_EQ(current->next(), data); + break; + } + } + buffer->prev()->trimEnd(output.size()); + return buffer; +} + +static uint64_t computeBufferLength( + uint64_t const compressedLength, + uint64_t const blockSize) { + uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB + uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength); + return std::min(goodBufferSize, kMaxBufferLength); +} + +std::unique_ptr StreamCodec::doUncompress( + IOBuf const* data, + Optional uncompressedLength) { + auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB + auto constexpr kBlockSize = uint64_t(128) << 10; + auto const defaultBufferLength = + computeBufferLength(data->computeChainDataLength(), kBlockSize); + + uncompressedLength = getUncompressedLength(data, uncompressedLength); + resetStream(uncompressedLength); + + MutableByteRange output; + auto buffer = addOutputBuffer( + output, + (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength + ? *uncompressedLength + : defaultBufferLength)); + + // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer + IOBuf const* current = data; + ByteRange input{current->data(), current->length()}; + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE; + for (;;) { + while (input.empty() && current->next() != data) { + current = current->next(); + input = {current->data(), current->length()}; + } + if (current->next() == data) { + // Tell the uncompressor there is no more input (it may optimize) + flushOp = StreamCodec::FlushOp::END; + } + if (output.empty()) { + buffer->prependChain(addOutputBuffer(output, defaultBufferLength)); + } + bool const done = uncompressStream(input, output, flushOp); + if (done) { + break; + } + } + if (!input.empty()) { + throw std::runtime_error("Codec: Junk after end of data"); + } + + buffer->prev()->trimEnd(output.size()); + if (uncompressedLength && + *uncompressedLength != buffer->computeChainDataLength()) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + + return buffer; +} + namespace { /** @@ -187,6 +423,7 @@ class NoCompressionCodec final : public Codec { explicit NoCompressionCodec(int level, CodecType type); private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, @@ -212,6 +449,11 @@ NoCompressionCodec::NoCompressionCodec(int level, CodecType type) } } +uint64_t NoCompressionCodec::doMaxCompressedLength( + uint64_t uncompressedLength) const { + return uncompressedLength; +} + std::unique_ptr NoCompressionCodec::doCompress( const IOBuf* data) { return data->clone(); @@ -288,14 +530,6 @@ prefixToStringLE(T prefix, uint64_t n = sizeof(T)) { memcpy(&result[0], &prefix, n); return result; } - -static uint64_t computeBufferLength( - uint64_t const compressedLength, - uint64_t const blockSize) { - uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB - uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength); - return std::min(goodBufferSize, kMaxBufferLength); -} } // namespace #if FOLLY_HAVE_LIBLZ4 @@ -311,6 +545,7 @@ class LZ4Codec final : public Codec { private: bool doNeedsUncompressedLength() const override; uint64_t doMaxUncompressedLength() const override; + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; } @@ -360,6 +595,11 @@ uint64_t LZ4Codec::doMaxUncompressedLength() const { return LZ4_MAX_INPUT_SIZE; } +uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const { + return LZ4_compressBound(uncompressedLength) + + (encodeSize() ? kMaxVarintLength64 : 0); +} + std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { IOBuf clone; if (data->isChained()) { @@ -368,8 +608,7 @@ std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { data = &clone; } - uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0; - auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length())); + auto out = IOBuf::create(maxCompressedLength(data->length())); if (encodeSize()) { encodeVarintToIOBuf(data->length(), out.get()); } @@ -452,6 +691,8 @@ class LZ4FrameCodec final : public Codec { const override; private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; + std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, @@ -481,6 +722,14 @@ bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional) const { return dataStartsWithLE(data, kLZ4FrameMagicLE); } +uint64_t LZ4FrameCodec::doMaxCompressedLength( + uint64_t uncompressedLength) const { + LZ4F_preferences_t prefs{}; + prefs.compressionLevel = level_; + prefs.frameInfo.contentSize = uncompressedLength; + return LZ4F_compressFrameBound(uncompressedLength, &prefs); +} + static size_t lz4FrameThrowOnError(size_t code) { if (LZ4F_isError(code)) { throw std::runtime_error( @@ -535,7 +784,7 @@ std::unique_ptr LZ4FrameCodec::doCompress(const IOBuf* data) { prefs.compressionLevel = level_; prefs.frameInfo.contentSize = uncompressedLength; // Compress - auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs)); + auto buf = IOBuf::create(maxCompressedLength(uncompressedLength)); const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame( buf->writableTail(), buf->tailroom(), @@ -659,6 +908,7 @@ class SnappyCodec final : public Codec { private: uint64_t doMaxUncompressedLength() const override; + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, @@ -688,10 +938,13 @@ uint64_t SnappyCodec::doMaxUncompressedLength() const { return std::numeric_limits::max(); } +uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const { + return snappy::MaxCompressedLength(uncompressedLength); +} + std::unique_ptr SnappyCodec::doCompress(const IOBuf* data) { IOBufSnappySource source(data); - auto out = - IOBuf::create(snappy::MaxCompressedLength(source.Available())); + auto out = IOBuf::create(maxCompressedLength(source.Available())); snappy::UncheckedByteArraySink sink(reinterpret_cast( out->writableTail())); @@ -748,6 +1001,7 @@ class ZlibCodec final : public Codec { const override; private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, @@ -819,6 +1073,10 @@ bool ZlibCodec::canUncompress(const IOBuf* data, Optional) const { } } +uint64_t ZlibCodec::doMaxCompressedLength(uint64_t uncompressedLength) const { + return deflateBound(nullptr, uncompressedLength); +} + std::unique_ptr ZlibCodec::create(int level, CodecType type) { return std::make_unique(level, type); } @@ -1075,6 +1333,7 @@ class LZMA2Codec final : public Codec { private: bool doNeedsUncompressedLength() const override; uint64_t doMaxUncompressedLength() const override; + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; bool encodeSize() const { return type() == CodecType::LZMA2_VARINT_SIZE; } @@ -1141,6 +1400,11 @@ uint64_t LZMA2Codec::doMaxUncompressedLength() const { return uint64_t(1) << 63; } +uint64_t LZMA2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const { + return lzma_stream_buffer_bound(uncompressedLength) + + (encodeSize() ? kMaxVarintLength64 : 0); +} + std::unique_ptr LZMA2Codec::addOutputBuffer( lzma_stream* stream, size_t length) { @@ -1334,6 +1598,7 @@ class ZSTDCodec final : public Codec { private: bool doNeedsUncompressedLength() const override; + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; std::unique_ptr doCompress(const IOBuf* data) override; std::unique_ptr doUncompress( const IOBuf* data, @@ -1380,6 +1645,10 @@ bool ZSTDCodec::doNeedsUncompressedLength() const { return false; } +uint64_t ZSTDCodec::doMaxCompressedLength(uint64_t uncompressedLength) const { + return ZSTD_compressBound(uncompressedLength); +} + void zstdThrowIfError(size_t rc) { if (!ZSTD_isError(rc)) { return; @@ -1414,7 +1683,8 @@ std::unique_ptr ZSTDCodec::doCompress(const IOBuf* data) { zstdThrowIfError(rc); Cursor cursor(data); - auto result = IOBuf::createCombined(ZSTD_compressBound(cursor.totalLength())); + auto result = + IOBuf::createCombined(maxCompressedLength(cursor.totalLength())); ZSTD_outBuffer out; out.dst = result->writableTail(); @@ -1557,6 +1827,7 @@ class Bzip2Codec final : public Codec { const override; private: + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override; std::unique_ptr doCompress(IOBuf const* data) override; std::unique_ptr doUncompress( IOBuf const* data, @@ -1602,6 +1873,14 @@ bool Bzip2Codec::canUncompress(IOBuf const* data, Optional) const { return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes); } +uint64_t Bzip2Codec::doMaxCompressedLength(uint64_t uncompressedLength) const { + // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress + // To guarantee that the compressed data will fit in its buffer, allocate an + // output buffer of size 1% larger than the uncompressed data, plus six + // hundred extra bytes. + return uncompressedLength + uncompressedLength / 100 + 600; +} + static bz_stream createBzStream() { bz_stream stream; stream.bzalloc = nullptr; @@ -1626,14 +1905,6 @@ static int bzCheck(int const rc) { } } -static uint64_t bzCompressBound(uint64_t const uncompressedLength) { - // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress - // To guarantee that the compressed data will fit in its buffer, allocate an - // output buffer of size 1% larger than the uncompressed data, plus six - // hundred extra bytes. - return uncompressedLength + uncompressedLength / 100 + 600; -} - static std::unique_ptr addOutputBuffer( bz_stream* stream, uint64_t const bufferLength) { @@ -1657,14 +1928,14 @@ std::unique_ptr Bzip2Codec::doCompress(IOBuf const* data) { }; uint64_t const uncompressedLength = data->computeChainDataLength(); - uint64_t const maxCompressedLength = bzCompressBound(uncompressedLength); + uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength); uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20; auto out = addOutputBuffer( &stream, - maxCompressedLength <= kMaxSingleStepLength ? maxCompressedLength - : kDefaultBufferLength); + maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen + : kDefaultBufferLength); for (auto range : *data) { while (!range.empty()) { @@ -1766,6 +2037,10 @@ class AutomaticCodec final : public Codec { bool doNeedsUncompressedLength() const override; uint64_t doMaxUncompressedLength() const override; + uint64_t doMaxCompressedLength(uint64_t) const override { + throw std::runtime_error( + "AutomaticCodec error: maxCompressedLength() not supported."); + } std::unique_ptr doCompress(const IOBuf*) override { throw std::runtime_error("AutomaticCodec error: compress() not supported."); } @@ -1909,93 +2184,112 @@ std::unique_ptr AutomaticCodec::doUncompress( throw std::runtime_error("AutomaticCodec error: Unknown compressed data"); } -} // namespace +using CodecFactory = std::unique_ptr (*)(int, CodecType); +using StreamCodecFactory = std::unique_ptr (*)(int, CodecType); +struct Factory { + CodecFactory codec; + StreamCodecFactory stream; +}; -typedef std::unique_ptr (*CodecFactory)(int, CodecType); -static constexpr CodecFactory +constexpr Factory codecFactories[static_cast(CodecType::NUM_CODEC_TYPES)] = { - nullptr, // USER_DEFINED - NoCompressionCodec::create, + {}, // USER_DEFINED + {NoCompressionCodec::create, nullptr}, #if FOLLY_HAVE_LIBLZ4 - LZ4Codec::create, + {LZ4Codec::create, nullptr}, #else - nullptr, + {}, #endif #if FOLLY_HAVE_LIBSNAPPY - SnappyCodec::create, + {SnappyCodec::create, nullptr}, #else - nullptr, + {}, #endif #if FOLLY_HAVE_LIBZ - ZlibCodec::create, + {ZlibCodec::create, nullptr}, #else - nullptr, + {}, #endif #if FOLLY_HAVE_LIBLZ4 - LZ4Codec::create, + {LZ4Codec::create, nullptr}, #else - nullptr, + {}, #endif #if FOLLY_HAVE_LIBLZMA - LZMA2Codec::create, - LZMA2Codec::create, + {LZMA2Codec::create, nullptr}, + {LZMA2Codec::create, nullptr}, #else - nullptr, - nullptr, + {}, + {}, #endif #if FOLLY_HAVE_LIBZSTD - ZSTDCodec::create, + {ZSTDCodec::create, nullptr}, #else - nullptr, + {}, #endif #if FOLLY_HAVE_LIBZ - ZlibCodec::create, + {ZlibCodec::create, nullptr}, #else - nullptr, + {}, #endif #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301) - LZ4FrameCodec::create, + {LZ4FrameCodec::create, nullptr}, #else - nullptr, + {}, #endif #if FOLLY_HAVE_LIBBZ2 - Bzip2Codec::create, + {Bzip2Codec::create, nullptr}, #else - nullptr + {}, #endif }; -bool hasCodec(CodecType type) { - size_t idx = static_cast(type); +Factory const& getFactory(CodecType type) { + size_t const idx = static_cast(type); if (idx >= static_cast(CodecType::NUM_CODEC_TYPES)) { throw std::invalid_argument( to("Compression type ", idx, " invalid")); } - return codecFactories[idx] != nullptr; + return codecFactories[idx]; +} +} // namespace + +bool hasCodec(CodecType type) { + return getFactory(type).codec != nullptr; } std::unique_ptr getCodec(CodecType type, int level) { - size_t idx = static_cast(type); - if (idx >= static_cast(CodecType::NUM_CODEC_TYPES)) { + auto const factory = getFactory(type).codec; + if (!factory) { throw std::invalid_argument( - to("Compression type ", idx, " invalid")); + to("Compression type ", type, " not supported")); } - auto factory = codecFactories[idx]; + auto codec = (*factory)(level, type); + DCHECK(codec->type() == type); + return codec; +} + +bool hasStreamCodec(CodecType type) { + return getFactory(type).stream != nullptr; +} + +std::unique_ptr getStreamCodec(CodecType type, int level) { + auto const factory = getFactory(type).stream; if (!factory) { - throw std::invalid_argument(to( - "Compression type ", idx, " not supported")); + throw std::invalid_argument( + to("Compression type ", type, " not supported")); } auto codec = (*factory)(level, type); - DCHECK_EQ(static_cast(codec->type()), idx); + DCHECK(codec->type() == type); return codec; } diff --git a/folly/io/Compression.h b/folly/io/Compression.h index a5ac58c2..e9bcd2a5 100644 --- a/folly/io/Compression.h +++ b/folly/io/Compression.h @@ -107,11 +107,12 @@ class Codec { public: virtual ~Codec() { } + static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH = uint64_t(-1); /** * Return the maximum length of data that may be compressed with this codec. * NO_COMPRESSION and ZLIB support arbitrary lengths; * LZ4 supports up to 1.9GiB; SNAPPY supports up to 4GiB. - * May return UNLIMITED_UNCOMPRESSED_LENGTH (uint64_t(-1)) if unlimited. + * May return UNLIMITED_UNCOMPRESSED_LENGTH if unlimited. */ uint64_t maxUncompressedLength() const; @@ -154,8 +155,6 @@ class Codec { * Regardless of the behavior of the underlying compressor, uncompressing * an empty IOBuf chain will return an empty IOBuf chain. */ - static constexpr uint64_t UNLIMITED_UNCOMPRESSED_LENGTH = uint64_t(-1); - std::unique_ptr uncompress( const IOBuf* data, folly::Optional uncompressedLength = folly::none); @@ -169,6 +168,24 @@ class Codec { StringPiece data, folly::Optional uncompressedLength = folly::none); + /** + * Returns a bound on the maximum compressed length when compressing data with + * the given uncompressed length. + */ + uint64_t maxCompressedLength(uint64_t uncompressedLength) const; + + /** + * Extracts the uncompressed length from the compressed data if possible. + * If the codec doesn't store the uncompressed length, or the data is + * corrupted it returns the given uncompressedLength. + * If the uncompressed length is stored in the compressed data and + * uncompressedLength is not none and they do not match a std::runtime_error + * is thrown. + */ + folly::Optional getUncompressedLength( + const folly::IOBuf* data, + folly::Optional uncompressedLength = folly::none) const; + protected: explicit Codec(CodecType type); @@ -209,9 +226,171 @@ class Codec { StringPiece data, folly::Optional uncompressedLength); + virtual uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const = 0; + // default: returns the passed uncompressedLength. + virtual folly::Optional doGetUncompressedLength( + const folly::IOBuf* data, + folly::Optional uncompressedLength) const; + CodecType type_; }; +class StreamCodec : public Codec { + public: + virtual ~StreamCodec() {} + + /** + * Does the codec need the data length before compression streaming? + */ + bool needsDataLength() const; + + /***************************************************************************** + * Streaming API + ***************************************************************************** + * A low-level stateful streaming API. + * Streaming operations can be started in two ways: + * 1. From a clean Codec on which no non-const methods have been called. + * 2. A call to resetStream(), which will reset any codec to a clean state. + * After a streaming operation has begun, either compressStream() or + * uncompressStream() must be called until the streaming operation ends. + * compressStream() ends when it returns true with flushOp END. + * uncompressStream() ends when it returns true. At this point the codec + * may be reused by calling resetStream(). + * + * compress() and uncompress() can be called at any time, but they interrupt + * any ongoing streaming operations (state is lost and resetStream() must be + * called before another streaming operation). + */ + + /** + * Reset the state of the codec, and set the uncompressed length for the next + * streaming operation. If uncompressedLength is not none it must be exactly + * the uncompressed length. compressStream() must be passed exactly + * uncompressedLength input bytes before the stream is ended. + * uncompressStream() must be passed a compressed frame that uncompresses to + * uncompressedLength. + */ + void resetStream(folly::Optional uncompressedLength = folly::none); + + enum class FlushOp { NONE, FLUSH, END }; + + /** + * Compresses some data from the input buffer and writes the compressed data + * into the output buffer. It may read input without producing any output, + * except when forced to flush. + * + * The input buffer is advanced to point to the range of data that hasn't yet + * been read. Compression will resume at this point for the next call to + * compressStream(). The output buffer is advanced one byte past the last byte + * written. + * + * The default flushOp is NONE, which allows compressStream() complete + * discretion in how much data to gather before writing any output. + * + * If flushOp is END, all pending and input data is flushed to the output + * buffer, and the frame is ended. compressStream() must be called with the + * same input and flushOp END until it returns true. At this point the caller + * must call resetStream() to use the codec again. + * + * If flushOp is FLUSH, all pending and input data is flushed to the output + * buffer, but the frame is not ended. compressStream() must be called with + * the same input and flushOp END until it returns true. At this point the + * caller can continue to compressStream() with any input data and flushOp. + * The uncompressor, if passed all the produced output data, will be able to + * uncompress all the input data passed to compressStream() so far. Excessive + * use of flushOp FLUSH will deteriorate compression ratio. This is useful for + * stateful streaming across a network. Most users don't need to use this + * flushOp. + * + * A std::logic_error is thrown on incorrect usage of the API. + * A std::runtime_error is thrown upon error conditions. + */ + bool compressStream( + folly::ByteRange& input, + folly::MutableByteRange& output, + FlushOp flushOp = StreamCodec::FlushOp::NONE); + + /** + * Uncompresses some data from the input buffer and writes the uncompressed + * data into the output buffer. It may read input without producing any + * output. + * + * The input buffer is advanced to point to the range of data that hasn't yet + * been read. Uncompression will resume at this point for the next call to + * uncompressStream(). The output buffer is advanced one byte past the last + * byte written. + * + * The default flushOp is NONE, which allows uncompressStream() complete + * discretion in how much output data to flush. The uncompressor may not make + * maximum forward progress, but will make some forward progress when + * possible. + * + * If flushOp is END, the caller guarantees that no more input will be + * presented to uncompressStream(). uncompressStream() must be called with the + * same input and flushOp END until it returns true. This is not mandatory, + * but if the input is all available in one buffer, and there is enough output + * space to write the entire frame, codecs can uncompress faster. + * + * If flushOp is FLUSH, uncompressStream() is guaranteed to make the maximum + * amount of forward progress possible. When using this flushOp and + * uncompressStream() returns with `!output.empty()` the caller knows that all + * pending output has been flushed. This is useful for stateful streaming + * across a network, and it should be used in conjunction with + * compressStream() with flushOp FLUSH. Most users don't need to use this + * flushOp. + * + * Returns true at the end of a frame. At this point resetStream() must be + * called to reuse the codec. + */ + bool uncompressStream( + folly::ByteRange& input, + folly::MutableByteRange& output, + FlushOp flushOp = StreamCodec::FlushOp::NONE); + + protected: + explicit StreamCodec(CodecType type) : Codec(type) {} + + // Returns the uncompressed length last passed to resetStream() or none if it + // hasn't been called yet. + folly::Optional uncompressedLength() const { + return uncompressedLength_; + } + + private: + // default: Implemented using the streaming API. + std::unique_ptr doCompress(const folly::IOBuf* data) override; + virtual std::unique_ptr doUncompress( + const folly::IOBuf* data, + folly::Optional uncompressedLength) override; + + // default: Returns false + virtual bool doNeedsDataLength() const; + virtual void doResetStream() = 0; + virtual bool doCompressStream( + folly::ByteRange& input, + folly::MutableByteRange& output, + FlushOp flushOp) = 0; + virtual bool doUncompressStream( + folly::ByteRange& input, + folly::MutableByteRange& output, + FlushOp flushOp) = 0; + + enum class State { + RESET, + COMPRESS, + COMPRESS_FLUSH, + COMPRESS_END, + UNCOMPRESS, + END, + }; + void assertStateIs(State expected) const; + + CodecType type_; + State state_{State::RESET}; + ByteRange previousInput_{}; + folly::Optional uncompressedLength_{}; +}; + constexpr int COMPRESSION_LEVEL_FASTEST = -1; constexpr int COMPRESSION_LEVEL_DEFAULT = -2; constexpr int COMPRESSION_LEVEL_BEST = -3; @@ -232,8 +411,29 @@ constexpr int COMPRESSION_LEVEL_BEST = -3; * decompress all data compressed with the a codec of the same type, regardless * of compression level. */ -std::unique_ptr getCodec(CodecType type, - int level = COMPRESSION_LEVEL_DEFAULT); +std::unique_ptr getCodec( + CodecType type, + int level = COMPRESSION_LEVEL_DEFAULT); + +/** + * Return a codec for the given type. Throws on error. The level + * is a non-negative codec-dependent integer indicating the level of + * compression desired, or one of the following constants: + * + * COMPRESSION_LEVEL_FASTEST is fastest (uses least CPU / memory, + * worst compression) + * COMPRESSION_LEVEL_DEFAULT is the default (likely a tradeoff between + * FASTEST and BEST) + * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory, + * best compression) + * + * When decompressing, the compression level is ignored. All codecs will + * decompress all data compressed with the a codec of the same type, regardless + * of compression level. + */ +std::unique_ptr getStreamCodec( + CodecType type, + int level = COMPRESSION_LEVEL_DEFAULT); /** * Returns a codec that can uncompress any of the given codec types as well as @@ -262,4 +462,8 @@ std::unique_ptr getAutoUncompressionCodec( */ bool hasCodec(CodecType type); -}} // namespaces +/** + * Check if a specified codec is supported and supports streaming. + */ +bool hasStreamCodec(CodecType type); +}} // namespaces diff --git a/folly/io/test/CompressionTest.cpp b/folly/io/test/CompressionTest.cpp index e8b9312e..d1d3a177 100644 --- a/folly/io/test/CompressionTest.cpp +++ b/folly/io/test/CompressionTest.cpp @@ -16,10 +16,12 @@ #include +#include #include #include #include #include +#include #include #include @@ -147,6 +149,19 @@ static std::vector availableCodecs() { return codecs; } +static std::vector availableStreamCodecs() { + std::vector codecs; + + for (size_t i = 0; i < static_cast(CodecType::NUM_CODEC_TYPES); ++i) { + auto type = static_cast(i); + if (hasStreamCodec(type)) { + codecs.push_back(type); + } + } + + return codecs; +} + TEST(CompressionTestNeedsUncompressedLength, Simple) { static const struct { CodecType type; bool needsUncompressedLength; } expectations[] = { @@ -399,6 +414,422 @@ INSTANTIATE_TEST_CASE_P( CodecType::BZIP2, }))); +class StreamingUnitTest : public testing::TestWithParam { + protected: + void SetUp() override { + codec_ = getStreamCodec(GetParam()); + } + + std::unique_ptr codec_; +}; + +TEST_P(StreamingUnitTest, maxCompressedLength) { + EXPECT_EQ(0, codec_->maxCompressedLength(0)); + for (uint64_t const length : {1, 10, 100, 1000, 10000, 100000, 1000000}) { + EXPECT_GE(codec_->maxCompressedLength(length), length); + } +} + +TEST_P(StreamingUnitTest, getUncompressedLength) { + auto const empty = IOBuf::create(0); + EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get())); + EXPECT_EQ(uint64_t(0), codec_->getUncompressedLength(empty.get(), 0)); + + auto const data = IOBuf::wrapBuffer(randomDataHolder.data(100)); + auto const compressed = codec_->compress(data.get()); + + EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 0)); + if (auto const length = codec_->getUncompressedLength(data.get())) { + EXPECT_EQ(100, *length); + } + EXPECT_EQ(uint64_t(100), codec_->getUncompressedLength(data.get(), 100)); + // If the uncompressed length is stored in the frame, then make sure it throws + // when it is given the wrong length. + if (codec_->getUncompressedLength(data.get()) == uint64_t(100)) { + EXPECT_ANY_THROW(codec_->getUncompressedLength(data.get(), 200)); + } +} + +TEST_P(StreamingUnitTest, emptyData) { + ByteRange input{}; + auto buffer = IOBuf::create(1); + buffer->append(buffer->capacity()); + MutableByteRange output{}; + + // Test compressing empty data in one pass + EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + codec_->resetStream(0); + EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + codec_->resetStream(); + output = {buffer->writableData(), buffer->length()}; + EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + EXPECT_EQ(buffer->length(), output.size()); + + // Test compressing empty data with multiple calls to compressStream() + codec_->resetStream(); + output = {}; + EXPECT_FALSE(codec_->compressStream(input, output)); + EXPECT_TRUE( + codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH)); + EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + codec_->resetStream(); + output = {buffer->writableData(), buffer->length()}; + EXPECT_FALSE(codec_->compressStream(input, output)); + EXPECT_TRUE( + codec_->compressStream(input, output, StreamCodec::FlushOp::FLUSH)); + EXPECT_TRUE(codec_->compressStream(input, output, StreamCodec::FlushOp::END)); + EXPECT_EQ(buffer->length(), output.size()); + + // Test uncompressing empty data + output = {}; + codec_->resetStream(); + EXPECT_TRUE(codec_->uncompressStream(input, output)); + codec_->resetStream(); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH)); + codec_->resetStream(); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::END)); + codec_->resetStream(0); + EXPECT_TRUE(codec_->uncompressStream(input, output)); + codec_->resetStream(0); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::FLUSH)); + codec_->resetStream(0); + EXPECT_TRUE( + codec_->uncompressStream(input, output, StreamCodec::FlushOp::END)); +} + +TEST_P(StreamingUnitTest, noForwardProgressOkay) { + auto inBuffer = IOBuf::create(2); + inBuffer->writableData()[0] = 'a'; + inBuffer->writableData()[0] = 'a'; + inBuffer->append(2); + auto input = inBuffer->coalesce(); + auto compressed = codec_->compress(inBuffer.get()); + + auto outBuffer = IOBuf::create(codec_->maxCompressedLength(2)); + MutableByteRange output{outBuffer->writableTail(), outBuffer->tailroom()}; + + ByteRange emptyInput; + MutableByteRange emptyOutput; + + // Compress some data to avoid empty data special casing + codec_->resetStream(); + while (!input.empty()) { + codec_->compressStream(input, output); + } + // empty input and output is okay for flush NONE and FLUSH. + codec_->compressStream(emptyInput, emptyOutput); + codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH); + + codec_->resetStream(); + input = inBuffer->coalesce(); + output = {outBuffer->writableTail(), outBuffer->tailroom()}; + while (!input.empty()) { + codec_->compressStream(input, output); + } + // empty input and output is okay for flush END. + codec_->compressStream(emptyInput, emptyOutput, StreamCodec::FlushOp::END); + + codec_->resetStream(); + input = compressed->coalesce(); + input.uncheckedSubtract(1); // Remove last byte so the operation is incomplete + output = {inBuffer->writableData(), inBuffer->length()}; + // Uncompress some data to avoid empty data special casing + while (!input.empty()) { + EXPECT_FALSE(codec_->uncompressStream(input, output)); + } + // empty input and output is okay for all flush values. + EXPECT_FALSE(codec_->uncompressStream(emptyInput, emptyOutput)); + EXPECT_FALSE(codec_->uncompressStream( + emptyInput, emptyOutput, StreamCodec::FlushOp::FLUSH)); + EXPECT_FALSE(codec_->uncompressStream( + emptyInput, emptyOutput, StreamCodec::FlushOp::END)); +} + +TEST_P(StreamingUnitTest, stateTransitions) { + auto inBuffer = IOBuf::create(1); + inBuffer->writableData()[0] = 'a'; + inBuffer->append(1); + auto compressed = codec_->compress(inBuffer.get()); + ByteRange const in = compressed->coalesce(); + auto outBuffer = IOBuf::create(codec_->maxCompressedLength(in.size())); + MutableByteRange const out{outBuffer->writableTail(), outBuffer->tailroom()}; + + auto compress = [&]( + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE, + bool empty = false) { + auto input = in; + auto output = empty ? MutableByteRange{} : out; + return codec_->compressStream(input, output, flushOp); + }; + auto uncompress = [&]( + StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE, + bool empty = false) { + auto input = in; + auto output = empty ? MutableByteRange{} : out; + return codec_->uncompressStream(input, output, flushOp); + }; + + // compression flow + codec_->resetStream(); + EXPECT_FALSE(compress()); + EXPECT_FALSE(compress()); + EXPECT_TRUE(compress(StreamCodec::FlushOp::FLUSH)); + EXPECT_FALSE(compress()); + EXPECT_TRUE(compress(StreamCodec::FlushOp::END)); + // uncompression flow + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::FLUSH, true)); + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); + codec_->resetStream(); + EXPECT_FALSE(uncompress(StreamCodec::FlushOp::NONE, true)); + codec_->resetStream(); + EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); + // compress -> uncompress + codec_->resetStream(); + EXPECT_FALSE(compress()); + EXPECT_THROW(uncompress(), std::logic_error); + // uncompress -> compress + codec_->resetStream(); + EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); + EXPECT_THROW(compress(), std::logic_error); + // end -> compress + codec_->resetStream(); + EXPECT_FALSE(compress()); + EXPECT_TRUE(compress(StreamCodec::FlushOp::END)); + EXPECT_THROW(compress(), std::logic_error); + // end -> uncompress + codec_->resetStream(); + EXPECT_TRUE(uncompress(StreamCodec::FlushOp::FLUSH)); + EXPECT_THROW(uncompress(), std::logic_error); + // flush -> compress + codec_->resetStream(); + EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true)); + EXPECT_THROW(compress(), std::logic_error); + // flush -> end + codec_->resetStream(); + EXPECT_FALSE(compress(StreamCodec::FlushOp::FLUSH, true)); + EXPECT_THROW(compress(StreamCodec::FlushOp::END), std::logic_error); + // undefined -> compress + codec_->compress(inBuffer.get()); + EXPECT_THROW(compress(), std::logic_error); + codec_->uncompress(compressed.get()); + EXPECT_THROW(compress(), std::logic_error); + // undefined -> undefined + codec_->uncompress(compressed.get()); + codec_->compress(inBuffer.get()); +} + +INSTANTIATE_TEST_CASE_P( + StreamingUnitTest, + StreamingUnitTest, + testing::ValuesIn(availableStreamCodecs())); + +class StreamingCompressionTest + : public testing::TestWithParam> { + protected: + void SetUp() override { + auto const tup = GetParam(); + uncompressedLength_ = uint64_t(1) << std::get<0>(tup); + chunkSize_ = size_t(1) << std::get<1>(tup); + codec_ = getStreamCodec(std::get<2>(tup)); + } + + void runResetStreamTest(DataHolder const& dh); + void runCompressStreamTest(DataHolder const& dh); + void runUncompressStreamTest(DataHolder const& dh); + void runFlushTest(DataHolder const& dh); + + private: + std::vector split(ByteRange data) const; + + uint64_t uncompressedLength_; + size_t chunkSize_; + std::unique_ptr codec_; +}; + +std::vector StreamingCompressionTest::split(ByteRange data) const { + size_t const pieces = std::max(1, data.size() / chunkSize_); + std::vector result; + result.reserve(pieces + 1); + while (!data.empty()) { + size_t const pieceSize = std::min(data.size(), chunkSize_); + result.push_back(data.subpiece(0, pieceSize)); + data.uncheckedAdvance(pieceSize); + } + return result; +} + +static std::unique_ptr compressSome( + StreamCodec* codec, + ByteRange data, + uint64_t bufferSize, + StreamCodec::FlushOp flush) { + bool result; + IOBufQueue queue; + do { + auto buffer = IOBuf::create(bufferSize); + buffer->append(buffer->capacity()); + MutableByteRange output{buffer->writableData(), buffer->length()}; + + result = codec->compressStream(data, output, flush); + buffer->trimEnd(output.size()); + queue.append(std::move(buffer)); + + } while (!(flush == StreamCodec::FlushOp::NONE && data.empty()) && !result); + EXPECT_TRUE(data.empty()); + return queue.move(); +} + +static std::pair> uncompressSome( + StreamCodec* codec, + ByteRange& data, + uint64_t bufferSize, + StreamCodec::FlushOp flush) { + bool result; + IOBufQueue queue; + do { + auto buffer = IOBuf::create(bufferSize); + buffer->append(buffer->capacity()); + MutableByteRange output{buffer->writableData(), buffer->length()}; + + result = codec->uncompressStream(data, output, flush); + buffer->trimEnd(output.size()); + queue.append(std::move(buffer)); + + } while (queue.tailroom() == 0 && !result); + return std::make_pair(result, queue.move()); +} + +void StreamingCompressionTest::runResetStreamTest(DataHolder const& dh) { + auto const input = dh.data(uncompressedLength_); + // Compress some but leave state unclean + codec_->resetStream(uncompressedLength_); + compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE); + // Reset stream and compress all + codec_->resetStream(); + auto compressed = + compressSome(codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END); + auto const uncompressed = codec_->uncompress(compressed.get(), input.size()); + EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); +} + +TEST_P(StreamingCompressionTest, resetStream) { + runResetStreamTest(constantDataHolder); + runResetStreamTest(randomDataHolder); +} + +void StreamingCompressionTest::runCompressStreamTest( + const folly::io::test::DataHolder& dh) { + auto const inputs = split(dh.data(uncompressedLength_)); + + IOBufQueue queue; + codec_->resetStream(uncompressedLength_); + // Compress many inputs in a row + for (auto const input : inputs) { + queue.append(compressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::NONE)); + } + // Finish the operation with empty input. + ByteRange empty; + queue.append( + compressSome(codec_.get(), empty, chunkSize_, StreamCodec::FlushOp::END)); + + auto const uncompressed = codec_->uncompress(queue.front()); + EXPECT_EQ(dh.hash(uncompressedLength_), hashIOBuf(uncompressed.get())); +} + +TEST_P(StreamingCompressionTest, compressStream) { + runCompressStreamTest(constantDataHolder); + runCompressStreamTest(randomDataHolder); +} + +void StreamingCompressionTest::runUncompressStreamTest( + const folly::io::test::DataHolder& dh) { + auto const data = IOBuf::wrapBuffer(dh.data(uncompressedLength_)); + // Concatenate 3 compressed frames in a row + auto compressed = codec_->compress(data.get()); + compressed->prependChain(codec_->compress(data.get())); + compressed->prependChain(codec_->compress(data.get())); + // Pass all 3 compressed frames in one input buffer + auto input = compressed->coalesce(); + // Uncompress the first frame + codec_->resetStream(data->computeChainDataLength()); + { + auto const result = uncompressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); + ASSERT_TRUE(result.first); + ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } + // Uncompress the second frame + codec_->resetStream(); + { + auto const result = uncompressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::END); + ASSERT_TRUE(result.first); + ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } + // Uncompress the third frame + codec_->resetStream(); + { + auto const result = uncompressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); + ASSERT_TRUE(result.first); + ASSERT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } + EXPECT_TRUE(input.empty()); +} + +TEST_P(StreamingCompressionTest, uncompressStream) { + runUncompressStreamTest(constantDataHolder); + runUncompressStreamTest(randomDataHolder); +} + +void StreamingCompressionTest::runFlushTest(DataHolder const& dh) { + auto const inputs = split(dh.data(uncompressedLength_)); + auto uncodec = getStreamCodec(codec_->type()); + + codec_->resetStream(); + for (auto input : inputs) { + // Compress some data and flush the stream + auto compressed = compressSome( + codec_.get(), input, chunkSize_, StreamCodec::FlushOp::FLUSH); + auto compressedRange = compressed->coalesce(); + // Uncompress the compressed data + auto result = uncompressSome( + uncodec.get(), + compressedRange, + chunkSize_, + StreamCodec::FlushOp::FLUSH); + // All compressed data should have been consumed + EXPECT_TRUE(compressedRange.empty()); + // The frame isn't complete + EXPECT_FALSE(result.first); + // The uncompressed data should be exactly the input data + EXPECT_EQ(input.size(), result.second->computeChainDataLength()); + auto const data = IOBuf::wrapBuffer(input); + EXPECT_EQ(hashIOBuf(data.get()), hashIOBuf(result.second.get())); + } +} + +TEST_P(StreamingCompressionTest, testFlush) { + runFlushTest(constantDataHolder); + runFlushTest(randomDataHolder); +} + +INSTANTIATE_TEST_CASE_P( + StreamingCompressionTest, + StreamingCompressionTest, + testing::Combine( + testing::Values(0, 1, 12, 22, 27), + testing::Values(12, 17, 20), + testing::ValuesIn(availableStreamCodecs()))); + class AutomaticCodecTest : public testing::TestWithParam { protected: void SetUp() override { @@ -499,6 +930,10 @@ class CustomCodec : public Codec { return {prefix_}; } + uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override { + return codec_->maxCompressedLength(uncompressedLength) + prefix_.size(); + } + bool canUncompress(const IOBuf* data, Optional) const override { auto clone = data->cloneCoalescedAsValue(); if (clone.length() < prefix_.size()) {