Add bzip2 support
[folly.git] / folly / io / Compression.cpp
index 4a9deaaf554737c921562dfd687de6c81bc219c5..3bcae86193e3beff7717f19b78e155ac2185fc5d 100644 (file)
 #include <zstd.h>
 #endif
 
+#if FOLLY_HAVE_LIBBZ2
+#include <bzlib.h>
+#endif
+
 #include <folly/Bits.h>
 #include <folly/Conv.h>
 #include <folly/Memory.h>
@@ -285,6 +289,14 @@ prefixToStringLE(T prefix, uint64_t n = sizeof(T)) {
   memcpy(&result[0], &prefix, n);
   return result;
 }
+
+static uint64_t computeBufferLength(
+    uint64_t const compressedLength,
+    uint64_t const blockSize) {
+  uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
+  uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
+  return std::min(goodBufferSize, kMaxBufferLength);
+}
 } // namespace
 
 #if FOLLY_HAVE_LIBLZ4
@@ -969,13 +981,6 @@ std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
   return out;
 }
 
-static uint64_t computeBufferLength(uint64_t const compressedLength) {
-  constexpr uint64_t kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
-  constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
-  const uint64_t goodBufferSize = 4 * std::max(kBlockSize, compressedLength);
-  return std::min(goodBufferSize, kMaxBufferLength);
-}
-
 std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
                                                uint64_t uncompressedLength) {
   z_stream stream;
@@ -1009,8 +1014,9 @@ std::unique_ptr<IOBuf> ZlibCodec::doUncompress(const IOBuf* data,
 
   // Max 64MiB in one go
   constexpr uint64_t maxSingleStepLength = uint64_t(64) << 20; // 64MiB
+  constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
   const uint64_t defaultBufferLength =
-      computeBufferLength(data->computeChainDataLength());
+      computeBufferLength(data->computeChainDataLength(), kBlockSize);
 
   auto out = addOutputBuffer(
       &stream,
@@ -1551,6 +1557,212 @@ std::unique_ptr<IOBuf> ZSTDCodec::doUncompress(
 
 #endif  // FOLLY_HAVE_LIBZSTD
 
+#if FOLLY_HAVE_LIBBZ2
+
+class Bzip2Codec final : public Codec {
+ public:
+  static std::unique_ptr<Codec> create(int level, CodecType type);
+  explicit Bzip2Codec(int level, CodecType type);
+
+  std::vector<std::string> validPrefixes() const override;
+  bool canUncompress(IOBuf const* data, uint64_t uncompressedLength)
+      const override;
+
+ private:
+  std::unique_ptr<IOBuf> doCompress(IOBuf const* data) override;
+  std::unique_ptr<IOBuf> doUncompress(
+      IOBuf const* data,
+      uint64_t uncompressedLength) override;
+
+  int level_;
+};
+
+/* static */ std::unique_ptr<Codec> Bzip2Codec::create(
+    int level,
+    CodecType type) {
+  return make_unique<Bzip2Codec>(level, type);
+}
+
+Bzip2Codec::Bzip2Codec(int level, CodecType type) : Codec(type) {
+  DCHECK(type == CodecType::BZIP2);
+  switch (level) {
+    case COMPRESSION_LEVEL_FASTEST:
+      level = 1;
+      break;
+    case COMPRESSION_LEVEL_DEFAULT:
+      level = 9;
+      break;
+    case COMPRESSION_LEVEL_BEST:
+      level = 9;
+      break;
+  }
+  if (level < 1 || level > 9) {
+    throw std::invalid_argument(
+        to<std::string>("Bzip2: invalid level: ", level));
+  }
+  level_ = level;
+}
+
+static uint32_t constexpr kBzip2MagicLE = 0x685a42;
+static uint64_t constexpr kBzip2MagicBytes = 3;
+
+std::vector<std::string> Bzip2Codec::validPrefixes() const {
+  return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
+}
+
+bool Bzip2Codec::canUncompress(IOBuf const* data, uint64_t) const {
+  return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
+}
+
+static bz_stream createBzStream() {
+  bz_stream stream;
+  stream.bzalloc = nullptr;
+  stream.bzfree = nullptr;
+  stream.opaque = nullptr;
+  stream.next_in = stream.next_out = nullptr;
+  stream.avail_in = stream.avail_out = 0;
+  return stream;
+}
+
+// Throws on error condition, otherwise returns the code.
+static int bzCheck(int const rc) {
+  switch (rc) {
+    case BZ_OK:
+    case BZ_RUN_OK:
+    case BZ_FLUSH_OK:
+    case BZ_FINISH_OK:
+    case BZ_STREAM_END:
+      return rc;
+    default:
+      throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
+  }
+}
+
+static uint64_t bzCompressBound(uint64_t const uncompressedLength) {
+  // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
+  //   To guarantee that the compressed data will fit in its buffer, allocate an
+  //   output buffer of size 1% larger than the uncompressed data, plus six
+  //   hundred extra bytes.
+  return uncompressedLength + uncompressedLength / 100 + 600;
+}
+
+static std::unique_ptr<IOBuf> addOutputBuffer(
+    bz_stream* stream,
+    uint64_t const bufferLength) {
+  DCHECK_LE(bufferLength, std::numeric_limits<unsigned>::max());
+  DCHECK_EQ(stream->avail_out, 0);
+
+  auto buf = IOBuf::create(bufferLength);
+  buf->append(buf->capacity());
+
+  stream->next_out = reinterpret_cast<char*>(buf->writableData());
+  stream->avail_out = buf->length();
+
+  return buf;
+}
+
+std::unique_ptr<IOBuf> Bzip2Codec::doCompress(IOBuf const* data) {
+  bz_stream stream = createBzStream();
+  bzCheck(BZ2_bzCompressInit(&stream, level_, 0, 0));
+  SCOPE_EXIT {
+    bzCheck(BZ2_bzCompressEnd(&stream));
+  };
+
+  uint64_t const uncompressedLength = data->computeChainDataLength();
+  uint64_t const maxCompressedLength = bzCompressBound(uncompressedLength);
+  uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
+  uint64_t constexpr kDefaultBufferLength = uint64_t(4) << 20;
+
+  auto out = addOutputBuffer(
+      &stream,
+      maxCompressedLength <= kMaxSingleStepLength ? maxCompressedLength
+                                                  : kDefaultBufferLength);
+
+  for (auto range : *data) {
+    while (!range.empty()) {
+      auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
+      stream.next_in =
+          const_cast<char*>(reinterpret_cast<char const*>(range.data()));
+      stream.avail_in = inSize;
+
+      if (stream.avail_out == 0) {
+        out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+      }
+
+      bzCheck(BZ2_bzCompress(&stream, BZ_RUN));
+      range.uncheckedAdvance(inSize - stream.avail_in);
+    }
+  }
+  do {
+    if (stream.avail_out == 0) {
+      out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+    }
+  } while (bzCheck(BZ2_bzCompress(&stream, BZ_FINISH)) != BZ_STREAM_END);
+
+  out->prev()->trimEnd(stream.avail_out);
+
+  return out;
+}
+
+std::unique_ptr<IOBuf> Bzip2Codec::doUncompress(
+    const IOBuf* data,
+    uint64_t uncompressedLength) {
+  bz_stream stream = createBzStream();
+  bzCheck(BZ2_bzDecompressInit(&stream, 0, 0));
+  SCOPE_EXIT {
+    bzCheck(BZ2_bzDecompressEnd(&stream));
+  };
+
+  uint64_t constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MiB
+  uint64_t const kBlockSize = uint64_t(100) << 10; // 100 KiB
+  uint64_t const kDefaultBufferLength =
+      computeBufferLength(data->computeChainDataLength(), kBlockSize);
+
+  auto out = addOutputBuffer(
+      &stream,
+      ((uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+        uncompressedLength <= kMaxSingleStepLength)
+           ? uncompressedLength
+           : kDefaultBufferLength));
+
+  int rc = BZ_OK;
+  for (auto range : *data) {
+    while (!range.empty()) {
+      auto const inSize = std::min<size_t>(range.size(), kMaxSingleStepLength);
+      stream.next_in =
+          const_cast<char*>(reinterpret_cast<char const*>(range.data()));
+      stream.avail_in = inSize;
+
+      if (stream.avail_out == 0) {
+        out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+      }
+
+      rc = bzCheck(BZ2_bzDecompress(&stream));
+      range.uncheckedAdvance(inSize - stream.avail_in);
+    }
+  }
+  while (rc != BZ_STREAM_END) {
+    if (stream.avail_out == 0) {
+      out->prependChain(addOutputBuffer(&stream, kDefaultBufferLength));
+    }
+
+    rc = bzCheck(BZ2_bzDecompress(&stream));
+  }
+
+  out->prev()->trimEnd(stream.avail_out);
+
+  uint64_t const totalOut =
+      (uint64_t(stream.total_out_hi32) << 32) + stream.total_out_lo32;
+  if (uncompressedLength != UNKNOWN_UNCOMPRESSED_LENGTH &&
+      uncompressedLength != totalOut) {
+    throw std::runtime_error("Bzip2 error: Invalid uncompressed length");
+  }
+
+  return out;
+}
+
+#endif // FOLLY_HAVE_LIBBZ2
+
 /**
  * Automatic decompression
  */
@@ -1630,6 +1842,7 @@ AutomaticCodec::AutomaticCodec(std::vector<std::unique_ptr<Codec>> customCodecs)
   addCodecIfSupported(CodecType::ZLIB);
   addCodecIfSupported(CodecType::GZIP);
   addCodecIfSupported(CodecType::LZMA2);
+  addCodecIfSupported(CodecType::BZIP2);
   if (kIsDebug) {
     checkCompatibleCodecs();
   }
@@ -1767,6 +1980,12 @@ static constexpr CodecFactory
 #else
         nullptr,
 #endif
+
+#if FOLLY_HAVE_LIBBZ2
+        Bzip2Codec::create,
+#else
+        nullptr
+#endif
 };
 
 bool hasCodec(CodecType type) {