folly AsyncPipeReader supports IOBuf
[folly.git] / folly / io / async / test / AsyncPipeTest.cpp
index af74811db954a4878a6f73b36c39e6558fc27f84..2a9c426f7cbb5eaf964c49a654fa21663c9d5d7e 100644 (file)
@@ -27,6 +27,18 @@ namespace {
 
 class TestReadCallback : public folly::AsyncReader::ReadCallback {
  public:
+  bool isBufferMovable() noexcept override {
+    return movable_;
+  }
+  void setMovable(bool movable) {
+    movable_ = movable;
+  }
+
+  void readBufferAvailable(
+      std::unique_ptr<folly::IOBuf> readBuf) noexcept override {
+    readBuffer_.append(std::move(readBuf));
+  }
+
   void readDataAvailable(size_t len) noexcept override {
     readBuffer_.postallocate(len);
   }
@@ -49,8 +61,15 @@ class TestReadCallback : public folly::AsyncReader::ReadCallback {
     return std::string((char *)buf->data(), buf->length());
   }
 
+  void reset() {
+    movable_ = false;
+    error_ = false;
+    readBuffer_.clear();
+  }
+
   folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
   bool error_{false};
+  bool movable_{false};
 };
 
 class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
@@ -61,13 +80,23 @@ class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
     error_ = true;
   }
 
+  void reset() {
+    writes_ = 0;
+    error_ = false;
+  }
+
   uint32_t writes_{0};
   bool error_{false};
 };
 
 class AsyncPipeTest: public Test {
  public:
-  void SetUp() override {
+  void reset(bool movable) {
+    reader_.reset();
+    readCallback_.reset();
+    writer_.reset();
+    writeCallback_.reset();
+
     int rc = pipe(pipeFds_);
     EXPECT_EQ(rc, 0);
 
@@ -77,6 +106,8 @@ class AsyncPipeTest: public Test {
       &eventBase_, pipeFds_[0]);
     writer_ = folly::AsyncPipeWriter::newWriter(
       &eventBase_, pipeFds_[1]);
+
+    readCallback_.setMovable(movable);
   }
 
  protected:
@@ -97,46 +128,55 @@ std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
 
 
 TEST_F(AsyncPipeTest, simple) {
-  reader_->setReadCB(&readCallback_);
-  writer_->write(getBuf("hello"), &writeCallback_);
-  writer_->closeOnEmpty();
-  eventBase_.loop();
-  EXPECT_EQ(readCallback_.getData(), "hello");
-  EXPECT_FALSE(readCallback_.error_);
-  EXPECT_EQ(writeCallback_.writes_, 1);
-  EXPECT_FALSE(writeCallback_.error_);
+  for (int pass = 0; pass < 2; ++pass) {
+    reset(pass % 2 != 0);
+    reader_->setReadCB(&readCallback_);
+    writer_->write(getBuf("hello"), &writeCallback_);
+    writer_->closeOnEmpty();
+    eventBase_.loop();
+    EXPECT_EQ(readCallback_.getData(), "hello");
+    EXPECT_FALSE(readCallback_.error_);
+    EXPECT_EQ(writeCallback_.writes_, 1);
+    EXPECT_FALSE(writeCallback_.error_);
+  }
 }
 
 TEST_F(AsyncPipeTest, blocked_writes) {
-  uint32_t writeAttempts = 0;
-  do {
-    ++writeAttempts;
-    writer_->write(getBuf("hello"), &writeCallback_);
-  } while (writeCallback_.writes_ == writeAttempts);
-  // there is one blocked write
-  writer_->closeOnEmpty();
-
-  reader_->setReadCB(&readCallback_);
-
-  eventBase_.loop();
-  std::string expected;
-  for (uint32_t i = 0; i < writeAttempts; i++) {
-    expected += "hello";
+  for (int pass = 0; pass < 2; ++pass) {
+    reset(pass % 2 != 0);
+    uint32_t writeAttempts = 0;
+    do {
+      ++writeAttempts;
+      writer_->write(getBuf("hello"), &writeCallback_);
+    } while (writeCallback_.writes_ == writeAttempts);
+    // there is one blocked write
+    writer_->closeOnEmpty();
+
+    reader_->setReadCB(&readCallback_);
+
+    eventBase_.loop();
+    std::string expected;
+    for (uint32_t i = 0; i < writeAttempts; i++) {
+      expected += "hello";
+    }
+    EXPECT_EQ(readCallback_.getData(), expected);
+    EXPECT_FALSE(readCallback_.error_);
+    EXPECT_EQ(writeCallback_.writes_, writeAttempts);
+    EXPECT_FALSE(writeCallback_.error_);
   }
-  EXPECT_EQ(readCallback_.getData(), expected);
-  EXPECT_FALSE(readCallback_.error_);
-  EXPECT_EQ(writeCallback_.writes_, writeAttempts);
-  EXPECT_FALSE(writeCallback_.error_);
 }
 
 TEST_F(AsyncPipeTest, writeOnClose) {
-  reader_->setReadCB(&readCallback_);
-  writer_->write(getBuf("hello"), &writeCallback_);
-  writer_->closeOnEmpty();
-  writer_->write(getBuf("hello"), &writeCallback_);
-  eventBase_.loop();
-  EXPECT_EQ(readCallback_.getData(), "hello");
-  EXPECT_FALSE(readCallback_.error_);
-  EXPECT_EQ(writeCallback_.writes_, 1);
-  EXPECT_TRUE(writeCallback_.error_);
+  for (int pass = 0; pass < 2; ++pass) {
+    reset(pass % 2 != 0);
+    reader_->setReadCB(&readCallback_);
+    writer_->write(getBuf("hello"), &writeCallback_);
+    writer_->closeOnEmpty();
+    writer_->write(getBuf("hello"), &writeCallback_);
+    eventBase_.loop();
+    EXPECT_EQ(readCallback_.getData(), "hello");
+    EXPECT_FALSE(readCallback_.error_);
+    EXPECT_EQ(writeCallback_.writes_, 1);
+    EXPECT_TRUE(writeCallback_.error_);
+  }
 }