Add Varint-length-prefixed flavor of LZ4
authorTudor Bosman <tudorb@fb.com>
Thu, 15 Aug 2013 02:37:40 +0000 (19:37 -0700)
committerSara Golemon <sgolemon@fb.com>
Wed, 28 Aug 2013 21:30:12 +0000 (14:30 -0700)
Test Plan: test added

Reviewed By: alandau@fb.com

FB internal diff: D928836

folly/io/Compression.cpp
folly/io/Compression.h
folly/io/test/CompressionTest.cpp

index f46df40103759db64b44682186134415305098b0..951619a17debccd79c0b2edfc0a5d1909e084dc2 100644 (file)
 #include "folly/Memory.h"
 #include "folly/Portability.h"
 #include "folly/ScopeGuard.h"
+#include "folly/Varint.h"
 #include "folly/io/Cursor.h"
 
 namespace folly { namespace io {
 
+Codec::Codec(CodecType type) : type_(type) { }
+
 // Ensure consistent behavior in the nullptr case
 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
   return !data->empty() ? doCompress(data) : IOBuf::create(0);
@@ -65,10 +68,6 @@ uint64_t Codec::maxUncompressedLength() const {
   return doMaxUncompressedLength();
 }
 
-CodecType Codec::type() const {
-  return doType();
-}
-
 bool Codec::doNeedsUncompressedLength() const {
   return false;
 }
@@ -84,22 +83,23 @@ namespace {
  */
 class NoCompressionCodec FOLLY_FINAL : public Codec {
  public:
-  static std::unique_ptr<Codec> create(int level);
-  explicit NoCompressionCodec(int level);
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit NoCompressionCodec(int level, CodecType type);
 
  private:
-  CodecType doType() const FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
       uint64_t uncompressedLength) FOLLY_OVERRIDE;
 };
 
-std::unique_ptr<Codec> NoCompressionCodec::create(int level) {
-  return make_unique<NoCompressionCodec>(level);
+std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
+  return make_unique<NoCompressionCodec>(level, type);
 }
 
-NoCompressionCodec::NoCompressionCodec(int level) {
+NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
+  : Codec(type) {
+  DCHECK(type == CodecType::NO_COMPRESSION);
   switch (level) {
   case COMPRESSION_LEVEL_DEFAULT:
   case COMPRESSION_LEVEL_FASTEST:
@@ -112,10 +112,6 @@ NoCompressionCodec::NoCompressionCodec(int level) {
   }
 }
 
-CodecType NoCompressionCodec::doType() const {
-  return CodecType::NO_COMPRESSION;
-}
-
 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
     const IOBuf* data) {
   return data->clone();
@@ -137,13 +133,15 @@ std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
  */
 class LZ4Codec FOLLY_FINAL : public Codec {
  public:
-  static std::unique_ptr<Codec> create(int level);
-  explicit LZ4Codec(int level);
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit LZ4Codec(int level, CodecType type);
 
  private:
   bool doNeedsUncompressedLength() const FOLLY_OVERRIDE;
   uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
-  CodecType doType() const FOLLY_OVERRIDE;
+
+  bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
+
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
@@ -152,11 +150,13 @@ class LZ4Codec FOLLY_FINAL : public Codec {
   bool highCompression_;
 };
 
-std::unique_ptr<Codec> LZ4Codec::create(int level) {
-  return make_unique<LZ4Codec>(level);
+std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
+  return make_unique<LZ4Codec>(level, type);
 }
 
-LZ4Codec::LZ4Codec(int level) {
+LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
+  DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
+
   switch (level) {
   case COMPRESSION_LEVEL_FASTEST:
   case COMPRESSION_LEVEL_DEFAULT:
@@ -174,7 +174,7 @@ LZ4Codec::LZ4Codec(int level) {
 }
 
 bool LZ4Codec::doNeedsUncompressedLength() const {
-  return true;
+  return !encodeSize();
 }
 
 uint64_t LZ4Codec::doMaxUncompressedLength() const {
@@ -183,10 +183,24 @@ uint64_t LZ4Codec::doMaxUncompressedLength() const {
   return 1.8 * (uint64_t(1) << 30);
 }
 
-CodecType LZ4Codec::doType() const {
-  return CodecType::LZ4;
+namespace {
+
+void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
+  DCHECK_GE(out->tailroom(), kMaxVarintLength64);
+  out->append(encodeVarint(val, out->writableTail()));
+}
+
+uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
+  // Must have enough room in *this* buffer.
+  auto p = cursor.peek();
+  folly::ByteRange range(p.first, p.second);
+  uint64_t val = decodeVarint(range);
+  cursor.skip(range.data() - p.first);
+  return val;
 }
 
+}  // namespace
+
 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
   std::unique_ptr<IOBuf> clone;
   if (data->isChained()) {
@@ -196,16 +210,21 @@ std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
     data = clone.get();
   }
 
-  auto out = IOBuf::create(LZ4_compressBound(data->length()));
+  uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
+  auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
+  if (encodeSize()) {
+    encodeVarintToIOBuf(data->length(), out.get());
+  }
+
   int n;
   if (highCompression_) {
-    n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
-                     reinterpret_cast<char*>(out->writableTail()),
-                     data->length());
-  } else {
     n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()),
                        reinterpret_cast<char*>(out->writableTail()),
                        data->length());
+  } else {
+    n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
+                     reinterpret_cast<char*>(out->writableTail()),
+                     data->length());
   }
 
   CHECK_GE(n, 0);
@@ -226,15 +245,29 @@ std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
     data = clone.get();
   }
 
-  auto out = IOBuf::create(uncompressedLength);
-  int n = LZ4_uncompress(reinterpret_cast<const char*>(data->data()),
+  folly::io::Cursor cursor(data);
+  uint64_t actualUncompressedLength;
+  if (encodeSize()) {
+    actualUncompressedLength = decodeVarintFromCursor(cursor);
+    if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+        uncompressedLength != actualUncompressedLength) {
+      throw std::runtime_error("LZ4Codec: invalid uncompressed length");
+    }
+  } else {
+    actualUncompressedLength = uncompressedLength;
+    DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH);
+  }
+
+  auto out = IOBuf::create(actualUncompressedLength);
+  auto p = cursor.peek();
+  int n = LZ4_uncompress(reinterpret_cast<const char*>(p.first),
                          reinterpret_cast<char*>(out->writableTail()),
-                         uncompressedLength);
-  if (n != data->length()) {
+                         actualUncompressedLength);
+  if (n != p.second) {
     throw std::runtime_error(to<std::string>(
         "LZ4 decompression returned invalid value ", n));
   }
-  out->append(uncompressedLength);
+  out->append(actualUncompressedLength);
   return out;
 }
 
@@ -279,23 +312,23 @@ void IOBufSnappySource::Skip(size_t n) {
 
 class SnappyCodec FOLLY_FINAL : public Codec {
  public:
-  static std::unique_ptr<Codec> create(int level);
-  explicit SnappyCodec(int level);
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit SnappyCodec(int level, CodecType type);
 
  private:
   uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
-  CodecType doType() const FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
       uint64_t uncompressedLength) FOLLY_OVERRIDE;
 };
 
-std::unique_ptr<Codec> SnappyCodec::create(int level) {
-  return make_unique<SnappyCodec>(level);
+std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
+  return make_unique<SnappyCodec>(level, type);
 }
 
-SnappyCodec::SnappyCodec(int level) {
+SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
+  DCHECK(type == CodecType::SNAPPY);
   switch (level) {
   case COMPRESSION_LEVEL_FASTEST:
   case COMPRESSION_LEVEL_DEFAULT:
@@ -313,10 +346,6 @@ uint64_t SnappyCodec::doMaxUncompressedLength() const {
   return std::numeric_limits<uint32_t>::max();
 }
 
-CodecType SnappyCodec::doType() const {
-  return CodecType::SNAPPY;
-}
-
 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
   IOBufSnappySource source(data);
   auto out =
@@ -366,11 +395,10 @@ std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
  */
 class ZlibCodec FOLLY_FINAL : public Codec {
  public:
-  static std::unique_ptr<Codec> create(int level);
-  explicit ZlibCodec(int level);
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit ZlibCodec(int level, CodecType type);
 
  private:
-  CodecType doType() const FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
@@ -382,11 +410,12 @@ class ZlibCodec FOLLY_FINAL : public Codec {
   int level_;
 };
 
-std::unique_ptr<Codec> ZlibCodec::create(int level) {
-  return make_unique<ZlibCodec>(level);
+std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
+  return make_unique<ZlibCodec>(level, type);
 }
 
-ZlibCodec::ZlibCodec(int level) {
+ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
+  DCHECK(type == CodecType::ZLIB);
   switch (level) {
   case COMPRESSION_LEVEL_FASTEST:
     level = 1;
@@ -405,10 +434,6 @@ ZlibCodec::ZlibCodec(int level) {
   level_ = level;
 }
 
-CodecType ZlibCodec::doType() const {
-  return CodecType::ZLIB;
-}
-
 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
                                                   uint32_t length) {
   CHECK_EQ(stream->avail_out, 0);
@@ -599,14 +624,16 @@ std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
   return out;
 }
 
-typedef std::unique_ptr<Codec> (*CodecFactory)(int);
+typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
 
 CodecFactory gCodecFactories[
     static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
+  nullptr,  // USER_DEFINED
   NoCompressionCodec::create,
   LZ4Codec::create,
   SnappyCodec::create,
-  ZlibCodec::create
+  ZlibCodec::create,
+  LZ4Codec::create
 };
 
 }  // namespace
@@ -622,7 +649,7 @@ std::unique_ptr<Codec> getCodec(CodecType type, int level) {
     throw std::invalid_argument(to<std::string>(
         "Compression type ", idx, " not supported"));
   }
-  auto codec = (*factory)(level);
+  auto codec = (*factory)(level, type);
   DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
   return codec;
 }
index 5a5f9d8992c89e2a5740e191ebcaab275083effa..1edba5e6f3e07435378476935cb9b72c2c222e3c 100644 (file)
 namespace folly { namespace io {
 
 enum class CodecType {
+  /**
+   * This codec type is not defined; getCodec() will throw an exception
+   * if used. Useful if deriving your own classes from Codec without
+   * going through the getCodec() interface.
+   */
+  USER_DEFINED = 0,
+
   /**
    * Use no compression.
    * Levels supported: 0
    */
-  NO_COMPRESSION = 0,
+  NO_COMPRESSION = 1,
 
   /**
    * Use LZ4 compression.
    * Levels supported: 1 = fast, 2 = best; default = 1
    */
-  LZ4 = 1,
+  LZ4 = 2,
 
   /**
    * Use Snappy compression.
    * Levels supported: 1
    */
-  SNAPPY = 2,
+  SNAPPY = 3,
 
   /**
    * Use zlib compression.
    * Levels supported: 0 = no compression, 1 = fast, ..., 9 = best; default = 6
    */
-  ZLIB = 3,
+  ZLIB = 4,
 
-  NUM_CODEC_TYPES = 4,
+  /**
+   * Use LZ4 compression, prefixed with size (as Varint).
+   */
+  LZ4_VARINT_SIZE = 5,
+
+  NUM_CODEC_TYPES = 6,
 };
 
 class Codec {
@@ -71,7 +83,7 @@ class Codec {
   /**
    * Return the codec's type.
    */
-  CodecType type() const;
+  CodecType type() const { return type_; }
 
   /**
    * Does this codec need the exact uncompressed length on decompression?
@@ -106,15 +118,19 @@ class Codec {
       const IOBuf* data,
       uint64_t uncompressedLength = UNKNOWN_UNCOMPRESSED_LENGTH);
 
+ protected:
+  explicit Codec(CodecType type);
+
  private:
   // default: no limits (save for special value UNKNOWN_UNCOMPRESSED_LENGTH)
   virtual uint64_t doMaxUncompressedLength() const;
   // default: doesn't need uncompressed length
   virtual bool doNeedsUncompressedLength() const;
-  virtual CodecType doType() const = 0;
   virtual std::unique_ptr<IOBuf> doCompress(const folly::IOBuf* data) = 0;
   virtual std::unique_ptr<IOBuf> doUncompress(const folly::IOBuf* data,
                                               uint64_t uncompressedLength) = 0;
+
+  CodecType type_;
 };
 
 constexpr int COMPRESSION_LEVEL_FASTEST = -1;
@@ -132,6 +148,10 @@ constexpr int COMPRESSION_LEVEL_BEST = -3;
  *   FASTEST and BEST)
  * COMPRESSION_LEVEL_BEST is the best compression (uses most CPU / memory,
  *   best compression)
+ *
+ * When decompressing, the compression level is ignored. All codecs will
+ * decompress all data compressed with the a codec of the same type, regardless
+ * of compression level.
  */
 std::unique_ptr<Codec> getCodec(CodecType type,
                                 int level = COMPRESSION_LEVEL_DEFAULT);
index ae791cd3178fcb606761990f3fe515f7ee09d36c..57399bea1ee2a8c1e47a388c5d6e1642fa292afb 100644 (file)
@@ -84,6 +84,14 @@ void generateRandomData() {
   }
 }
 
+TEST(CompressionTestNeedsUncompressedLength, Simple) {
+  EXPECT_FALSE(getCodec(CodecType::NO_COMPRESSION)->needsUncompressedLength());
+  EXPECT_TRUE(getCodec(CodecType::LZ4)->needsUncompressedLength());
+  EXPECT_FALSE(getCodec(CodecType::SNAPPY)->needsUncompressedLength());
+  EXPECT_FALSE(getCodec(CodecType::ZLIB)->needsUncompressedLength());
+  EXPECT_FALSE(getCodec(CodecType::LZ4_VARINT_SIZE)->needsUncompressedLength());
+}
+
 class CompressionTest : public testing::TestWithParam<
     std::tr1::tuple<int, CodecType>> {
   protected:
@@ -123,7 +131,8 @@ INSTANTIATE_TEST_CASE_P(
         testing::Values(CodecType::NO_COMPRESSION,
                         CodecType::LZ4,
                         CodecType::SNAPPY,
-                        CodecType::ZLIB)));
+                        CodecType::ZLIB,
+                        CodecType::LZ4_VARINT_SIZE)));
 
 class CompressionCorruptionTest : public testing::TestWithParam<CodecType> {
  protected: