Add zlib/gzip streaming support
authorNick Terrell <terrelln@fb.com>
Wed, 7 Jun 2017 04:28:09 +0000 (21:28 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Wed, 7 Jun 2017 04:35:05 +0000 (21:35 -0700)
Summary:
Add streaming interface to the ZlibCodec.
Implement ZlibStreamCodec::doCompress() and ZlibStreamCodec::doUncompress() using the streaming interface. fbgs CodecType::ZLIB and check that no caller requires thread-safety.
I found one caller, but it was fixed in D5090855.

Reviewed By: yfeldblum

Differential Revision: D5169338

fbshipit-source-id: 40478e162143623ad28fd8bc937d0195521f13fe

folly/io/Compression.cpp

index b523e697d5dcc02303cde019cdc343c46253924c..d014b69abc6e8202a8e1f25c8cca205f8771c1de 100644 (file)
@@ -992,10 +992,12 @@ std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
 /**
  * Zlib codec
  */
-class ZlibCodec final : public Codec {
+class ZlibStreamCodec final : public StreamCodec {
  public:
-  static std::unique_ptr<Codec> create(int level, CodecType type);
-  explicit ZlibCodec(int level, CodecType type);
+  static std::unique_ptr<Codec> createCodec(int level, CodecType type);
+  static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
+  explicit ZlibStreamCodec(int level, CodecType type);
+  ~ZlibStreamCodec();
 
   std::vector<std::string> validPrefixes() const override;
   bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
@@ -1003,20 +1005,29 @@ class ZlibCodec final : public Codec {
 
  private:
   uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
-  std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
-  std::unique_ptr<IOBuf> doUncompress(
-      const IOBuf* data,
-      Optional<uint64_t> uncompressedLength) override;
 
-  std::unique_ptr<IOBuf> addOutputBuffer(z_stream* stream, uint32_t length);
-  bool doInflate(z_stream* stream, IOBuf* head, uint32_t bufferLength);
+  void doResetStream() override;
+  bool doCompressStream(
+      ByteRange& input,
+      MutableByteRange& output,
+      StreamCodec::FlushOp flush) override;
+  bool doUncompressStream(
+      ByteRange& input,
+      MutableByteRange& output,
+      StreamCodec::FlushOp flush) override;
 
+  void resetDeflateStream();
+  void resetInflateStream();
+
+  Optional<z_stream> deflateStream_{};
+  Optional<z_stream> inflateStream_{};
   int level_;
+  bool needReset_{true};
 };
 
 static constexpr uint16_t kGZIPMagicLE = 0x8B1F;
 
-std::vector<std::string> ZlibCodec::validPrefixes() const {
+std::vector<std::string> ZlibStreamCodec::validPrefixes() const {
   if (type() == CodecType::ZLIB) {
     // Zlib streams start with a 2 byte header.
     //
@@ -1060,7 +1071,8 @@ std::vector<std::string> ZlibCodec::validPrefixes() const {
   }
 }
 
-bool ZlibCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
+bool ZlibStreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
+    const {
   if (type() == CodecType::ZLIB) {
     uint16_t value;
     Cursor cursor{data};
@@ -1074,80 +1086,68 @@ bool ZlibCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
   }
 }
 
-uint64_t ZlibCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
+uint64_t ZlibStreamCodec::doMaxCompressedLength(
+    uint64_t uncompressedLength) const {
   return deflateBound(nullptr, uncompressedLength);
 }
 
-std::unique_ptr<Codec> ZlibCodec::create(int level, CodecType type) {
-  return std::make_unique<ZlibCodec>(level, type);
+std::unique_ptr<Codec> ZlibStreamCodec::createCodec(int level, CodecType type) {
+  return std::make_unique<ZlibStreamCodec>(level, type);
 }
 
-ZlibCodec::ZlibCodec(int level, CodecType type) : Codec(type) {
+std::unique_ptr<StreamCodec> ZlibStreamCodec::createStream(
+    int level,
+    CodecType type) {
+  return std::make_unique<ZlibStreamCodec>(level, type);
+}
+
+ZlibStreamCodec::ZlibStreamCodec(int level, CodecType type)
+    : StreamCodec(type) {
   DCHECK(type == CodecType::ZLIB || type == CodecType::GZIP);
   switch (level) {
-  case COMPRESSION_LEVEL_FASTEST:
-    level = 1;
-    break;
-  case COMPRESSION_LEVEL_DEFAULT:
-    level = Z_DEFAULT_COMPRESSION;
-    break;
-  case COMPRESSION_LEVEL_BEST:
-    level = 9;
-    break;
+    case COMPRESSION_LEVEL_FASTEST:
+      level = 1;
+      break;
+    case COMPRESSION_LEVEL_DEFAULT:
+      level = Z_DEFAULT_COMPRESSION;
+      break;
+    case COMPRESSION_LEVEL_BEST:
+      level = 9;
+      break;
   }
   if (level != Z_DEFAULT_COMPRESSION && (level < 0 || level > 9)) {
-    throw std::invalid_argument(to<std::string>(
-        "ZlibCodec: invalid level: ", level));
+    throw std::invalid_argument(
+        to<std::string>("ZlibStreamCodec: invalid level: ", level));
   }
   level_ = level;
 }
 
-std::unique_ptr<IOBuf> ZlibCodec::addOutputBuffer(z_stream* stream,
-                                                  uint32_t length) {
-  CHECK_EQ(stream->avail_out, 0);
-
-  auto buf = IOBuf::create(length);
-  buf->append(buf->capacity());
-
-  stream->next_out = buf->writableData();
-  stream->avail_out = buf->length();
-
-  return buf;
-}
-
-bool ZlibCodec::doInflate(z_stream* stream,
-                          IOBuf* head,
-                          uint32_t bufferLength) {
-  if (stream->avail_out == 0) {
-    head->prependChain(addOutputBuffer(stream, bufferLength));
+ZlibStreamCodec::~ZlibStreamCodec() {
+  if (deflateStream_) {
+    deflateEnd(deflateStream_.get_pointer());
+    deflateStream_.clear();
   }
-
-  int rc = inflate(stream, Z_NO_FLUSH);
-
-  switch (rc) {
-  case Z_OK:
-    break;
-  case Z_STREAM_END:
-    return true;
-  case Z_BUF_ERROR:
-  case Z_NEED_DICT:
-  case Z_DATA_ERROR:
-  case Z_MEM_ERROR:
-    throw std::runtime_error(to<std::string>(
-        "ZlibCodec: inflate error: ", rc, ": ", stream->msg));
-  default:
-    CHECK(false) << rc << ": " << stream->msg;
+  if (inflateStream_) {
+    inflateEnd(inflateStream_.get_pointer());
+    inflateStream_.clear();
   }
-
-  return false;
 }
 
-std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
-  z_stream stream;
-  stream.zalloc = nullptr;
-  stream.zfree = nullptr;
-  stream.opaque = nullptr;
+void ZlibStreamCodec::doResetStream() {
+  needReset_ = true;
+}
 
+void ZlibStreamCodec::resetDeflateStream() {
+  if (deflateStream_) {
+    int const rc = deflateReset(deflateStream_.get_pointer());
+    if (rc != Z_OK) {
+      deflateStream_.clear();
+      throw std::runtime_error(
+          to<std::string>("ZlibStreamCodec: deflateReset error: ", rc));
+    }
+    return;
+  }
+  deflateStream_ = z_stream{};
   // Using deflateInit2() to support gzip.  "The windowBits parameter is the
   // base two logarithm of the maximum window size (...) The default value is
   // 15 (...) Add 16 to windowBits to write a simple gzip header and trailer
@@ -1155,167 +1155,132 @@ std::unique_ptr<IOBuf> ZlibCodec::doCompress(const IOBuf* data) {
   // will have no file name, no extra data, no comment, no modification time
   // (set to zero), no header crc, and the operating system will be set to 255
   // (unknown)."
-  int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
+  int const windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
   // All other parameters (method, memLevel, strategy) get default values from
   // the zlib manual.
-  int rc = deflateInit2(&stream,
-                        level_,
-                        Z_DEFLATED,
-                        windowBits,
-                        /* memLevel */ 8,
-                        Z_DEFAULT_STRATEGY);
+  int const rc = deflateInit2(
+      deflateStream_.get_pointer(),
+      level_,
+      Z_DEFLATED,
+      windowBits,
+      /* memLevel */ 8,
+      Z_DEFAULT_STRATEGY);
   if (rc != Z_OK) {
-    throw std::runtime_error(to<std::string>(
-        "ZlibCodec: deflateInit error: ", rc, ": ", stream.msg));
+    deflateStream_.clear();
+    throw std::runtime_error(
+        to<std::string>("ZlibStreamCodec: deflateInit error: ", rc));
   }
+}
 
-  stream.next_in = stream.next_out = nullptr;
-  stream.avail_in = stream.avail_out = 0;
-  stream.total_in = stream.total_out = 0;
-
-  bool success = false;
-
-  SCOPE_EXIT {
-    rc = deflateEnd(&stream);
-    // If we're here because of an exception, it's okay if some data
-    // got dropped.
-    CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
-      << rc << ": " << stream.msg;
-  };
-
-  uint64_t uncompressedLength = data->computeChainDataLength();
-  uint64_t maxCompressedLength = deflateBound(&stream, uncompressedLength);
-
-  // Max 64MiB in one go
-  constexpr uint32_t maxSingleStepLength = uint32_t(64) << 20;    // 64MiB
-  constexpr uint32_t defaultBufferLength = uint32_t(4) << 20;     // 4MiB
-
-  auto out = addOutputBuffer(
-      &stream,
-      (maxCompressedLength <= maxSingleStepLength ?
-       maxCompressedLength :
-       defaultBufferLength));
-
-  for (auto& range : *data) {
-    uint64_t remaining = range.size();
-    uint64_t written = 0;
-    while (remaining) {
-      uint32_t step = (remaining > maxSingleStepLength ?
-                       maxSingleStepLength : remaining);
-      stream.next_in = const_cast<uint8_t*>(range.data() + written);
-      stream.avail_in = step;
-      remaining -= step;
-      written += step;
-
-      while (stream.avail_in != 0) {
-        if (stream.avail_out == 0) {
-          out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
-        }
-
-        rc = deflate(&stream, Z_NO_FLUSH);
-
-        CHECK_EQ(rc, Z_OK) << stream.msg;
-      }
+void ZlibStreamCodec::resetInflateStream() {
+  if (inflateStream_) {
+    int const rc = inflateReset(inflateStream_.get_pointer());
+    if (rc != Z_OK) {
+      inflateStream_.clear();
+      throw std::runtime_error(
+          to<std::string>("ZlibStreamCodec: inflateReset error: ", rc));
     }
+    return;
   }
-
-  do {
-    if (stream.avail_out == 0) {
-      out->prependChain(addOutputBuffer(&stream, defaultBufferLength));
-    }
-
-    rc = deflate(&stream, Z_FINISH);
-  } while (rc == Z_OK);
-
-  CHECK_EQ(rc, Z_STREAM_END) << stream.msg;
-
-  out->prev()->trimEnd(stream.avail_out);
-
-  success = true;  // we survived
-
-  return out;
-}
-
-std::unique_ptr<IOBuf> ZlibCodec::doUncompress(
-    const IOBuf* data,
-    Optional<uint64_t> uncompressedLength) {
-  z_stream stream;
-  stream.zalloc = nullptr;
-  stream.zfree = nullptr;
-  stream.opaque = nullptr;
-
+  inflateStream_ = z_stream{};
   // "The windowBits parameter is the base two logarithm of the maximum window
   // size (...) The default value is 15 (...) add 16 to decode only the gzip
   // format (the zlib format will return a Z_DATA_ERROR)."
-  int windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
-  int rc = inflateInit2(&stream, windowBits);
+  int const windowBits = 15 + (type() == CodecType::GZIP ? 16 : 0);
+  int const rc = inflateInit2(inflateStream_.get_pointer(), windowBits);
   if (rc != Z_OK) {
-    throw std::runtime_error(to<std::string>(
-        "ZlibCodec: inflateInit error: ", rc, ": ", stream.msg));
+    inflateStream_.clear();
+    throw std::runtime_error(
+        to<std::string>("ZlibStreamCodec: inflateInit error: ", rc));
   }
+}
 
-  stream.next_in = stream.next_out = nullptr;
-  stream.avail_in = stream.avail_out = 0;
-  stream.total_in = stream.total_out = 0;
+static int zlibTranslateFlush(StreamCodec::FlushOp flush) {
+  switch (flush) {
+    case StreamCodec::FlushOp::NONE:
+      return Z_NO_FLUSH;
+    case StreamCodec::FlushOp::FLUSH:
+      return Z_SYNC_FLUSH;
+    case StreamCodec::FlushOp::END:
+      return Z_FINISH;
+    default:
+      throw std::invalid_argument("ZlibStreamCodec: Invalid flush");
+  }
+}
 
-  bool success = false;
+static int zlibThrowOnError(int rc) {
+  switch (rc) {
+    case Z_OK:
+    case Z_BUF_ERROR:
+    case Z_STREAM_END:
+      return rc;
+    default:
+      throw std::runtime_error(to<std::string>("ZlibStreamCodec: error: ", rc));
+  }
+}
 
+bool ZlibStreamCodec::doCompressStream(
+    ByteRange& input,
+    MutableByteRange& output,
+    StreamCodec::FlushOp flush) {
+  if (needReset_) {
+    resetDeflateStream();
+    needReset_ = false;
+  }
+  DCHECK(deflateStream_.hasValue());
+  // zlib will return Z_STREAM_ERROR if output.data() is null.
+  if (output.data() == nullptr) {
+    return false;
+  }
+  deflateStream_->next_in = const_cast<uint8_t*>(input.data());
+  deflateStream_->avail_in = input.size();
+  deflateStream_->next_out = output.data();
+  deflateStream_->avail_out = output.size();
   SCOPE_EXIT {
-    rc = inflateEnd(&stream);
-    // If we're here because of an exception, it's okay if some data
-    // got dropped.
-    CHECK(rc == Z_OK || (!success && rc == Z_DATA_ERROR))
-      << rc << ": " << stream.msg;
+    input.uncheckedAdvance(input.size() - deflateStream_->avail_in);
+    output.uncheckedAdvance(output.size() - deflateStream_->avail_out);
   };
-
-  // Max 64MiB in one go
-  constexpr uint64_t maxSingleStepLength = uint64_t(64) << 20; // 64MiB
-  constexpr uint64_t kBlockSize = uint64_t(32) << 10; // 32 KiB
-  const uint64_t defaultBufferLength =
-      computeBufferLength(data->computeChainDataLength(), kBlockSize);
-
-  auto out = addOutputBuffer(
-      &stream,
-      ((uncompressedLength && *uncompressedLength <= maxSingleStepLength)
-           ? *uncompressedLength
-           : defaultBufferLength));
-
-  bool streamEnd = false;
-  for (auto& range : *data) {
-    if (range.empty()) {
-      continue;
-    }
-
-    stream.next_in = const_cast<uint8_t*>(range.data());
-    stream.avail_in = range.size();
-
-    while (stream.avail_in != 0) {
-      if (streamEnd) {
-        throw std::runtime_error(to<std::string>(
-            "ZlibCodec: junk after end of data"));
-      }
-
-      streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
-    }
+  int const rc = zlibThrowOnError(
+      deflate(deflateStream_.get_pointer(), zlibTranslateFlush(flush)));
+  switch (flush) {
+    case StreamCodec::FlushOp::NONE:
+      return false;
+    case StreamCodec::FlushOp::FLUSH:
+      return deflateStream_->avail_in == 0 && deflateStream_->avail_out != 0;
+    case StreamCodec::FlushOp::END:
+      return rc == Z_STREAM_END;
+    default:
+      throw std::invalid_argument("ZlibStreamCodec: Invalid flush");
   }
+}
 
-  while (!streamEnd) {
-    streamEnd = doInflate(&stream, out.get(), defaultBufferLength);
+bool ZlibStreamCodec::doUncompressStream(
+    ByteRange& input,
+    MutableByteRange& output,
+    StreamCodec::FlushOp flush) {
+  if (needReset_) {
+    resetInflateStream();
+    needReset_ = false;
   }
-
-  out->prev()->trimEnd(stream.avail_out);
-
-  if (uncompressedLength && *uncompressedLength != stream.total_out) {
-    throw std::runtime_error(
-        to<std::string>("ZlibCodec: invalid uncompressed length"));
+  DCHECK(inflateStream_.hasValue());
+  // zlib will return Z_STREAM_ERROR if output.data() is null.
+  if (output.data() == nullptr) {
+    return false;
   }
-
-  success = true;  // we survived
-
-  return out;
+  inflateStream_->next_in = const_cast<uint8_t*>(input.data());
+  inflateStream_->avail_in = input.size();
+  inflateStream_->next_out = output.data();
+  inflateStream_->avail_out = output.size();
+  SCOPE_EXIT {
+    input.advance(input.size() - inflateStream_->avail_in);
+    output.advance(output.size() - inflateStream_->avail_out);
+  };
+  int const rc = zlibThrowOnError(
+      inflate(inflateStream_.get_pointer(), zlibTranslateFlush(flush)));
+  return rc == Z_STREAM_END;
 }
 
-#endif  // FOLLY_HAVE_LIBZ
+#endif // FOLLY_HAVE_LIBZ
 
 #if FOLLY_HAVE_LIBLZMA
 
@@ -2249,7 +2214,7 @@ constexpr Factory
 #endif
 
 #if FOLLY_HAVE_LIBZ
-        {ZlibCodec::create, nullptr},
+        {ZlibStreamCodec::createCodec, ZlibStreamCodec::createStream},
 #else
         {},
 #endif
@@ -2275,7 +2240,7 @@ constexpr Factory
 #endif
 
 #if FOLLY_HAVE_LIBZ
-        {ZlibCodec::create, nullptr},
+        {ZlibStreamCodec::createCodec, ZlibStreamCodec::createStream},
 #else
         {},
 #endif