Add Varint-length-prefixed flavor of LZ4
[folly.git] / folly / io / Compression.cpp
index f46df40103759db64b44682186134415305098b0..951619a17debccd79c0b2edfc0a5d1909e084dc2 100644 (file)
 #include "folly/Memory.h"
 #include "folly/Portability.h"
 #include "folly/ScopeGuard.h"
+#include "folly/Varint.h"
 #include "folly/io/Cursor.h"
 
 namespace folly { namespace io {
 
+Codec::Codec(CodecType type) : type_(type) { }
+
 // Ensure consistent behavior in the nullptr case
 std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
   return !data->empty() ? doCompress(data) : IOBuf::create(0);
@@ -65,10 +68,6 @@ uint64_t Codec::maxUncompressedLength() const {
   return doMaxUncompressedLength();
 }
 
-CodecType Codec::type() const {
-  return doType();
-}
-
 bool Codec::doNeedsUncompressedLength() const {
   return false;
 }
@@ -84,22 +83,23 @@ namespace {
  */
 class NoCompressionCodec FOLLY_FINAL : public Codec {
  public:
-  static std::unique_ptr<Codec> create(int level);
-  explicit NoCompressionCodec(int level);
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit NoCompressionCodec(int level, CodecType type);
 
  private:
-  CodecType doType() const FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
       uint64_t uncompressedLength) FOLLY_OVERRIDE;
 };
 
-std::unique_ptr<Codec> NoCompressionCodec::create(int level) {
-  return make_unique<NoCompressionCodec>(level);
+std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
+  return make_unique<NoCompressionCodec>(level, type);
 }
 
-NoCompressionCodec::NoCompressionCodec(int level) {
+NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
+  : Codec(type) {
+  DCHECK(type == CodecType::NO_COMPRESSION);
   switch (level) {
   case COMPRESSION_LEVEL_DEFAULT:
   case COMPRESSION_LEVEL_FASTEST:
@@ -112,10 +112,6 @@ NoCompressionCodec::NoCompressionCodec(int level) {
   }
 }
 
-CodecType NoCompressionCodec::doType() const {
-  return CodecType::NO_COMPRESSION;
-}
-
 std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(
     const IOBuf* data) {
   return data->clone();
@@ -137,13 +133,15 @@ std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
  */
 class LZ4Codec FOLLY_FINAL : public Codec {
  public:
-  static std::unique_ptr<Codec> create(int level);
-  explicit LZ4Codec(int level);
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit LZ4Codec(int level, CodecType type);
 
  private:
   bool doNeedsUncompressedLength() const FOLLY_OVERRIDE;
   uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
-  CodecType doType() const FOLLY_OVERRIDE;
+
+  bool encodeSize() const { return type() == CodecType::LZ4_VARINT_SIZE; }
+
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
@@ -152,11 +150,13 @@ class LZ4Codec FOLLY_FINAL : public Codec {
   bool highCompression_;
 };
 
-std::unique_ptr<Codec> LZ4Codec::create(int level) {
-  return make_unique<LZ4Codec>(level);
+std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
+  return make_unique<LZ4Codec>(level, type);
 }
 
-LZ4Codec::LZ4Codec(int level) {
+LZ4Codec::LZ4Codec(int level, CodecType type) : Codec(type) {
+  DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
+
   switch (level) {
   case COMPRESSION_LEVEL_FASTEST:
   case COMPRESSION_LEVEL_DEFAULT:
@@ -174,7 +174,7 @@ LZ4Codec::LZ4Codec(int level) {
 }
 
 bool LZ4Codec::doNeedsUncompressedLength() const {
-  return true;
+  return !encodeSize();
 }
 
 uint64_t LZ4Codec::doMaxUncompressedLength() const {
@@ -183,10 +183,24 @@ uint64_t LZ4Codec::doMaxUncompressedLength() const {
   return 1.8 * (uint64_t(1) << 30);
 }
 
-CodecType LZ4Codec::doType() const {
-  return CodecType::LZ4;
+namespace {
+
+void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
+  DCHECK_GE(out->tailroom(), kMaxVarintLength64);
+  out->append(encodeVarint(val, out->writableTail()));
+}
+
+uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
+  // Must have enough room in *this* buffer.
+  auto p = cursor.peek();
+  folly::ByteRange range(p.first, p.second);
+  uint64_t val = decodeVarint(range);
+  cursor.skip(range.data() - p.first);
+  return val;
 }
 
+}  // namespace
+
 std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
   std::unique_ptr<IOBuf> clone;
   if (data->isChained()) {
@@ -196,16 +210,21 @@ std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
     data = clone.get();
   }
 
-  auto out = IOBuf::create(LZ4_compressBound(data->length()));
+  uint32_t extraSize = encodeSize() ? kMaxVarintLength64 : 0;
+  auto out = IOBuf::create(extraSize + LZ4_compressBound(data->length()));
+  if (encodeSize()) {
+    encodeVarintToIOBuf(data->length(), out.get());
+  }
+
   int n;
   if (highCompression_) {
-    n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
-                     reinterpret_cast<char*>(out->writableTail()),
-                     data->length());
-  } else {
     n = LZ4_compressHC(reinterpret_cast<const char*>(data->data()),
                        reinterpret_cast<char*>(out->writableTail()),
                        data->length());
+  } else {
+    n = LZ4_compress(reinterpret_cast<const char*>(data->data()),
+                     reinterpret_cast<char*>(out->writableTail()),
+                     data->length());
   }
 
   CHECK_GE(n, 0);
@@ -226,15 +245,29 @@ std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
     data = clone.get();
   }
 
-  auto out = IOBuf::create(uncompressedLength);
-  int n = LZ4_uncompress(reinterpret_cast<const char*>(data->data()),
+  folly::io::Cursor cursor(data);
+  uint64_t actualUncompressedLength;
+  if (encodeSize()) {
+    actualUncompressedLength = decodeVarintFromCursor(cursor);
+    if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+        uncompressedLength != actualUncompressedLength) {
+      throw std::runtime_error("LZ4Codec: invalid uncompressed length");
+    }
+  } else {
+    actualUncompressedLength = uncompressedLength;
+    DCHECK_NE(actualUncompressedLength, UNKNOWN_UNCOMPRESSED_LENGTH);
+  }
+
+  auto out = IOBuf::create(actualUncompressedLength);
+  auto p = cursor.peek();
+  int n = LZ4_uncompress(reinterpret_cast<const char*>(p.first),
                          reinterpret_cast<char*>(out->writableTail()),
-                         uncompressedLength);
-  if (n != data->length()) {
+                         actualUncompressedLength);
+  if (n != p.second) {
     throw std::runtime_error(to<std::string>(
         "LZ4 decompression returned invalid value ", n));
   }
-  out->append(uncompressedLength);
+  out->append(actualUncompressedLength);
   return out;
 }
 
@@ -279,23 +312,23 @@ void IOBufSnappySource::Skip(size_t n) {
 
 class SnappyCodec FOLLY_FINAL : public Codec {
  public:
-  static std::unique_ptr<Codec> create(int level);
-  explicit SnappyCodec(int level);
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit SnappyCodec(int level, CodecType type);
 
  private:
   uint64_t doMaxUncompressedLength() const FOLLY_OVERRIDE;
-  CodecType doType() const FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
       uint64_t uncompressedLength) FOLLY_OVERRIDE;
 };
 
-std::unique_ptr<Codec> SnappyCodec::create(int level) {
-  return make_unique<SnappyCodec>(level);
+std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
+  return make_unique<SnappyCodec>(level, type);
 }
 
-SnappyCodec::SnappyCodec(int level) {
+SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
+  DCHECK(type == CodecType::SNAPPY);
   switch (level) {
   case COMPRESSION_LEVEL_FASTEST:
   case COMPRESSION_LEVEL_DEFAULT:
@@ -313,10 +346,6 @@ uint64_t SnappyCodec::doMaxUncompressedLength() const {
   return std::numeric_limits<uint32_t>::max();
 }
 
-CodecType SnappyCodec::doType() const {
-  return CodecType::SNAPPY;
-}
-
 std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
   IOBufSnappySource source(data);
   auto out =
@@ -366,11 +395,10 @@ std::unique_ptr<IOBuf> SnappyCodec::doUncompress(const IOBuf* data,
  */
 class ZlibCodec FOLLY_FINAL : public Codec {
  public:
-  static std::unique_ptr<Codec> create(int level);
-  explicit ZlibCodec(int level);
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit ZlibCodec(int level, CodecType type);
 
  private:
-  CodecType doType() const FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doCompress(const IOBuf* data) FOLLY_OVERRIDE;
   std::unique_ptr<IOBuf> doUncompress(
       const IOBuf* data,
@@ -382,11 +410,12 @@ class ZlibCodec FOLLY_FINAL : public Codec {
   int level_;
 };
 
-std::unique_ptr<Codec> ZlibCodec::create(int level) {
-  return make_unique<ZlibCodec>(level);
+std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
+  return make_unique<ZlibCodec>(level, type);
 }
 
-ZlibCodec::ZlibCodec(int level) {
+ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
+  DCHECK(type == CodecType::ZLIB);
   switch (level) {
   case COMPRESSION_LEVEL_FASTEST:
     level = 1;
@@ -405,10 +434,6 @@ ZlibCodec::ZlibCodec(int level) {
   level_ = level;
 }
 
-CodecType ZlibCodec::doType() const {
-  return CodecType::ZLIB;
-}
-
 std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
                                                   uint32_t length) {
   CHECK_EQ(stream->avail_out, 0);
@@ -599,14 +624,16 @@ std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
   return out;
 }
 
-typedef std::unique_ptr<Codec> (*CodecFactory)(int);
+typedef std::unique_ptr<Codec> (*CodecFactory)(int, CodecType);
 
 CodecFactory gCodecFactories[
     static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
+  nullptr,  // USER_DEFINED
   NoCompressionCodec::create,
   LZ4Codec::create,
   SnappyCodec::create,
-  ZlibCodec::create
+  ZlibCodec::create,
+  LZ4Codec::create
 };
 
 }  // namespace
@@ -622,7 +649,7 @@ std::unique_ptr<Codec> getCodec(CodecType type, int level) {
     throw std::invalid_argument(to<std::string>(
         "Compression type ", idx, " not supported"));
   }
-  auto codec = (*factory)(level);
+  auto codec = (*factory)(level, type);
   DCHECK_EQ(static_cast<size_t>(codec->type()), idx);
   return codec;
 }