From: Tudor Bosman Date: Wed, 7 Aug 2013 00:01:37 +0000 (-0700) Subject: IOBuf compression X-Git-Tag: v0.22.0~907 X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=commitdiff_plain;h=797870eccb83baf33e5d8a3e47385a031080bef6 IOBuf compression Summary: davejwatson: you asked Test Plan: test added Reviewed By: davejwatson@fb.com FB internal diff: D917336 --- diff --git a/folly/io/Compression.cpp b/folly/io/Compression.cpp new file mode 100644 index 00000000..f46df401 --- /dev/null +++ b/folly/io/Compression.cpp @@ -0,0 +1,631 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/io/Compression.h" + +#include +#include +#include +#include +#include +#include + +#include "folly/Conv.h" +#include "folly/Memory.h" +#include "folly/Portability.h" +#include "folly/ScopeGuard.h" +#include "folly/io/Cursor.h" + +namespace folly { namespace io { + +// Ensure consistent behavior in the nullptr case +std::unique_ptr Codec::compress(const IOBuf* data) { + return !data->empty() ? doCompress(data) : IOBuf::create(0); +} + +std::unique_ptr Codec::uncompress(const IOBuf* data, + uint64_t uncompressedLength) { + if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) { + if (needsUncompressedLength()) { + throw std::invalid_argument("Codec: uncompressed length required"); + } + } else if (uncompressedLength > maxUncompressedLength()) { + throw std::runtime_error("Codec: uncompressed length too large"); + } + + if (data->empty()) { + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + uncompressedLength != 0) { + throw std::runtime_error("Codec: invalid uncompressed length"); + } + return IOBuf::create(0); + } + + return doUncompress(data, uncompressedLength); +} + +bool Codec::needsUncompressedLength() const { + return doNeedsUncompressedLength(); +} + +uint64_t Codec::maxUncompressedLength() const { + return doMaxUncompressedLength(); +} + +CodecType Codec::type() const { + return doType(); +} + +bool Codec::doNeedsUncompressedLength() const { + return false; +} + +uint64_t Codec::doMaxUncompressedLength() const { + return std::numeric_limits::max() - 1; +} + +namespace { + +/** + * No compression + */ +class NoCompressionCodec FOLLY_FINAL : public Codec { + public: + static std::unique_ptr create(int level); + explicit NoCompressionCodec(int level); + + private: + CodecType doType() const FOLLY_OVERRIDE; + std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; + std::unique_ptr doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) FOLLY_OVERRIDE; +}; + +std::unique_ptr NoCompressionCodec::create(int level) { + return make_unique(level); +} + +NoCompressionCodec::NoCompressionCodec(int level) { + switch (level) { + case COMPRESSION_LEVEL_DEFAULT: + case COMPRESSION_LEVEL_FASTEST: + case COMPRESSION_LEVEL_BEST: + level = 0; + } + if (level != 0) { + throw std::invalid_argument(to( + "NoCompressionCodec: invalid level ", level)); + } +} + +CodecType NoCompressionCodec::doType() const { + return CodecType::NO_COMPRESSION; +} + +std::unique_ptr NoCompressionCodec::doCompress( + const IOBuf* data) { + return data->clone(); +} + +std::unique_ptr NoCompressionCodec::doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) { + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + data->computeChainDataLength() != uncompressedLength) { + throw std::runtime_error(to( + "NoCompressionCodec: invalid uncompressed length")); + } + return data->clone(); +} + +/** + * LZ4 compression + */ +class LZ4Codec FOLLY_FINAL : public Codec { + public: + static std::unique_ptr create(int level); + explicit LZ4Codec(int level); + + private: + bool doNeedsUncompressedLength() const FOLLY_OVERRIDE; + uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE; + CodecType doType() const FOLLY_OVERRIDE; + std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; + std::unique_ptr doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) FOLLY_OVERRIDE; + + bool highCompression_; +}; + +std::unique_ptr LZ4Codec::create(int level) { + return make_unique(level); +} + +LZ4Codec::LZ4Codec(int level) { + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + case COMPRESSION_LEVEL_DEFAULT: + level = 1; + break; + case COMPRESSION_LEVEL_BEST: + level = 2; + break; + } + if (level < 1 || level > 2) { + throw std::invalid_argument(to( + "LZ4Codec: invalid level: ", level)); + } + highCompression_ = (level > 1); +} + +bool LZ4Codec::doNeedsUncompressedLength() const { + return true; +} + +uint64_t LZ4Codec::doMaxUncompressedLength() const { + // From lz4.h: "Max supported value is ~1.9GB"; I wish we had something + // more accurate. + return 1.8 * (uint64_t(1) << 30); +} + +CodecType LZ4Codec::doType() const { + return CodecType::LZ4; +} + +std::unique_ptr LZ4Codec::doCompress(const IOBuf* data) { + std::unique_ptr clone; + if (data->isChained()) { + // LZ4 doesn't support streaming, so we have to coalesce + clone = data->clone(); + clone->coalesce(); + data = clone.get(); + } + + auto out = IOBuf::create(LZ4_compressBound(data->length())); + int n; + if (highCompression_) { + n = LZ4_compress(reinterpret_cast(data->data()), + reinterpret_cast(out->writableTail()), + data->length()); + } else { + n = LZ4_compressHC(reinterpret_cast(data->data()), + reinterpret_cast(out->writableTail()), + data->length()); + } + + CHECK_GE(n, 0); + CHECK_LE(n, out->capacity()); + + out->append(n); + return out; +} + +std::unique_ptr LZ4Codec::doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) { + std::unique_ptr clone; + if (data->isChained()) { + // LZ4 doesn't support streaming, so we have to coalesce + clone = data->clone(); + clone->coalesce(); + data = clone.get(); + } + + auto out = IOBuf::create(uncompressedLength); + int n = LZ4_uncompress(reinterpret_cast(data->data()), + reinterpret_cast(out->writableTail()), + uncompressedLength); + if (n != data->length()) { + throw std::runtime_error(to( + "LZ4 decompression returned invalid value ", n)); + } + out->append(uncompressedLength); + return out; +} + +/** + * Snappy compression + */ + +/** + * Implementation of snappy::Source that reads from a IOBuf chain. + */ +class IOBufSnappySource FOLLY_FINAL : public snappy::Source { + public: + explicit IOBufSnappySource(const IOBuf* data); + size_t Available() const FOLLY_OVERRIDE; + const char* Peek(size_t* len) FOLLY_OVERRIDE; + void Skip(size_t n) FOLLY_OVERRIDE; + private: + size_t available_; + io::Cursor cursor_; +}; + +IOBufSnappySource::IOBufSnappySource(const IOBuf* data) + : available_(data->computeChainDataLength()), + cursor_(data) { +} + +size_t IOBufSnappySource::Available() const { + return available_; +} + +const char* IOBufSnappySource::Peek(size_t* len) { + auto p = cursor_.peek(); + *len = p.second; + return reinterpret_cast(p.first); +} + +void IOBufSnappySource::Skip(size_t n) { + CHECK_LE(n, available_); + cursor_.skip(n); + available_ -= n; +} + +class SnappyCodec FOLLY_FINAL : public Codec { + public: + static std::unique_ptr create(int level); + explicit SnappyCodec(int level); + + private: + uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE; + CodecType doType() const FOLLY_OVERRIDE; + std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; + std::unique_ptr doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) FOLLY_OVERRIDE; +}; + +std::unique_ptr SnappyCodec::create(int level) { + return make_unique(level); +} + +SnappyCodec::SnappyCodec(int level) { + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + case COMPRESSION_LEVEL_DEFAULT: + case COMPRESSION_LEVEL_BEST: + level = 1; + } + if (level != 1) { + throw std::invalid_argument(to( + "SnappyCodec: invalid level: ", level)); + } +} + +uint64_t SnappyCodec::doMaxUncompressedLength() const { + // snappy.h uses uint32_t for lengths, so there's that. + return std::numeric_limits::max(); +} + +CodecType SnappyCodec::doType() const { + return CodecType::SNAPPY; +} + +std::unique_ptr SnappyCodec::doCompress(const IOBuf* data) { + IOBufSnappySource source(data); + auto out = + IOBuf::create(snappy::MaxCompressedLength(source.Available())); + + snappy::UncheckedByteArraySink sink(reinterpret_cast( + out->writableTail())); + + size_t n = snappy::Compress(&source, &sink); + + CHECK_LE(n, out->capacity()); + out->append(n); + return out; +} + +std::unique_ptr SnappyCodec::doUncompress(const IOBuf* data, + uint64_t uncompressedLength) { + uint32_t actualUncompressedLength = 0; + + { + IOBufSnappySource source(data); + if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) { + throw std::runtime_error("snappy::GetUncompressedLength failed"); + } + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + uncompressedLength != actualUncompressedLength) { + throw std::runtime_error("snappy: invalid uncompressed length"); + } + } + + auto out = IOBuf::create(actualUncompressedLength); + + { + IOBufSnappySource source(data); + if (!snappy::RawUncompress(&source, + reinterpret_cast(out->writableTail()))) { + throw std::runtime_error("snappy::RawUncompress failed"); + } + } + + out->append(actualUncompressedLength); + return out; +} + +/** + * Zlib codec + */ +class ZlibCodec FOLLY_FINAL : public Codec { + public: + static std::unique_ptr create(int level); + explicit ZlibCodec(int level); + + private: + CodecType doType() const FOLLY_OVERRIDE; + std::unique_ptr doCompress(const IOBuf* data) FOLLY_OVERRIDE; + std::unique_ptr doUncompress( + const IOBuf* data, + uint64_t uncompressedLength) FOLLY_OVERRIDE; + + std::unique_ptr addOutputBuffer(z_stream* stream, uint32_t length); + bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength); + + int level_; +}; + +std::unique_ptr ZlibCodec::create(int level) { + return make_unique(level); +} + +ZlibCodec::ZlibCodec(int level) { + switch (level) { + case COMPRESSION_LEVEL_FASTEST: + level = 1; + break; + case COMPRESSION_LEVEL_DEFAULT: + level = Z_DEFAULT_COMPRESSION; + break; + case COMPRESSION_LEVEL_BEST: + level = 9; + break; + } + if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) { + throw std::invalid_argument(to( + "ZlibCodec: invalid level: ", level)); + } + level_ = level; +} + +CodecType ZlibCodec::doType() const { + return CodecType::ZLIB; +} + +std::unique_ptr ZlibCodec::addOutputBuffer(z_stream* stream, + uint32_t length) { + CHECK_EQ(stream->avail_out, 0); + + auto buf = IOBuf::create(length); + buf->append(length); + + stream->next_out = buf->writableData(); + stream->avail_out = buf->length(); + + return buf; +} + +bool ZlibCodec::doInflate(z_stream* stream, + IOBuf* head, + uint32_t bufferLength) { + if (stream->avail_out == 0) { + head->prependChain(addOutputBuffer(stream, bufferLength)); + } + + int rc = inflate(stream, Z_NO_FLUSH); + + switch (rc) { + case Z_OK: + break; + case Z_STREAM_END: + return true; + case Z_BUF_ERROR: + case Z_NEED_DICT: + case Z_DATA_ERROR: + case Z_MEM_ERROR: + throw std::runtime_error(to( + "ZlibCodec: inflate error: ", rc, ": ", stream->msg)); + default: + CHECK(false) << rc << ": " << stream->msg; + } + + return false; +} + + +std::unique_ptr ZlibCodec::doCompress(const IOBuf* data) { + z_stream stream; + stream.zalloc = nullptr; + stream.zfree = nullptr; + stream.opaque = nullptr; + + int rc = deflateInit(&stream, level_); + if (rc != Z_OK) { + throw std::runtime_error(to( + "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg)); + } + + stream.next_in = stream.next_out = nullptr; + stream.avail_in = stream.avail_out = 0; + stream.total_in = stream.total_out = 0; + + bool success = false; + + SCOPE_EXIT { + int rc = deflateEnd(&stream); + // If we're here because of an exception, it's okay if some data + // got dropped. + CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR)) + << rc << ": " << stream.msg; + }; + + uint64_t uncompressedLength = data->computeChainDataLength(); + uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength); + + // Max 64MiB in one go + constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB + constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB + + auto out = addOutputBuffer( + &stream, + (maxCompressedLength <= maxSingleStepLength ? + maxCompressedLength : + defaultBufferLength)); + + for (auto& range : *data) { + if (range.empty()) { + continue; + } + + stream.next_in = const_cast(range.data()); + stream.avail_in = range.size(); + + while (stream.avail_in != 0) { + if (stream.avail_out == 0) { + out->prependChain(addOutputBuffer(&stream, defaultBufferLength)); + } + + rc = deflate(&stream, Z_NO_FLUSH); + + CHECK_EQ(rc, Z_OK) << stream.msg; + } + } + + do { + if (stream.avail_out == 0) { + out->prependChain(addOutputBuffer(&stream, defaultBufferLength)); + } + + rc = deflate(&stream, Z_FINISH); + } while (rc == Z_OK); + + CHECK_EQ(rc, Z_STREAM_END) << stream.msg; + + out->prev()->trimEnd(stream.avail_out); + + success = true; // we survived + + return out; +} + +std::unique_ptr ZlibCodec::doUncompress(const IOBuf* data, + uint64_t uncompressedLength) { + z_stream stream; + stream.zalloc = nullptr; + stream.zfree = nullptr; + stream.opaque = nullptr; + + int rc = inflateInit(&stream); + if (rc != Z_OK) { + throw std::runtime_error(to( + "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg)); + } + + stream.next_in = stream.next_out = nullptr; + stream.avail_in = stream.avail_out = 0; + stream.total_in = stream.total_out = 0; + + bool success = false; + + SCOPE_EXIT { + int rc = inflateEnd(&stream); + // If we're here because of an exception, it's okay if some data + // got dropped. + CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR)) + << rc << ": " << stream.msg; + }; + + // Max 64MiB in one go + constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20; // 64MiB + constexpr uint32_t defaultBufferLength = uint32_t(4) << 20; // 4MiB + + auto out = addOutputBuffer( + &stream, + ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + uncompressedLength <= maxSingleStepLength) ? + uncompressedLength : + defaultBufferLength)); + + bool streamEnd = false; + for (auto& range : *data) { + if (range.empty()) { + continue; + } + + stream.next_in = const_cast(range.data()); + stream.avail_in = range.size(); + + while (stream.avail_in != 0) { + if (streamEnd) { + throw std::runtime_error(to( + "ZlibCodec: junk after end of data")); + } + + streamEnd = doInflate(&stream, out.get(), defaultBufferLength); + } + } + + while (!streamEnd) { + streamEnd = doInflate(&stream, out.get(), defaultBufferLength); + } + + out->prev()->trimEnd(stream.avail_out); + + if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH && + uncompressedLength != stream.total_out) { + throw std::runtime_error(to( + "ZlibCodec: invalid uncompressed length")); + } + + success = true; // we survived + + return out; +} + +typedef std::unique_ptr (*CodecFactory)(int); + +CodecFactory gCodecFactories[ + static_cast(CodecType::NUM_CODEC_TYPES)] = { + NoCompressionCodec::create, + LZ4Codec::create, + SnappyCodec::create, + ZlibCodec::create +}; + +} // namespace + +std::unique_ptr getCodec(CodecType type, int level) { + size_t idx = static_cast(type); + if (idx >= static_cast(CodecType::NUM_CODEC_TYPES)) { + throw std::invalid_argument(to( + "Compression type ", idx, " not supported")); + } + auto factory = gCodecFactories[idx]; + if (!factory) { + throw std::invalid_argument(to( + "Compression type ", idx, " not supported")); + } + auto codec = (*factory)(level); + DCHECK_EQ(static_cast(codec->type()), idx); + return codec; +} + +}} // namespaces + diff --git a/folly/io/Compression.h b/folly/io/Compression.h new file mode 100644 index 00000000..5a5f9d89 --- /dev/null +++ b/folly/io/Compression.h @@ -0,0 +1,142 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_IO_COMPRESSION_H_ +#define FOLLY_IO_COMPRESSION_H_ + +#include +#include +#include + +#include "folly/io/IOBuf.h" + +/** + * Compression / decompression over IOBufs + */ + +namespace folly { namespace io { + +enum class CodecType { + /** + * Use no compression. + * Levels supported: 0 + */ + NO_COMPRESSION = 0, + + /** + * Use LZ4 compression. + * Levels supported: 1 = fast, 2 = best; default = 1 + */ + LZ4 = 1, + + /** + * Use Snappy compression. + * Levels supported: 1 + */ + SNAPPY = 2, + + /** + * Use zlib compression. + * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6 + */ + ZLIB = 3, + + NUM_CODEC_TYPES = 4, +}; + +class Codec { + public: + virtual ~Codec() { } + + /** + * Return the maximum length of data that may be compressed with this codec. + * NO_COMPRESSION and ZLIB support arbitrary lengths; + * LZ4 supports up to 1.9GiB; SNAPPY supports up to 4GiB. + */ + uint64_t maxUncompressedLength() const; + + /** + * Return the codec's type. + */ + CodecType type() const; + + /** + * Does this codec need the exact uncompressed length on decompression? + */ + bool needsUncompressedLength() const; + + /** + * Compress data, returning an IOBuf (which may share storage with data). + * Throws std::invalid_argument if data is larger than + * maxUncompressedLength(). + * + * Regardless of the behavior of the underlying compressor, compressing + * an empty IOBuf chain will return an empty IOBuf chain. + */ + std::unique_ptr compress(const folly::IOBuf* data); + + /** + * Uncompress data. Throws std::runtime_error on decompression error. + * + * Some codecs (LZ4) require the exact uncompressed length; this is indicated + * by needsUncompressedLength(). + * + * For other codes (zlib), knowing the exact uncompressed length ahead of + * time might be faster. + * + * Regardless of the behavior of the underlying compressor, uncompressing + * an empty IOBuf chain will return an empty IOBuf chain. + */ + static constexpr uint64_t UNKNOWN_UNCOMPRESSED_LENGTH = uint64_t(-1); + + std::unique_ptr uncompress( + const IOBuf* data, + uint64_t uncompressedLength = UNKNOWN_UNCOMPRESSED_LENGTH); + + private: + // default: no limits (save for special value UNKNOWN_UNCOMPRESSED_LENGTH) + virtual uint64_t doMaxUncompressedLength() const; + // default: doesn't need uncompressed length + virtual bool doNeedsUncompressedLength() const; + virtual CodecType doType() const = 0; + virtual std::unique_ptr doCompress(const folly::IOBuf* data) = 0; + virtual std::unique_ptr doUncompress(const folly::IOBuf* data, + uint64_t uncompressedLength) = 0; +}; + +constexpr int COMPRESSION_LEVEL_FASTEST = -1; +constexpr int COMPRESSION_LEVEL_DEFAULT = -2; +constexpr int COMPRESSION_LEVEL_BEST = -3; + +/** + * Return a codec for the given type. Throws on error. The level + * is a non-negative codec-dependent integer indicating the level of + * compression desired, or one of the following constants: + * + * COMPRESSION_LEVEL_FASTEST is fastest (uses least CPU / memory, + * worst compression) + * COMPRESSION_LEVEL_DEFAULT is the default (likely a tradeoff between + * FASTEST and BEST) + * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory, + * best compression) + */ +std::unique_ptr getCodec(CodecType type, + int level = COMPRESSION_LEVEL_DEFAULT); + +}} // namespaces + +#endif /* FOLLY_IO_COMPRESSION_H_ */ + diff --git a/folly/io/test/CompressionTest.cpp b/folly/io/test/CompressionTest.cpp new file mode 100644 index 00000000..ae791cd3 --- /dev/null +++ b/folly/io/test/CompressionTest.cpp @@ -0,0 +1,194 @@ +/* + * Copyright 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "folly/io/Compression.h" + +// Yes, tr1, as that's what gtest requires +#include +#include +#include +#include + +#include +#include + +#include "folly/Benchmark.h" +#include "folly/Hash.h" +#include "folly/Random.h" +#include "folly/io/IOBufQueue.h" + +namespace folly { namespace io { namespace test { + +constexpr size_t randomDataSizeLog2 = 27; // 128MiB +constexpr size_t randomDataSize = size_t(1) << randomDataSizeLog2; + +std::unique_ptr randomData; +std::unordered_map hashes; + +uint64_t hashIOBuf(const IOBuf* buf) { + uint64_t h = folly::hash::FNV_64_HASH_START; + for (auto& range : *buf) { + h = folly::hash::fnv64_buf(range.data(), range.size(), h); + } + return h; +} + +uint64_t getRandomDataHash(uint64_t size) { + auto p = hashes.find(size); + if (p != hashes.end()) { + return p->second; + } + + uint64_t h = folly::hash::fnv64_buf(randomData.get(), size); + hashes[size] = h; + return h; +} + +void generateRandomData() { + randomData.reset(new uint8_t[size_t(1) << randomDataSizeLog2]); + + constexpr size_t numThreadsLog2 = 3; + constexpr size_t numThreads = size_t(1) << numThreadsLog2; + + uint32_t seed = randomNumberSeed(); + + std::vector threads; + threads.reserve(numThreads); + for (size_t t = 0; t < numThreads; ++t) { + threads.emplace_back( + [seed, t, numThreadsLog2] () { + std::mt19937 rng(seed + t); + size_t countLog2 = size_t(1) << (randomDataSizeLog2 - numThreadsLog2); + size_t start = size_t(t) << countLog2; + for (size_t i = 0; i < countLog2; ++i) { + randomData[start + i] = rng(); + } + }); + } + + for (auto& t : threads) { + t.join(); + } +} + +class CompressionTest : public testing::TestWithParam< + std::tr1::tuple> { + protected: + void SetUp() { + auto tup = GetParam(); + uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup); + codec_ = getCodec(std::tr1::get<1>(tup)); + } + + uint64_t uncompressedLength_; + std::unique_ptr codec_; +}; + +TEST_P(CompressionTest, Simple) { + auto original = IOBuf::wrapBuffer(randomData.get(), uncompressedLength_); + auto compressed = codec_->compress(original.get()); + if (!codec_->needsUncompressedLength()) { + auto uncompressed = codec_->uncompress(compressed.get()); + EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength()); + EXPECT_EQ(getRandomDataHash(uncompressedLength_), + hashIOBuf(uncompressed.get())); + } + { + auto uncompressed = codec_->uncompress(compressed.get(), + uncompressedLength_); + EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength()); + EXPECT_EQ(getRandomDataHash(uncompressedLength_), + hashIOBuf(uncompressed.get())); + } +} + +INSTANTIATE_TEST_CASE_P( + CompressionTest, + CompressionTest, + testing::Combine( + testing::Values(0, 1, 12, 22, int(randomDataSizeLog2)), + testing::Values(CodecType::NO_COMPRESSION, + CodecType::LZ4, + CodecType::SNAPPY, + CodecType::ZLIB))); + +class CompressionCorruptionTest : public testing::TestWithParam { + protected: + void SetUp() { + codec_ = getCodec(GetParam()); + } + + std::unique_ptr codec_; +}; + +TEST_P(CompressionCorruptionTest, Simple) { + constexpr uint64_t uncompressedLength = 42; + auto original = IOBuf::wrapBuffer(randomData.get(), uncompressedLength); + auto compressed = codec_->compress(original.get()); + + if (!codec_->needsUncompressedLength()) { + auto uncompressed = codec_->uncompress(compressed.get()); + EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); + EXPECT_EQ(getRandomDataHash(uncompressedLength), + hashIOBuf(uncompressed.get())); + } + { + auto uncompressed = codec_->uncompress(compressed.get(), + uncompressedLength); + EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength()); + EXPECT_EQ(getRandomDataHash(uncompressedLength), + hashIOBuf(uncompressed.get())); + } + + EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1), + std::runtime_error); + + // Corrupt the first character + ++(compressed->writableData()[0]); + + if (!codec_->needsUncompressedLength()) { + EXPECT_THROW(codec_->uncompress(compressed.get()), + std::runtime_error); + } + + EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength), + std::runtime_error); +} + +INSTANTIATE_TEST_CASE_P( + CompressionCorruptionTest, + CompressionCorruptionTest, + testing::Values( + // NO_COMPRESSION can't detect corruption + // LZ4 can't detect corruption reliably (sigh) + CodecType::SNAPPY, + CodecType::ZLIB)); + +}}} // namespaces + +int main(int argc, char *argv[]) { + testing::InitGoogleTest(&argc, argv); + google::ParseCommandLineFlags(&argc, &argv, true); + + folly::io::test::generateRandomData(); // 4GB + + auto ret = RUN_ALL_TESTS(); + if (!ret) { + folly::runBenchmarksOnFlag(); + } + return ret; +} +