Add LZ4_FRAME codec
authorNick Terrell <terrelln@fb.com>
Fri, 24 Mar 2017 18:08:17 +0000 (11:08 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Fri, 24 Mar 2017 18:20:56 +0000 (11:20 -0700)
Summary:
The LZ4 Frame codec encodes data using the LZ4 frame format.
One advantage of the LZ4 frame format is that it has 4 magic bytes in the header, so users can transparently determine compression type.
It also allows the user to interop with the lz4 command line tool.

Reviewed By: yfeldblum

Differential Revision: D4715918

fbshipit-source-id: 689833fef526b1cfe98685179e7b494380d49cba

folly/io/Compression.cpp
folly/io/Compression.h
folly/io/test/CompressionTest.cpp

index d618da0c51c6e88ea2855093d90fe25d95c284ab..e0beac2a6720e65d87afd1529c9fea5b80ae6cfa 100644 (file)
@@ -18,6 +18,7 @@
 
 #if FOLLY_HAVE_LIBLZ4
 #include <lz4.h>
 
 #if FOLLY_HAVE_LIBLZ4
 #include <lz4.h>
+#include <lz4frame.h>
 #include <lz4hc.h>
 #endif
 
 #include <lz4hc.h>
 #endif
 
@@ -383,7 +384,160 @@ std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
   return out;
 }
 
   return out;
 }
 
-#endif  // FOLLY_HAVE_LIBLZ4
+class LZ4FrameCodec final : public Codec {
+ public:
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit LZ4FrameCodec(int level, CodecType type);
+  ~LZ4FrameCodec();
+
+ private:
+  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
+  std::unique_ptr<IOBuf> doUncompress(
+      const IOBuf* data,
+      uint64_t uncompressedLength) override;
+
+  // Reset the dctx_ if it is dirty or null.
+  void resetDCtx();
+
+  int level_;
+  LZ4F_dctx* dctx_{nullptr};
+  bool dirty_{false};
+};
+
+/* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
+    int level,
+    CodecType type) {
+  return make_unique<LZ4FrameCodec>(level, type);
+}
+
+static size_t lz4FrameThrowOnError(size_t code) {
+  if (LZ4F_isError(code)) {
+    throw std::runtime_error(
+        to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
+  }
+  return code;
+}
+
+void LZ4FrameCodec::resetDCtx() {
+  if (dctx_ && !dirty_) {
+    return;
+  }
+  if (dctx_) {
+    LZ4F_freeDecompressionContext(dctx_);
+  }
+  lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
+  dirty_ = false;
+}
+
+LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type) : Codec(type) {
+  DCHECK(type == CodecType::LZ4_FRAME);
+  switch (level) {
+    case COMPRESSION_LEVEL_FASTEST:
+    case COMPRESSION_LEVEL_DEFAULT:
+      level_ = 0;
+      break;
+    case COMPRESSION_LEVEL_BEST:
+      level_ = 16;
+      break;
+    default:
+      level_ = level;
+      break;
+  }
+}
+
+LZ4FrameCodec::~LZ4FrameCodec() {
+  if (dctx_) {
+    LZ4F_freeDecompressionContext(dctx_);
+  }
+}
+
+std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
+  // LZ4 Frame compression doesn't support streaming so we have to coalesce
+  IOBuf clone;
+  if (data->isChained()) {
+    clone = data->cloneCoalescedAsValue();
+    data = &clone;
+  }
+  // Set preferences
+  const auto uncompressedLength = data->length();
+  LZ4F_preferences_t prefs{};
+  prefs.compressionLevel = level_;
+  prefs.frameInfo.contentSize = uncompressedLength;
+  // Compress
+  auto buf = IOBuf::create(LZ4F_compressFrameBound(uncompressedLength, &prefs));
+  const size_t written = lz4FrameThrowOnError(LZ4F_compressFrame(
+      buf->writableTail(),
+      buf->tailroom(),
+      data->data(),
+      data->length(),
+      &prefs));
+  buf->append(written);
+  return buf;
+}
+
+std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
+    const IOBuf* data,
+    uint64_t uncompressedLength) {
+  // Reset the dctx if any errors have occurred
+  resetDCtx();
+  // Coalesce the data
+  ByteRange in = *data->begin();
+  IOBuf clone;
+  if (data->isChained()) {
+    clone = data->cloneCoalescedAsValue();
+    in = clone.coalesce();
+  }
+  data = nullptr;
+  // Select decompression options
+  LZ4F_decompressOptions_t options;
+  options.stableDst = 1;
+  // Select blockSize and growthSize for the IOBufQueue
+  IOBufQueue queue(IOBufQueue::cacheChainLength());
+  auto blockSize = uint64_t{64} << 10;
+  auto growthSize = uint64_t{4} << 20;
+  if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH) {
+    // Allocate uncompressedLength in one chunk (up to 64 MB)
+    const auto allocateSize = std::min(uncompressedLength, uint64_t{64} << 20);
+    queue.preallocate(allocateSize, allocateSize);
+    blockSize = std::min(uncompressedLength, blockSize);
+    growthSize = std::min(uncompressedLength, growthSize);
+  } else {
+    // Reduce growthSize for small data
+    const auto guessUncompressedLen = 4 * std::max(blockSize, in.size());
+    growthSize = std::min(guessUncompressedLen, growthSize);
+  }
+  // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
+  // returns 0
+  dirty_ = true;
+  // Decompress until the frame is over
+  size_t code = 0;
+  do {
+    // Allocate enough space to decompress at least a block
+    void* out;
+    size_t outSize;
+    std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
+    // Decompress
+    size_t inSize = in.size();
+    code = lz4FrameThrowOnError(
+        LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
+    if (in.empty() && outSize == 0 && code != 0) {
+      // We passed no input, no output was produced, and the frame isn't over
+      // No more forward progress is possible
+      throw std::runtime_error("LZ4Frame error: Incomplete frame");
+    }
+    in.uncheckedAdvance(inSize);
+    queue.postallocate(outSize);
+  } while (code != 0);
+  // At this point the decompression context can be reused
+  dirty_ = false;
+  if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+      queue.chainLength() != uncompressedLength) {
+    throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
+  }
+  return queue.move();
+}
+
+#endif // FOLLY_HAVE_LIBLZ4
 
 #if FOLLY_HAVE_LIBSNAPPY
 
 
 #if FOLLY_HAVE_LIBSNAPPY
 
@@ -1238,6 +1392,12 @@ static constexpr CodecFactory
 #else
         nullptr,
 #endif
 #else
         nullptr,
 #endif
+
+#if FOLLY_HAVE_LIBLZ4
+        LZ4FrameCodec::create,
+#else
+        nullptr,
+#endif
 };
 
 bool hasCodec(CodecType type) {
 };
 
 bool hasCodec(CodecType type) {
index 05ce06d1f66662e102945914ab9351f6f255aa9a..6d6ae3a4ce843adaa8e3fa16835cecb4ebfd4841 100644 (file)
@@ -85,7 +85,13 @@ enum class CodecType {
    */
   GZIP = 9,
 
    */
   GZIP = 9,
 
-  NUM_CODEC_TYPES = 10,
+  /**
+   * Use LZ4 frame compression.
+   * Levels supported: 0 = fast, 16 = best; default = 0
+   */
+  LZ4_FRAME = 10,
+
+  NUM_CODEC_TYPES = 11,
 };
 
 class Codec {
 };
 
 class Codec {
index e468287fba3df514571302aeed0a4375bfafb535..b1599f8d391c8e02901b7606e881487db91c5b25 100644 (file)
@@ -159,6 +159,7 @@ TEST(CompressionTestNeedsUncompressedLength, Simple) {
       { CodecType::LZMA2_VARINT_SIZE, false },
       { CodecType::ZSTD, false },
       { CodecType::GZIP, false },
       { CodecType::LZMA2_VARINT_SIZE, false },
       { CodecType::ZSTD, false },
       { CodecType::GZIP, false },
+      { CodecType::LZ4_FRAME, false },
     };
 
   for (auto const& test : expectations) {
     };
 
   for (auto const& test : expectations) {
@@ -391,8 +392,8 @@ INSTANTIATE_TEST_CASE_P(
         supportedCodecs({
             CodecType::SNAPPY,
             CodecType::ZLIB,
         supportedCodecs({
             CodecType::SNAPPY,
             CodecType::ZLIB,
-            })));
-
+            CodecType::LZ4_FRAME,
+        })));
 }}}  // namespaces
 
 int main(int argc, char *argv[]) {
 }}}  // namespaces
 
 int main(int argc, char *argv[]) {