LineBasedFrameDecoder
authorDave Watson <davejwatson@fb.com>
Thu, 9 Apr 2015 17:00:48 +0000 (10:00 -0700)
committerViswanath Sivakumar <viswanath@fb.com>
Fri, 10 Apr 2015 03:35:13 +0000 (20:35 -0700)
Summary: Copy of netty's line based decoder.

Test Plan:
unittests
fbconfig folly/wangle/codec; fbmake runtests

Reviewed By: hans@fb.com

Subscribers: doug, fugalh, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D1959155

Signature: t1:1959155:1427935150:e11280c5567df9ad9964dbb656aa090267856f57

folly/wangle/codec/CodecTest.cpp
folly/wangle/codec/LineBasedFrameDecoder.cpp [new file with mode: 0644]
folly/wangle/codec/LineBasedFrameDecoder.h [new file with mode: 0644]

index df2c03e1ce921f48274c6d30a15f55e7beb1c4a6..7ba020435737b8aaad4700e0bee5150af2a43f0e 100644 (file)
@@ -16,6 +16,7 @@
 #include <gtest/gtest.h>
 
 #include <folly/wangle/codec/FixedLengthFrameDecoder.h>
+#include <folly/wangle/codec/LineBasedFrameDecoder.h>
 #include <folly/wangle/codec/LengthFieldBasedFrameDecoder.h>
 #include <folly/wangle/codec/LengthFieldPrepender.h>
 
@@ -32,6 +33,10 @@ class FrameTester
   void read(Context* ctx, IOBufQueue& q) {
     test_(q.move());
   }
+
+  void readException(Context* ctx, exception_wrapper w) {
+    test_(nullptr);
+  }
  private:
   std::function<void(std::unique_ptr<IOBuf>)> test_;
 };
@@ -348,3 +353,205 @@ TEST(CodecTest, LengthFieldFrameDecoderStripPrePostHeaderFrameInclHeader) {
   pipeline.read(q);
   EXPECT_EQ(called, 1);
 }
+
+TEST(CodecTest, LineBasedFrameDecoder) {
+  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  int called = 0;
+
+  pipeline
+    .addBack(LineBasedFrameDecoder(10))
+    .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+        auto sz = buf->computeChainDataLength();
+        called++;
+        EXPECT_EQ(sz, 3);
+      }))
+    .finalize();
+
+  auto buf = IOBuf::create(3);
+  buf->append(3);
+
+  IOBufQueue q(IOBufQueue::cacheChainLength());
+
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 0);
+
+  buf = IOBuf::create(1);
+  buf->append(1);
+  RWPrivateCursor c(buf.get());
+  c.write<char>('\n');
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 1);
+
+  buf = IOBuf::create(4);
+  buf->append(4);
+  RWPrivateCursor c1(buf.get());
+  c1.write(' ');
+  c1.write(' ');
+  c1.write(' ');
+
+  c1.write('\r');
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 1);
+
+  buf = IOBuf::create(1);
+  buf->append(1);
+  RWPrivateCursor c2(buf.get());
+  c2.write('\n');
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 2);
+}
+
+TEST(CodecTest, LineBasedFrameDecoderSaveDelimiter) {
+  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  int called = 0;
+
+  pipeline
+    .addBack(LineBasedFrameDecoder(10, false))
+    .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+        auto sz = buf->computeChainDataLength();
+        called++;
+        EXPECT_EQ(sz, 4);
+      }))
+    .finalize();
+
+  auto buf = IOBuf::create(3);
+  buf->append(3);
+
+  IOBufQueue q(IOBufQueue::cacheChainLength());
+
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 0);
+
+  buf = IOBuf::create(1);
+  buf->append(1);
+  RWPrivateCursor c(buf.get());
+  c.write<char>('\n');
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 1);
+
+  buf = IOBuf::create(3);
+  buf->append(3);
+  RWPrivateCursor c1(buf.get());
+  c1.write(' ');
+  c1.write(' ');
+  c1.write('\r');
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 1);
+
+  buf = IOBuf::create(1);
+  buf->append(1);
+  RWPrivateCursor c2(buf.get());
+  c2.write('\n');
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 2);
+}
+
+TEST(CodecTest, LineBasedFrameDecoderFail) {
+  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  int called = 0;
+
+  pipeline
+    .addBack(LineBasedFrameDecoder(10))
+    .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+        called++;
+      }))
+    .finalize();
+
+  auto buf = IOBuf::create(11);
+  buf->append(11);
+
+  IOBufQueue q(IOBufQueue::cacheChainLength());
+
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 1);
+
+  buf = IOBuf::create(1);
+  buf->append(1);
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 1);
+
+  buf = IOBuf::create(2);
+  buf->append(2);
+  RWPrivateCursor c(buf.get());
+  c.write(' ');
+  c.write<char>('\n');
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 1);
+
+  buf = IOBuf::create(12);
+  buf->append(12);
+  RWPrivateCursor c2(buf.get());
+  for (int i = 0; i < 11; i++) {
+    c2.write(' ');
+  }
+  c2.write<char>('\n');
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 2);
+}
+
+TEST(CodecTest, LineBasedFrameDecoderNewLineOnly) {
+  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  int called = 0;
+
+  pipeline
+    .addBack(LineBasedFrameDecoder(
+               10, true, LineBasedFrameDecoder::TerminatorType::NEWLINE))
+    .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+        auto sz = buf->computeChainDataLength();
+        called++;
+        EXPECT_EQ(sz, 1);
+      }))
+    .finalize();
+
+  auto buf = IOBuf::create(2);
+  buf->append(2);
+  RWPrivateCursor c(buf.get());
+  c.write<char>('\r');
+  c.write<char>('\n');
+
+  IOBufQueue q(IOBufQueue::cacheChainLength());
+
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 1);
+}
+
+TEST(CodecTest, LineBasedFrameDecoderCarriageNewLineOnly) {
+  ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> pipeline;
+  int called = 0;
+
+  pipeline
+    .addBack(LineBasedFrameDecoder(
+              10, true, LineBasedFrameDecoder::TerminatorType::CARRIAGENEWLINE))
+    .addBack(FrameTester([&](std::unique_ptr<IOBuf> buf) {
+        auto sz = buf->computeChainDataLength();
+        called++;
+        EXPECT_EQ(sz, 1);
+      }))
+    .finalize();
+
+  auto buf = IOBuf::create(3);
+  buf->append(3);
+  RWPrivateCursor c(buf.get());
+  c.write<char>('\n');
+  c.write<char>('\r');
+  c.write<char>('\n');
+
+  IOBufQueue q(IOBufQueue::cacheChainLength());
+
+  q.append(std::move(buf));
+  pipeline.read(q);
+  EXPECT_EQ(called, 1);
+}
diff --git a/folly/wangle/codec/LineBasedFrameDecoder.cpp b/folly/wangle/codec/LineBasedFrameDecoder.cpp
new file mode 100644 (file)
index 0000000..ab0bb07
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/wangle/codec/LineBasedFrameDecoder.h>
+
+namespace folly { namespace wangle {
+
+using folly::io::Cursor;
+
+LineBasedFrameDecoder::LineBasedFrameDecoder(uint32_t maxLength,
+                                             bool stripDelimiter,
+                                             TerminatorType terminatorType)
+    : maxLength_(maxLength)
+    , stripDelimiter_(stripDelimiter)
+    , terminatorType_(terminatorType) {}
+
+std::unique_ptr<IOBuf> LineBasedFrameDecoder::decode(
+  Context* ctx, IOBufQueue& buf, size_t&) {
+  int64_t eol = findEndOfLine(buf);
+
+  if (!discarding_) {
+    if (eol >= 0) {
+      Cursor c(buf.front());
+      c += eol;
+      auto delimLength = c.read<char>() == '\r' ? 2 : 1;
+      if (eol > maxLength_) {
+        buf.split(eol + delimLength);
+        fail(ctx, folly::to<std::string>(eol));
+        return nullptr;
+      }
+
+      std::unique_ptr<folly::IOBuf> frame;
+
+      if (stripDelimiter_) {
+        frame = buf.split(eol);
+        buf.trimStart(delimLength);
+      } else {
+        frame = buf.split(eol + delimLength);
+      }
+
+      return std::move(frame);
+    } else {
+      auto len = buf.chainLength();
+      if (len > maxLength_) {
+        discardedBytes_ = len;
+        buf.trimStart(len);
+        discarding_ = true;
+        fail(ctx, "over " + folly::to<std::string>(len));
+      }
+      return nullptr;
+    }
+  } else {
+    if (eol >= 0) {
+      Cursor c(buf.front());
+      c += eol;
+      auto delimLength = c.read<char>() == '\r' ? 2 : 1;
+      buf.trimStart(eol + delimLength);
+      discardedBytes_ = 0;
+      discarding_ = false;
+    } else {
+      discardedBytes_ = buf.chainLength();
+      buf.move();
+    }
+
+    return nullptr;
+  }
+}
+
+void LineBasedFrameDecoder::fail(Context* ctx, std::string len) {
+  ctx->fireReadException(
+    folly::make_exception_wrapper<std::runtime_error>(
+      "frame length" + len +
+      " exeeds max " + folly::to<std::string>(maxLength_)));
+}
+
+int64_t LineBasedFrameDecoder::findEndOfLine(IOBufQueue& buf) {
+  Cursor c(buf.front());
+  for (uint32_t i = 0; i < maxLength_ && i < buf.chainLength(); i++) {
+    auto b = c.read<char>();
+    if (b == '\n' && terminatorType_ != TerminatorType::CARRIAGENEWLINE) {
+      return i;
+    } else if (terminatorType_ != TerminatorType::NEWLINE &&
+               b == '\r' && !c.isAtEnd() && c.read<char>() == '\n') {
+      return i;
+    }
+  }
+
+  return -1;
+}
+
+}} // namespace
diff --git a/folly/wangle/codec/LineBasedFrameDecoder.h b/folly/wangle/codec/LineBasedFrameDecoder.h
new file mode 100644 (file)
index 0000000..5ae9433
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/wangle/codec/ByteToMessageCodec.h>
+#include <folly/io/Cursor.h>
+
+namespace folly { namespace wangle {
+
+/**
+ * A decoder that splits the received IOBufQueue on line endings.
+ *
+ * Both "\n" and "\r\n" are handled, or optionally reqire only
+ * one or the other.
+ */
+class LineBasedFrameDecoder : public ByteToMessageCodec {
+ public:
+  enum class TerminatorType {
+    BOTH,
+    NEWLINE,
+    CARRIAGENEWLINE
+  };
+
+  LineBasedFrameDecoder(uint32_t maxLength = UINT_MAX,
+                        bool stripDelimiter = true,
+                        TerminatorType terminatorType =
+                        TerminatorType::BOTH);
+
+  std::unique_ptr<IOBuf> decode(Context* ctx, IOBufQueue& buf, size_t&);
+
+ private:
+
+  int64_t findEndOfLine(IOBufQueue& buf);
+
+  void fail(Context* ctx, std::string len);
+
+  uint32_t maxLength_;
+  bool stripDelimiter_;
+
+  bool discarding_{false};
+  uint32_t discardedBytes_{0};
+
+  TerminatorType terminatorType_;
+};
+
+}} // namespace