folly AsyncPipeReader supports IOBuf
authorYuri Putivsky <yuri@fb.com>
Wed, 3 Aug 2016 19:02:26 +0000 (12:02 -0700)
committerFacebook Github Bot 0 <facebook-github-bot-0-bot@fb.com>
Wed, 3 Aug 2016 19:08:46 +0000 (12:08 -0700)
Summary: folly AsyncPipeReader takes a callback of type AsyncReader::ReadCallback. Now AsyncReader::ReadCallback class supports IOBuf as a buffer for transfer read bytes. Need to extend AsyncPipeReader class to support IOBuf as well

Reviewed By: yfeldblum

Differential Revision: D3650893

fbshipit-source-id: e2142341c8b8b0b2ef248c1f13a8caba9d50ba67

folly/io/async/AsyncPipe.cpp
folly/io/async/AsyncTransport.h
folly/io/async/test/AsyncPipeTest.cpp

index e38cc40996eca0ddccb9bc68ff9191110e161611..06f8c91106ffc7a454bff102851d801b9282f19e 100644 (file)
@@ -62,36 +62,57 @@ void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
   assert(readCallback_ != nullptr);
 
   while (readCallback_) {
+    // - What API does callback support?
+    const auto movable = readCallback_->isBufferMovable(); // noexcept
+
     // Get the buffer to read into.
     void* buf = nullptr;
     size_t buflen = 0;
-    try {
-      readCallback_->getReadBuffer(&buf, &buflen);
-    } catch (const std::exception& ex) {
-      AsyncSocketException aex(AsyncSocketException::BAD_ARGS,
-                               string("ReadCallback::getReadBuffer() "
-                                      "threw exception: ") + ex.what());
-      failRead(aex);
-      return;
-    } catch (...) {
-      AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
-                              string("ReadCallback::getReadBuffer() "
-                                     "threw non-exception type"));
-      failRead(ex);
-      return;
-    }
-    if (buf == nullptr || buflen == 0) {
-      AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
-                              string("ReadCallback::getReadBuffer() "
-                                     "returned empty buffer"));
-      failRead(ex);
-      return;
+    std::unique_ptr<IOBuf> ioBuf;
+
+    if (movable) {
+      ioBuf = IOBuf::create(readCallback_->maxBufferSize());
+      buf = ioBuf->writableBuffer();
+      buflen = ioBuf->capacity();
+    } else {
+      try {
+        readCallback_->getReadBuffer(&buf, &buflen);
+      } catch (const std::exception& ex) {
+        AsyncSocketException aex(
+            AsyncSocketException::BAD_ARGS,
+            string("ReadCallback::getReadBuffer() "
+                   "threw exception: ") +
+                ex.what());
+        failRead(aex);
+        return;
+      } catch (...) {
+        AsyncSocketException aex(
+            AsyncSocketException::BAD_ARGS,
+            string("ReadCallback::getReadBuffer() "
+                   "threw non-exception type"));
+        failRead(aex);
+        return;
+      }
+      if (buf == nullptr || buflen == 0) {
+        AsyncSocketException aex(
+            AsyncSocketException::INVALID_STATE,
+            string("ReadCallback::getReadBuffer() "
+                   "returned empty buffer"));
+        failRead(aex);
+        return;
+      }
     }
 
     // Perform the read
     ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
+
     if (bytesRead > 0) {
-      readCallback_->readDataAvailable(bytesRead);
+      if (movable) {
+        ioBuf->append(bytesRead);
+        readCallback_->readBufferAvailable(std::move(ioBuf));
+      } else {
+        readCallback_->readDataAvailable(bytesRead);
+      }
       // Fall through and continue around the loop if the read
       // completely filled the available buffer.
       // Note that readCallback_ may have been uninstalled or changed inside
index 632a782b5f09c3a9a7c082fdbd427a61d50c9241..a1023b2cca4c1c407e5877557e920a1d8b37a0ad 100644 (file)
@@ -471,6 +471,15 @@ class AsyncReader {
       return false;
     }
 
+    /**
+     * Suggested buffer size, allocated for read operations,
+     * if callback is movable and supports folly::IOBuf
+     */
+
+    virtual size_t maxBufferSize() const {
+      return 64 * 1024; // 64K
+    }
+
     /**
      * readBufferAvailable() will be invoked when data has been successfully
      * read.
index af74811db954a4878a6f73b36c39e6558fc27f84..2a9c426f7cbb5eaf964c49a654fa21663c9d5d7e 100644 (file)
@@ -27,6 +27,18 @@ namespace {
 
 class TestReadCallback : public folly::AsyncReader::ReadCallback {
  public:
+  bool isBufferMovable() noexcept override {
+    return movable_;
+  }
+  void setMovable(bool movable) {
+    movable_ = movable;
+  }
+
+  void readBufferAvailable(
+      std::unique_ptr<folly::IOBuf> readBuf) noexcept override {
+    readBuffer_.append(std::move(readBuf));
+  }
+
   void readDataAvailable(size_t len) noexcept override {
     readBuffer_.postallocate(len);
   }
@@ -49,8 +61,15 @@ class TestReadCallback : public folly::AsyncReader::ReadCallback {
     return std::string((char *)buf->data(), buf->length());
   }
 
+  void reset() {
+    movable_ = false;
+    error_ = false;
+    readBuffer_.clear();
+  }
+
   folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
   bool error_{false};
+  bool movable_{false};
 };
 
 class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
@@ -61,13 +80,23 @@ class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
     error_ = true;
   }
 
+  void reset() {
+    writes_ = 0;
+    error_ = false;
+  }
+
   uint32_t writes_{0};
   bool error_{false};
 };
 
 class AsyncPipeTest: public Test {
  public:
-  void SetUp() override {
+  void reset(bool movable) {
+    reader_.reset();
+    readCallback_.reset();
+    writer_.reset();
+    writeCallback_.reset();
+
     int rc = pipe(pipeFds_);
     EXPECT_EQ(rc, 0);
 
@@ -77,6 +106,8 @@ class AsyncPipeTest: public Test {
       &eventBase_, pipeFds_[0]);
     writer_ = folly::AsyncPipeWriter::newWriter(
       &eventBase_, pipeFds_[1]);
+
+    readCallback_.setMovable(movable);
   }
 
  protected:
@@ -97,46 +128,55 @@ std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
 
 
 TEST_F(AsyncPipeTest, simple) {
-  reader_->setReadCB(&readCallback_);
-  writer_->write(getBuf("hello"), &writeCallback_);
-  writer_->closeOnEmpty();
-  eventBase_.loop();
-  EXPECT_EQ(readCallback_.getData(), "hello");
-  EXPECT_FALSE(readCallback_.error_);
-  EXPECT_EQ(writeCallback_.writes_, 1);
-  EXPECT_FALSE(writeCallback_.error_);
+  for (int pass = 0; pass < 2; ++pass) {
+    reset(pass % 2 != 0);
+    reader_->setReadCB(&readCallback_);
+    writer_->write(getBuf("hello"), &writeCallback_);
+    writer_->closeOnEmpty();
+    eventBase_.loop();
+    EXPECT_EQ(readCallback_.getData(), "hello");
+    EXPECT_FALSE(readCallback_.error_);
+    EXPECT_EQ(writeCallback_.writes_, 1);
+    EXPECT_FALSE(writeCallback_.error_);
+  }
 }
 
 TEST_F(AsyncPipeTest, blocked_writes) {
-  uint32_t writeAttempts = 0;
-  do {
-    ++writeAttempts;
-    writer_->write(getBuf("hello"), &writeCallback_);
-  } while (writeCallback_.writes_ == writeAttempts);
-  // there is one blocked write
-  writer_->closeOnEmpty();
-
-  reader_->setReadCB(&readCallback_);
-
-  eventBase_.loop();
-  std::string expected;
-  for (uint32_t i = 0; i < writeAttempts; i++) {
-    expected += "hello";
+  for (int pass = 0; pass < 2; ++pass) {
+    reset(pass % 2 != 0);
+    uint32_t writeAttempts = 0;
+    do {
+      ++writeAttempts;
+      writer_->write(getBuf("hello"), &writeCallback_);
+    } while (writeCallback_.writes_ == writeAttempts);
+    // there is one blocked write
+    writer_->closeOnEmpty();
+
+    reader_->setReadCB(&readCallback_);
+
+    eventBase_.loop();
+    std::string expected;
+    for (uint32_t i = 0; i < writeAttempts; i++) {
+      expected += "hello";
+    }
+    EXPECT_EQ(readCallback_.getData(), expected);
+    EXPECT_FALSE(readCallback_.error_);
+    EXPECT_EQ(writeCallback_.writes_, writeAttempts);
+    EXPECT_FALSE(writeCallback_.error_);
   }
-  EXPECT_EQ(readCallback_.getData(), expected);
-  EXPECT_FALSE(readCallback_.error_);
-  EXPECT_EQ(writeCallback_.writes_, writeAttempts);
-  EXPECT_FALSE(writeCallback_.error_);
 }
 
 TEST_F(AsyncPipeTest, writeOnClose) {
-  reader_->setReadCB(&readCallback_);
-  writer_->write(getBuf("hello"), &writeCallback_);
-  writer_->closeOnEmpty();
-  writer_->write(getBuf("hello"), &writeCallback_);
-  eventBase_.loop();
-  EXPECT_EQ(readCallback_.getData(), "hello");
-  EXPECT_FALSE(readCallback_.error_);
-  EXPECT_EQ(writeCallback_.writes_, 1);
-  EXPECT_TRUE(writeCallback_.error_);
+  for (int pass = 0; pass < 2; ++pass) {
+    reset(pass % 2 != 0);
+    reader_->setReadCB(&readCallback_);
+    writer_->write(getBuf("hello"), &writeCallback_);
+    writer_->closeOnEmpty();
+    writer_->write(getBuf("hello"), &writeCallback_);
+    eventBase_.loop();
+    EXPECT_EQ(readCallback_.getData(), "hello");
+    EXPECT_FALSE(readCallback_.error_);
+    EXPECT_EQ(writeCallback_.writes_, 1);
+    EXPECT_TRUE(writeCallback_.error_);
+  }
 }