Add bzip2 support
authorNick Terrell <terrelln@fb.com>
Thu, 13 Apr 2017 02:43:02 +0000 (19:43 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Thu, 13 Apr 2017 02:50:08 +0000 (19:50 -0700)
Summary:
Adds bzip2 support to `folly/io/Compression.h`.
Adds bzip2 to the default set of supported codecs for the `AutomaticCodec`.

Reviewed By: yfeldblum

Differential Revision: D4873771

fbshipit-source-id: d4f4861aef7e4b9efb67095e8892c265b5ae5557

folly/configure.ac
folly/io/Compression.cpp
folly/io/Compression.h
folly/io/test/CompressionTest.cpp

index db8ff1b..22e2f9f 100644 (file)
@@ -554,6 +554,7 @@ AC_CHECK_HEADER([snappy.h], AC_CHECK_LIB([snappy], [main]))
 AC_CHECK_HEADER([zlib.h], AC_CHECK_LIB([z], [main]))
 AC_CHECK_HEADER([lzma.h], AC_CHECK_LIB([lzma], [main]))
 AC_CHECK_HEADER([zstd.h], AC_CHECK_LIB([zstd], [ZSTD_compressStream]))
+AC_CHECK_HEADER([bzlib.h], AC_CHECK_LIB([bz2], [main]))
 AC_CHECK_HEADER([linux/membarrier.h], AC_DEFINE([HAVE_LINUX_MEMBARRIER_H], [1], [Define to 1 if membarrier.h is available]))
 
 AC_ARG_ENABLE([follytestmain],
index 4a9deaa..3bcae86 100644 (file)
 #include <zstd.h>
 #endif
 
+#if FOLLY_HAVE_LIBBZ2
+#include <bzlib.h>
+#endif
+
 #include <folly/Bits.h>
 #include <folly/Conv.h>
 #include <folly/Memory.h>
@@ -285,6 +289,14 @@ prefixToStringLE(T prefix, uint64_t n = sizeof(T)) {
   memcpy(&result[0], &prefix, n);
   return result;
 }
+
+static uint64_t computeBufferLength(
+    uint64_t const compressedLength,
+    uint64_t const blockSize) {
+  uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
+  uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
+  return std::min(goodBufferSize, kMaxBufferLength);
+}
 } // namespace
 
 #if FOLLY_HAVE_LIBLZ4
@@ -969,13 +981,6 @@ std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
   return out;
 }
 
-static uint64_t computeBufferLength(uint64_t const compressedLength) {
-  constexpr uint64_t kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
-  constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
-  const uint64_t goodBufferSize = 4 * std::max(kBlockSize, compressedLength);
-  return std::min(goodBufferSize, kMaxBufferLength);
-}
-
 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
                                                uint64_t uncompressedLength) {
   z_stream stream;
@@ -1009,8 +1014,9 @@ std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
 
   // Max 64MiB in one go
   constexpr uint64_t maxSingleStepLength = uint64_t(64) << 20; // 64MiB
+  constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
   const uint64_t defaultBufferLength =
-      computeBufferLength(data->computeChainDataLength());
+      computeBufferLength(data->computeChainDataLength(), kBlockSize);
 
   auto out = addOutputBuffer(
       &stream,
@@ -1551,6 +1557,212 @@ std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
 
 #endif  // FOLLY_HAVE_LIBZSTD
 
+#if FOLLY_HAVE_LIBBZ2
+
+class Bzip2Codec final : public Codec {
+ public:
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit Bzip2Codec(int level, CodecType type);
+
+  std::vector<std::string> validPrefixes() const override;
+  bool canUncompress(IOBuf const* data, uint64_t uncompressedLength)
+      const override;
+
+ private:
+  std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
+  std::unique_ptr<IOBuf> doUncompress(
+      IOBuf const* data,
+      uint64_t uncompressedLength) override;
+
+  int level_;
+};
+
+/* static */ std::unique_ptr<Codec> Bzip2Codec::create(
+    int level,
+    CodecType type) {
+  return make_unique<Bzip2Codec>(level, type);
+}
+
+Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
+  DCHECK(type == CodecType::BZIP2);
+  switch (level) {
+    case COMPRESSION_LEVEL_FASTEST:
+      level = 1;
+      break;
+    case COMPRESSION_LEVEL_DEFAULT:
+      level = 9;
+      break;
+    case COMPRESSION_LEVEL_BEST:
+      level = 9;
+      break;
+  }
+  if (level < 1 || level > 9) {
+    throw std::invalid_argument(
+        to<std::string>("Bzip2: invalid level: ", level));
+  }
+  level_ = level;
+}
+
+static uint32_t constexpr kBzip2MagicLE = 0x685a42;
+static uint64_t constexpr kBzip2MagicBytes = 3;
+
+std::vector<std::string> Bzip2Codec::validPrefixes() const {
+  return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
+}
+
+bool Bzip2Codec::canUncompress(IOBuf const* data, uint64_t) const {
+  return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
+}
+
+static bz_stream createBzStream() {
+  bz_stream stream;
+  stream.bzalloc = nullptr;
+  stream.bzfree = nullptr;
+  stream.opaque = nullptr;
+  stream.next_in = stream.next_out = nullptr;
+  stream.avail_in = stream.avail_out = 0;
+  return stream;
+}
+
+// Throws on error condition, otherwise returns the code.
+static int bzCheck(int const rc) {
+  switch (rc) {
+    case BZ_OK:
+    case BZ_RUN_OK:
+    case BZ_FLUSH_OK:
+    case BZ_FINISH_OK:
+    case BZ_STREAM_END:
+      return rc;
+    default:
+      throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
+  }
+}
+
+static uint64_t bzCompressBound(uint64_t const uncompressedLength) {
+  // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
+  //   To guarantee that the compressed data will fit in its buffer, allocate an
+  //   output buffer of size 1% larger than the uncompressed data, plus six
+  //   hundred extra bytes.
+  return uncompressedLength + uncompressedLength / 100 + 600;
+}
+
+static std::unique_ptr<IOBuf> addOutputBuffer(
+    bz_stream* stream,
+    uint64_t const bufferLength) {
+  DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
+  DCHECK_EQ(stream->avail_out, 0);
+
+  auto buf = IOBuf::create(bufferLength);
+  buf->append(buf->capacity());
+
+  stream->next_out = reinterpret_cast<char*>(buf->writableData());
+  stream->avail_out = buf->length();
+
+  return buf;
+}
+
+std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
+  bz_stream stream = createBzStream();
+  bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
+  SCOPE_EXIT {
+    bzCheck(BZ2_bzCompressEnd(&stream));
+  };
+
+  uint64_t const uncompressedLength = data->computeChainDataLength();
+  uint64_t const maxCompressedLength = bzCompressBound(uncompressedLength);
+  uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
+  uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
+
+  auto out = addOutputBuffer(
+      &stream,
+      maxCompressedLength <= kMaxSingleStepLength ? maxCompressedLength
+                                                  : kDefaultBufferLength);
+
+  for (auto range : *data) {
+    while (!range.empty()) {
+      auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
+      stream.next_in =
+          const_cast<char*>(reinterpret_cast<char const*>(range.data()));
+      stream.avail_in = inSize;
+
+      if (stream.avail_out == 0) {
+        out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+      }
+
+      bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
+      range.uncheckedAdvance(inSize - stream.avail_in);
+    }
+  }
+  do {
+    if (stream.avail_out == 0) {
+      out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+    }
+  } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
+
+  out->prev()->trimEnd(stream.avail_out);
+
+  return out;
+}
+
+std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
+    const IOBuf* data,
+    uint64_t uncompressedLength) {
+  bz_stream stream = createBzStream();
+  bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
+  SCOPE_EXIT {
+    bzCheck(BZ2_bzDecompressEnd(&stream));
+  };
+
+  uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
+  uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
+  uint64_t const kDefaultBufferLength =
+      computeBufferLength(data->computeChainDataLength(), kBlockSize);
+
+  auto out = addOutputBuffer(
+      &stream,
+      ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+        uncompressedLength <= kMaxSingleStepLength)
+           ? uncompressedLength
+           : kDefaultBufferLength));
+
+  int rc = BZ_OK;
+  for (auto range : *data) {
+    while (!range.empty()) {
+      auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
+      stream.next_in =
+          const_cast<char*>(reinterpret_cast<char const*>(range.data()));
+      stream.avail_in = inSize;
+
+      if (stream.avail_out == 0) {
+        out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+      }
+
+      rc = bzCheck(BZ2_bzDecompress(&stream));
+      range.uncheckedAdvance(inSize - stream.avail_in);
+    }
+  }
+  while (rc != BZ_STREAM_END) {
+    if (stream.avail_out == 0) {
+      out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+    }
+
+    rc = bzCheck(BZ2_bzDecompress(&stream));
+  }
+
+  out->prev()->trimEnd(stream.avail_out);
+
+  uint64_t const totalOut =
+      (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
+  if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+      uncompressedLength != totalOut) {
+    throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
+  }
+
+  return out;
+}
+
+#endif // FOLLY_HAVE_LIBBZ2
+
 /**
  * Automatic decompression
  */
@@ -1630,6 +1842,7 @@ AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
   addCodecIfSupported(CodecType::ZLIB);
   addCodecIfSupported(CodecType::GZIP);
   addCodecIfSupported(CodecType::LZMA2);
+  addCodecIfSupported(CodecType::BZIP2);
   if (kIsDebug) {
     checkCompatibleCodecs();
   }
@@ -1767,6 +1980,12 @@ static constexpr CodecFactory
 #else
         nullptr,
 #endif
+
+#if FOLLY_HAVE_LIBBZ2
+        Bzip2Codec::create,
+#else
+        nullptr
+#endif
 };
 
 bool hasCodec(CodecType type) {
index c46c616..99963c7 100644 (file)
@@ -93,7 +93,13 @@ enum class CodecType {
    */
   LZ4_FRAME = 10,
 
-  NUM_CODEC_TYPES = 11,
+  /**
+   * Use bzip2 compression.
+   * Levels supported: 1 = fast, 9 = best; default = 9
+   */
+  BZIP2 = 11,
+
+  NUM_CODEC_TYPES = 12,
 };
 
 class Codec {
@@ -230,7 +236,7 @@ std::unique_ptr<Codec> getCodec(CodecType type,
 
 /**
  * Returns a codec that can uncompress any of the given codec types as well as
- * {LZ4_FRAME, ZSTD, ZLIB, GZIP, LZMA2}. Appends each default codec to
+ * {LZ4_FRAME, ZSTD, ZLIB, GZIP, LZMA2, BZIP2}. Appends each default codec to
  * customCodecs in order, so long as a codec with the same type() isn't already
  * present. When uncompress() is called, each codec's canUncompress() is called
  * in the order that they are given. Appended default codecs are checked last.
index 197d50f..f378ab4 100644 (file)
@@ -161,6 +161,7 @@ TEST(CompressionTestNeedsUncompressedLength, Simple) {
       { CodecType::ZSTD, false },
       { CodecType::GZIP, false },
       { CodecType::LZ4_FRAME, false },
+      { CodecType::BZIP2, false },
     };
 
   for (auto const& test : expectations) {
@@ -396,6 +397,7 @@ INSTANTIATE_TEST_CASE_P(
             CodecType::LZMA2,
             CodecType::ZSTD,
             CodecType::LZ4_FRAME,
+            CodecType::BZIP2,
         })));
 
 class AutomaticCodecTest : public testing::TestWithParam<CodecType> {
@@ -584,7 +586,8 @@ INSTANTIATE_TEST_CASE_P(
         CodecType::ZSTD,
         CodecType::ZLIB,
         CodecType::GZIP,
-        CodecType::LZMA2));
+        CodecType::LZMA2,
+        CodecType::BZIP2));
 
 TEST(ValidPrefixesTest, CustomCodec) {
   std::vector<std::unique_ptr<Codec>> codecs;