IOBuf compression
authorTudor Bosman <tudorb@fb.com>
Wed, 7 Aug 2013 00:01:37 +0000 (17:01 -0700)
committerSara Golemon <sgolemon@fb.com>
Wed, 28 Aug 2013 21:30:11 +0000 (14:30 -0700)
Summary: davejwatson: you asked

Test Plan: test added

Reviewed By: davejwatson@fb.com

FB internal diff: D917336

folly/io/Compression.cpp [new file with mode: 0644]
folly/io/Compression.h [new file with mode: 0644]
folly/io/test/CompressionTest.cpp [new file with mode: 0644]

diff --git a/folly/io/Compression.cpp b/folly/io/Compression.cpp
new file mode 100644 (file)
index 0000000..f46df40
--- /dev/null
@@ -0,0 +1,631 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/io/Compression.h"
+
+#include <lz4.h>
+#include <lz4hc.h>
+#include <glog/logging.h>
+#include <snappy.h>
+#include <snappy-sinksource.h>
+#include <zlib.h>
+
+#include "folly/Conv.h"
+#include "folly/Memory.h"
+#include "folly/Portability.h"
+#include "folly/ScopeGuard.h"
+#include "folly/io/Cursor.h"
+
+namespace folly { namespace io {
+
+// Ensure consistent behavior in the nullptr case
+std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
+  return !data->empty() ? doCompress(data) : IOBuf::create(0);
+}
+
+std::unique_ptr<IOBuf> Codec::uncompress(const IOBuf* data,
+                                         uint64_t uncompressedLength) {
+  if (uncompressedLength == UNKNOWN_UNCOMPRESSED_LENGTH) {
+    if (needsUncompressedLength()) {
+      throw std::invalid_argument("Codec: uncompressed length required");
+    }
+  } else if (uncompressedLength > maxUncompressedLength()) {
+    throw std::runtime_error("Codec: uncompressed length too large");
+  }
+
+  if (data->empty()) {
+    if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+        uncompressedLength != 0) {
+      throw std::runtime_error("Codec: invalid uncompressed length");
+    }
+    return IOBuf::create(0);
+  }
+
+  return doUncompress(data, uncompressedLength);
+}
+
+bool Codec::needsUncompressedLength() const {
+  return doNeedsUncompressedLength();
+}
+
+uint64_t Codec::maxUncompressedLength() const {
+  return doMaxUncompressedLength();
+}
+
+CodecType Codec::type() const {
+  return doType();
+}
+
+bool Codec::doNeedsUncompressedLength() const {
+  return false;
+}
+
+uint64_t Codec::doMaxUncompressedLength() const {
+  return std::numeric_limits<uint64_t>::max() - 1;
+}
+
+namespace {
+
+/**
+ * No compression
+ */
+class NoCompressionCodec FOLLY_FINAL : public Codec {
+ public:
+  static std::unique_ptr<Codec> create(int level);
+  explicit NoCompressionCodec(int level);
+
+ private:
+  CodecType doType() const FOLLY_OVERRIDE;
+  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
+  std::unique_ptr<IOBuf> doUncompress(
+      const IOBuf* data,
+      uint64_t uncompressedLength) FOLLY_OVERRIDE;
+};
+
+std::unique_ptr<Codec> NoCompressionCodec::create(int level) {
+  return make_unique<NoCompressionCodec>(level);
+}
+
+NoCompressionCodec::NoCompressionCodec(int level) {
+  switch (level) {
+  case COMPRESSION_LEVEL_DEFAULT:
+  case COMPRESSION_LEVEL_FASTEST:
+  case COMPRESSION_LEVEL_BEST:
+    level = 0;
+  }
+  if (level != 0) {
+    throw std::invalid_argument(to<std::string>(
+        "NoCompressionCodec: invalid level ", level));
+  }
+}
+
+CodecType NoCompressionCodec::doType() const {
+  return CodecType::NO_COMPRESSION;
+}
+
+std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
+    const IOBuf* data) {
+  return data->clone();
+}
+
+std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
+    const IOBuf* data,
+    uint64_t uncompressedLength) {
+  if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+      data->computeChainDataLength() != uncompressedLength) {
+    throw std::runtime_error(to<std::string>(
+        "NoCompressionCodec: invalid uncompressed length"));
+  }
+  return data->clone();
+}
+
+/**
+ * LZ4 compression
+ */
+class LZ4Codec FOLLY_FINAL : public Codec {
+ public:
+  static std::unique_ptr<Codec> create(int level);
+  explicit LZ4Codec(int level);
+
+ private:
+  bool doNeedsUncompressedLength() const FOLLY_OVERRIDE;
+  uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
+  CodecType doType() const FOLLY_OVERRIDE;
+  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
+  std::unique_ptr<IOBuf> doUncompress(
+      const IOBuf* data,
+      uint64_t uncompressedLength) FOLLY_OVERRIDE;
+
+  bool highCompression_;
+};
+
+std::unique_ptr<Codec> LZ4Codec::create(int level) {
+  return make_unique<LZ4Codec>(level);
+}
+
+LZ4Codec::LZ4Codec(int level) {
+  switch (level) {
+  case COMPRESSION_LEVEL_FASTEST:
+  case COMPRESSION_LEVEL_DEFAULT:
+    level = 1;
+    break;
+  case COMPRESSION_LEVEL_BEST:
+    level = 2;
+    break;
+  }
+  if (level < 1 || level > 2) {
+    throw std::invalid_argument(to<std::string>(
+        "LZ4Codec: invalid level: ", level));
+  }
+  highCompression_ = (level > 1);
+}
+
+bool LZ4Codec::doNeedsUncompressedLength() const {
+  return true;
+}
+
+uint64_t LZ4Codec::doMaxUncompressedLength() const {
+  // From lz4.h: "Max supported value is ~1.9GB"; I wish we had something
+  // more accurate.
+  return 1.8 * (uint64_t(1) << 30);
+}
+
+CodecType LZ4Codec::doType() const {
+  return CodecType::LZ4;
+}
+
+std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
+  std::unique_ptr<IOBuf> clone;
+  if (data->isChained()) {
+    // LZ4 doesn't support streaming, so we have to coalesce
+    clone = data->clone();
+    clone->coalesce();
+    data = clone.get();
+  }
+
+  auto out = IOBuf::create(LZ4_compressBound(data->length()));
+  int n;
+  if (highCompression_) {
+    n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
+                     reinterpret_cast<char*>(out->writableTail()),
+                     data->length());
+  } else {
+    n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()),
+                       reinterpret_cast<char*>(out->writableTail()),
+                       data->length());
+  }
+
+  CHECK_GE(n, 0);
+  CHECK_LE(n, out->capacity());
+
+  out->append(n);
+  return out;
+}
+
+std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
+    const IOBuf* data,
+    uint64_t uncompressedLength) {
+  std::unique_ptr<IOBuf> clone;
+  if (data->isChained()) {
+    // LZ4 doesn't support streaming, so we have to coalesce
+    clone = data->clone();
+    clone->coalesce();
+    data = clone.get();
+  }
+
+  auto out = IOBuf::create(uncompressedLength);
+  int n = LZ4_uncompress(reinterpret_cast<const char*>(data->data()),
+                         reinterpret_cast<char*>(out->writableTail()),
+                         uncompressedLength);
+  if (n != data->length()) {
+    throw std::runtime_error(to<std::string>(
+        "LZ4 decompression returned invalid value ", n));
+  }
+  out->append(uncompressedLength);
+  return out;
+}
+
+/**
+ * Snappy compression
+ */
+
+/**
+ * Implementation of snappy::Source that reads from a IOBuf chain.
+ */
+class IOBufSnappySource FOLLY_FINAL : public snappy::Source {
+ public:
+  explicit IOBufSnappySource(const IOBuf* data);
+  size_t Available() const FOLLY_OVERRIDE;
+  const char* Peek(size_t* len) FOLLY_OVERRIDE;
+  void Skip(size_t n) FOLLY_OVERRIDE;
+ private:
+  size_t available_;
+  io::Cursor cursor_;
+};
+
+IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
+  : available_(data->computeChainDataLength()),
+    cursor_(data) {
+}
+
+size_t IOBufSnappySource::Available() const {
+  return available_;
+}
+
+const char* IOBufSnappySource::Peek(size_t* len) {
+  auto p = cursor_.peek();
+  *len = p.second;
+  return reinterpret_cast<const char*>(p.first);
+}
+
+void IOBufSnappySource::Skip(size_t n) {
+  CHECK_LE(n, available_);
+  cursor_.skip(n);
+  available_ -= n;
+}
+
+class SnappyCodec FOLLY_FINAL : public Codec {
+ public:
+  static std::unique_ptr<Codec> create(int level);
+  explicit SnappyCodec(int level);
+
+ private:
+  uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
+  CodecType doType() const FOLLY_OVERRIDE;
+  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
+  std::unique_ptr<IOBuf> doUncompress(
+      const IOBuf* data,
+      uint64_t uncompressedLength) FOLLY_OVERRIDE;
+};
+
+std::unique_ptr<Codec> SnappyCodec::create(int level) {
+  return make_unique<SnappyCodec>(level);
+}
+
+SnappyCodec::SnappyCodec(int level) {
+  switch (level) {
+  case COMPRESSION_LEVEL_FASTEST:
+  case COMPRESSION_LEVEL_DEFAULT:
+  case COMPRESSION_LEVEL_BEST:
+    level = 1;
+  }
+  if (level != 1) {
+    throw std::invalid_argument(to<std::string>(
+        "SnappyCodec: invalid level: ", level));
+  }
+}
+
+uint64_t SnappyCodec::doMaxUncompressedLength() const {
+  // snappy.h uses uint32_t for lengths, so there's that.
+  return std::numeric_limits<uint32_t>::max();
+}
+
+CodecType SnappyCodec::doType() const {
+  return CodecType::SNAPPY;
+}
+
+std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
+  IOBufSnappySource source(data);
+  auto out =
+    IOBuf::create(snappy::MaxCompressedLength(source.Available()));
+
+  snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(
+      out->writableTail()));
+
+  size_t n = snappy::Compress(&source, &sink);
+
+  CHECK_LE(n, out->capacity());
+  out->append(n);
+  return out;
+}
+
+std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
+                                                 uint64_t uncompressedLength) {
+  uint32_t actualUncompressedLength = 0;
+
+  {
+    IOBufSnappySource source(data);
+    if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
+      throw std::runtime_error("snappy::GetUncompressedLength failed");
+    }
+    if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+        uncompressedLength != actualUncompressedLength) {
+      throw std::runtime_error("snappy: invalid uncompressed length");
+    }
+  }
+
+  auto out = IOBuf::create(actualUncompressedLength);
+
+  {
+    IOBufSnappySource source(data);
+    if (!snappy::RawUncompress(&source,
+                               reinterpret_cast<char*>(out->writableTail()))) {
+      throw std::runtime_error("snappy::RawUncompress failed");
+    }
+  }
+
+  out->append(actualUncompressedLength);
+  return out;
+}
+
+/**
+ * Zlib codec
+ */
+class ZlibCodec FOLLY_FINAL : public Codec {
+ public:
+  static std::unique_ptr<Codec> create(int level);
+  explicit ZlibCodec(int level);
+
+ private:
+  CodecType doType() const FOLLY_OVERRIDE;
+  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
+  std::unique_ptr<IOBuf> doUncompress(
+      const IOBuf* data,
+      uint64_t uncompressedLength) FOLLY_OVERRIDE;
+
+  std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
+  bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
+
+  int level_;
+};
+
+std::unique_ptr<Codec> ZlibCodec::create(int level) {
+  return make_unique<ZlibCodec>(level);
+}
+
+ZlibCodec::ZlibCodec(int level) {
+  switch (level) {
+  case COMPRESSION_LEVEL_FASTEST:
+    level = 1;
+    break;
+  case COMPRESSION_LEVEL_DEFAULT:
+    level = Z_DEFAULT_COMPRESSION;
+    break;
+  case COMPRESSION_LEVEL_BEST:
+    level = 9;
+    break;
+  }
+  if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
+    throw std::invalid_argument(to<std::string>(
+        "ZlibCodec: invalid level: ", level));
+  }
+  level_ = level;
+}
+
+CodecType ZlibCodec::doType() const {
+  return CodecType::ZLIB;
+}
+
+std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
+                                                  uint32_t length) {
+  CHECK_EQ(stream->avail_out, 0);
+
+  auto buf = IOBuf::create(length);
+  buf->append(length);
+
+  stream->next_out = buf->writableData();
+  stream->avail_out = buf->length();
+
+  return buf;
+}
+
+bool ZlibCodec::doInflate(z_stream* stream,
+                          IOBuf* head,
+                          uint32_t bufferLength) {
+  if (stream->avail_out == 0) {
+    head->prependChain(addOutputBuffer(stream, bufferLength));
+  }
+
+  int rc = inflate(stream, Z_NO_FLUSH);
+
+  switch (rc) {
+  case Z_OK:
+    break;
+  case Z_STREAM_END:
+    return true;
+  case Z_BUF_ERROR:
+  case Z_NEED_DICT:
+  case Z_DATA_ERROR:
+  case Z_MEM_ERROR:
+    throw std::runtime_error(to<std::string>(
+        "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
+  default:
+    CHECK(false) << rc << ": " << stream->msg;
+  }
+
+  return false;
+}
+
+
+std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
+  z_stream stream;
+  stream.zalloc = nullptr;
+  stream.zfree = nullptr;
+  stream.opaque = nullptr;
+
+  int rc = deflateInit(&stream, level_);
+  if (rc != Z_OK) {
+    throw std::runtime_error(to<std::string>(
+        "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
+  }
+
+  stream.next_in = stream.next_out = nullptr;
+  stream.avail_in = stream.avail_out = 0;
+  stream.total_in = stream.total_out = 0;
+
+  bool success = false;
+
+  SCOPE_EXIT {
+    int rc = deflateEnd(&stream);
+    // If we're here because of an exception, it's okay if some data
+    // got dropped.
+    CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
+      << rc << ": " << stream.msg;
+  };
+
+  uint64_t uncompressedLength = data->computeChainDataLength();
+  uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
+
+  // Max 64MiB in one go
+  constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
+  constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
+
+  auto out = addOutputBuffer(
+      &stream,
+      (maxCompressedLength <= maxSingleStepLength ?
+       maxCompressedLength :
+       defaultBufferLength));
+
+  for (auto& range : *data) {
+    if (range.empty()) {
+      continue;
+    }
+
+    stream.next_in = const_cast<uint8_t*>(range.data());
+    stream.avail_in = range.size();
+
+    while (stream.avail_in != 0) {
+      if (stream.avail_out == 0) {
+        out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
+      }
+
+      rc = deflate(&stream, Z_NO_FLUSH);
+
+      CHECK_EQ(rc, Z_OK) << stream.msg;
+    }
+  }
+
+  do {
+    if (stream.avail_out == 0) {
+      out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
+    }
+
+    rc = deflate(&stream, Z_FINISH);
+  } while (rc == Z_OK);
+
+  CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
+
+  out->prev()->trimEnd(stream.avail_out);
+
+  success = true;  // we survived
+
+  return out;
+}
+
+std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
+                                               uint64_t uncompressedLength) {
+  z_stream stream;
+  stream.zalloc = nullptr;
+  stream.zfree = nullptr;
+  stream.opaque = nullptr;
+
+  int rc = inflateInit(&stream);
+  if (rc != Z_OK) {
+    throw std::runtime_error(to<std::string>(
+        "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
+  }
+
+  stream.next_in = stream.next_out = nullptr;
+  stream.avail_in = stream.avail_out = 0;
+  stream.total_in = stream.total_out = 0;
+
+  bool success = false;
+
+  SCOPE_EXIT {
+    int rc = inflateEnd(&stream);
+    // If we're here because of an exception, it's okay if some data
+    // got dropped.
+    CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
+      << rc << ": " << stream.msg;
+  };
+
+  // Max 64MiB in one go
+  constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
+  constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
+
+  auto out = addOutputBuffer(
+      &stream,
+      ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+        uncompressedLength <= maxSingleStepLength) ?
+       uncompressedLength :
+       defaultBufferLength));
+
+  bool streamEnd = false;
+  for (auto& range : *data) {
+    if (range.empty()) {
+      continue;
+    }
+
+    stream.next_in = const_cast<uint8_t*>(range.data());
+    stream.avail_in = range.size();
+
+    while (stream.avail_in != 0) {
+      if (streamEnd) {
+        throw std::runtime_error(to<std::string>(
+            "ZlibCodec: junk after end of data"));
+      }
+
+      streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
+    }
+  }
+
+  while (!streamEnd) {
+    streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
+  }
+
+  out->prev()->trimEnd(stream.avail_out);
+
+  if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+      uncompressedLength != stream.total_out) {
+    throw std::runtime_error(to<std::string>(
+        "ZlibCodec: invalid uncompressed length"));
+  }
+
+  success = true;  // we survived
+
+  return out;
+}
+
+typedef std::unique_ptr<Codec> (*CodecFactory)(int);
+
+CodecFactory gCodecFactories[
+    static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
+  NoCompressionCodec::create,
+  LZ4Codec::create,
+  SnappyCodec::create,
+  ZlibCodec::create
+};
+
+}  // namespace
+
+std::unique_ptr<Codec> getCodec(CodecType type, int level) {
+  size_t idx = static_cast<size_t>(type);
+  if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
+    throw std::invalid_argument(to<std::string>(
+        "Compression type ", idx, " not supported"));
+  }
+  auto factory = gCodecFactories[idx];
+  if (!factory) {
+    throw std::invalid_argument(to<std::string>(
+        "Compression type ", idx, " not supported"));
+  }
+  auto codec = (*factory)(level);
+  DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
+  return codec;
+}
+
+}}  // namespaces
+
diff --git a/folly/io/Compression.h b/folly/io/Compression.h
new file mode 100644 (file)
index 0000000..5a5f9d8
--- /dev/null
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_IO_COMPRESSION_H_
+#define FOLLY_IO_COMPRESSION_H_
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+
+#include "folly/io/IOBuf.h"
+
+/**
+ * Compression / decompression over IOBufs
+ */
+
+namespace folly { namespace io {
+
+enum class CodecType {
+  /**
+   * Use no compression.
+   * Levels supported: 0
+   */
+  NO_COMPRESSION = 0,
+
+  /**
+   * Use LZ4 compression.
+   * Levels supported: 1 = fast, 2 = best; default = 1
+   */
+  LZ4 = 1,
+
+  /**
+   * Use Snappy compression.
+   * Levels supported: 1
+   */
+  SNAPPY = 2,
+
+  /**
+   * Use zlib compression.
+   * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6
+   */
+  ZLIB = 3,
+
+  NUM_CODEC_TYPES = 4,
+};
+
+class Codec {
+ public:
+  virtual ~Codec() { }
+
+  /**
+   * Return the maximum length of data that may be compressed with this codec.
+   * NO_COMPRESSION and ZLIB support arbitrary lengths;
+   * LZ4 supports up to 1.9GiB; SNAPPY supports up to 4GiB.
+   */
+  uint64_t maxUncompressedLength() const;
+
+  /**
+   * Return the codec's type.
+   */
+  CodecType type() const;
+
+  /**
+   * Does this codec need the exact uncompressed length on decompression?
+   */
+  bool needsUncompressedLength() const;
+
+  /**
+   * Compress data, returning an IOBuf (which may share storage with data).
+   * Throws std::invalid_argument if data is larger than
+   * maxUncompressedLength().
+   *
+   * Regardless of the behavior of the underlying compressor, compressing
+   * an empty IOBuf chain will return an empty IOBuf chain.
+   */
+  std::unique_ptr<IOBuf> compress(const folly::IOBuf* data);
+
+  /**
+   * Uncompress data. Throws std::runtime_error on decompression error.
+   *
+   * Some codecs (LZ4) require the exact uncompressed length; this is indicated
+   * by needsUncompressedLength().
+   *
+   * For other codes (zlib), knowing the exact uncompressed length ahead of
+   * time might be faster.
+   *
+   * Regardless of the behavior of the underlying compressor, uncompressing
+   * an empty IOBuf chain will return an empty IOBuf chain.
+   */
+  static constexpr uint64_t UNKNOWN_UNCOMPRESSED_LENGTH = uint64_t(-1);
+
+  std::unique_ptr<IOBuf> uncompress(
+      const IOBuf* data,
+      uint64_t uncompressedLength = UNKNOWN_UNCOMPRESSED_LENGTH);
+
+ private:
+  // default: no limits (save for special value UNKNOWN_UNCOMPRESSED_LENGTH)
+  virtual uint64_t doMaxUncompressedLength() const;
+  // default: doesn't need uncompressed length
+  virtual bool doNeedsUncompressedLength() const;
+  virtual CodecType doType() const = 0;
+  virtual std::unique_ptr<IOBuf> doCompress(const folly::IOBuf* data) = 0;
+  virtual std::unique_ptr<IOBuf> doUncompress(const folly::IOBuf* data,
+                                              uint64_t uncompressedLength) = 0;
+};
+
+constexpr int COMPRESSION_LEVEL_FASTEST = -1;
+constexpr int COMPRESSION_LEVEL_DEFAULT = -2;
+constexpr int COMPRESSION_LEVEL_BEST = -3;
+
+/**
+ * Return a codec for the given type. Throws on error.  The level
+ * is a non-negative codec-dependent integer indicating the level of
+ * compression desired, or one of the following constants:
+ *
+ * COMPRESSION_LEVEL_FASTEST is fastest (uses least CPU / memory,
+ *   worst compression)
+ * COMPRESSION_LEVEL_DEFAULT is the default (likely a tradeoff between
+ *   FASTEST and BEST)
+ * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory,
+ *   best compression)
+ */
+std::unique_ptr<Codec> getCodec(CodecType type,
+                                int level = COMPRESSION_LEVEL_DEFAULT);
+
+}}  // namespaces
+
+#endif /* FOLLY_IO_COMPRESSION_H_ */
+
diff --git a/folly/io/test/CompressionTest.cpp b/folly/io/test/CompressionTest.cpp
new file mode 100644 (file)
index 0000000..ae791cd
--- /dev/null
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2013 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/io/Compression.h"
+
+// Yes, tr1, as that's what gtest requires
+#include <random>
+#include <thread>
+#include <tr1/tuple>
+#include <unordered_map>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "folly/Benchmark.h"
+#include "folly/Hash.h"
+#include "folly/Random.h"
+#include "folly/io/IOBufQueue.h"
+
+namespace folly { namespace io { namespace test {
+
+constexpr size_t randomDataSizeLog2 = 27;  // 128MiB
+constexpr size_t randomDataSize = size_t(1) << randomDataSizeLog2;
+
+std::unique_ptr<uint8_t[]> randomData;
+std::unordered_map<uint64_t, uint64_t> hashes;
+
+uint64_t hashIOBuf(const IOBuf* buf) {
+  uint64_t h = folly::hash::FNV_64_HASH_START;
+  for (auto& range : *buf) {
+    h = folly::hash::fnv64_buf(range.data(), range.size(), h);
+  }
+  return h;
+}
+
+uint64_t getRandomDataHash(uint64_t size) {
+  auto p = hashes.find(size);
+  if (p != hashes.end()) {
+    return p->second;
+  }
+
+  uint64_t h = folly::hash::fnv64_buf(randomData.get(), size);
+  hashes[size] = h;
+  return h;
+}
+
+void generateRandomData() {
+  randomData.reset(new uint8_t[size_t(1) << randomDataSizeLog2]);
+
+  constexpr size_t numThreadsLog2 = 3;
+  constexpr size_t numThreads = size_t(1) << numThreadsLog2;
+
+  uint32_t seed = randomNumberSeed();
+
+  std::vector<std::thread> threads;
+  threads.reserve(numThreads);
+  for (size_t t = 0; t < numThreads; ++t) {
+    threads.emplace_back(
+        [seed, t, numThreadsLog2] () {
+          std::mt19937 rng(seed + t);
+          size_t countLog2 = size_t(1) << (randomDataSizeLog2 - numThreadsLog2);
+          size_t start = size_t(t) << countLog2;
+          for (size_t i = 0; i < countLog2; ++i) {
+            randomData[start + i] = rng();
+          }
+        });
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+class CompressionTest : public testing::TestWithParam<
+    std::tr1::tuple<int, CodecType>> {
+  protected:
+   void SetUp() {
+     auto tup = GetParam();
+     uncompressedLength_ = uint64_t(1) << std::tr1::get<0>(tup);
+     codec_ = getCodec(std::tr1::get<1>(tup));
+   }
+
+   uint64_t uncompressedLength_;
+   std::unique_ptr<Codec> codec_;
+};
+
+TEST_P(CompressionTest, Simple) {
+  auto original = IOBuf::wrapBuffer(randomData.get(), uncompressedLength_);
+  auto compressed = codec_->compress(original.get());
+  if (!codec_->needsUncompressedLength()) {
+    auto uncompressed = codec_->uncompress(compressed.get());
+    EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
+    EXPECT_EQ(getRandomDataHash(uncompressedLength_),
+              hashIOBuf(uncompressed.get()));
+  }
+  {
+    auto uncompressed = codec_->uncompress(compressed.get(),
+                                           uncompressedLength_);
+    EXPECT_EQ(uncompressedLength_, uncompressed->computeChainDataLength());
+    EXPECT_EQ(getRandomDataHash(uncompressedLength_),
+              hashIOBuf(uncompressed.get()));
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(
+    CompressionTest,
+    CompressionTest,
+    testing::Combine(
+        testing::Values(0, 1, 12, 22, int(randomDataSizeLog2)),
+        testing::Values(CodecType::NO_COMPRESSION,
+                        CodecType::LZ4,
+                        CodecType::SNAPPY,
+                        CodecType::ZLIB)));
+
+class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
+ protected:
+  void SetUp() {
+    codec_ = getCodec(GetParam());
+  }
+
+  std::unique_ptr<Codec> codec_;
+};
+
+TEST_P(CompressionCorruptionTest, Simple) {
+  constexpr uint64_t uncompressedLength = 42;
+  auto original = IOBuf::wrapBuffer(randomData.get(), uncompressedLength);
+  auto compressed = codec_->compress(original.get());
+
+  if (!codec_->needsUncompressedLength()) {
+    auto uncompressed = codec_->uncompress(compressed.get());
+    EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
+    EXPECT_EQ(getRandomDataHash(uncompressedLength),
+              hashIOBuf(uncompressed.get()));
+  }
+  {
+    auto uncompressed = codec_->uncompress(compressed.get(),
+                                           uncompressedLength);
+    EXPECT_EQ(uncompressedLength, uncompressed->computeChainDataLength());
+    EXPECT_EQ(getRandomDataHash(uncompressedLength),
+              hashIOBuf(uncompressed.get()));
+  }
+
+  EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength + 1),
+               std::runtime_error);
+
+  // Corrupt the first character
+  ++(compressed->writableData()[0]);
+
+  if (!codec_->needsUncompressedLength()) {
+    EXPECT_THROW(codec_->uncompress(compressed.get()),
+                 std::runtime_error);
+  }
+
+  EXPECT_THROW(codec_->uncompress(compressed.get(), uncompressedLength),
+               std::runtime_error);
+}
+
+INSTANTIATE_TEST_CASE_P(
+    CompressionCorruptionTest,
+    CompressionCorruptionTest,
+    testing::Values(
+        // NO_COMPRESSION can't detect corruption
+        // LZ4 can't detect corruption reliably (sigh)
+        CodecType::SNAPPY,
+        CodecType::ZLIB));
+
+}}}  // namespaces
+
+int main(int argc, char *argv[]) {
+  testing::InitGoogleTest(&argc, argv);
+  google::ParseCommandLineFlags(&argc, &argv, true);
+
+  folly::io::test::generateRandomData();  // 4GB
+
+  auto ret = RUN_ALL_TESTS();
+  if (!ret) {
+    folly::runBenchmarksOnFlag();
+  }
+  return ret;
+}
+