Open source AsyncPipe
authorAndre Pinto <aap@fb.com>
Tue, 29 Sep 2015 21:13:54 +0000 (14:13 -0700)
committerfacebook-github-bot-4 <folly-bot@fb.com>
Tue, 29 Sep 2015 21:20:20 +0000 (14:20 -0700)
Summary: AsyncPipeReader and AsyncPipeWriter are classes to asynchronously
read and write to pipes.

Reviewed By: @djwatson

Differential Revision: D2479514

folly/Makefile.am
folly/io/async/AsyncPipe.cpp [new file with mode: 0644]
folly/io/async/AsyncPipe.h [new file with mode: 0644]
folly/io/async/AsyncTransport.h
folly/io/async/README.md
folly/io/async/test/AsyncPipeTest.cpp [new file with mode: 0644]

index 2d90added67b27af92da191ed8b16947f4d7a8b2..a4e2e483f435e68275798aca22cc0a6bf71ccacb 100644 (file)
@@ -186,6 +186,7 @@ nobase_follyinclude_HEADERS = \
        io/RecordIO-inl.h \
        io/TypedIOBuf.h \
        io/ShutdownSocketSet.h \
+       io/async/AsyncPipe.h \
        io/async/AsyncTimeout.h \
        io/async/AsyncTransport.h \
        io/async/AsyncUDPServerSocket.h \
@@ -336,6 +337,7 @@ libfolly_la_SOURCES = \
        io/IOBufQueue.cpp \
        io/RecordIO.cpp \
        io/ShutdownSocketSet.cpp \
+       io/async/AsyncPipe.cpp \
        io/async/AsyncTimeout.cpp \
        io/async/AsyncUDPSocket.cpp \
        io/async/AsyncServerSocket.cpp \
diff --git a/folly/io/async/AsyncPipe.cpp b/folly/io/async/AsyncPipe.cpp
new file mode 100644 (file)
index 0000000..7937e40
--- /dev/null
@@ -0,0 +1,249 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/io/async/AsyncPipe.h>
+
+#include <folly/FileUtil.h>
+#include <folly/io/async/AsyncSocketException.h>
+
+using std::string;
+using std::unique_ptr;
+using folly::IOBuf;
+using folly::IOBufQueue;
+
+namespace folly {
+
+AsyncPipeReader::~AsyncPipeReader() {
+  close();
+}
+
+void AsyncPipeReader::failRead(const AsyncSocketException& ex) {
+  VLOG(5) << "AsyncPipeReader(this=" << this << ", fd=" << fd_ <<
+    "): failed while reading: " << ex.what();
+
+  DCHECK(readCallback_ != nullptr);
+  AsyncReader::ReadCallback* callback = readCallback_;
+  readCallback_ = nullptr;
+  callback->readErr(ex);
+  close();
+}
+
+void AsyncPipeReader::close() {
+  unregisterHandler();
+  if (fd_ >= 0) {
+    changeHandlerFD(-1);
+
+    if (closeCb_) {
+      closeCb_(fd_);
+    } else {
+      ::close(fd_);
+    }
+    fd_ = -1;
+  }
+}
+
+void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
+  DestructorGuard dg(this);
+  CHECK(events & EventHandler::READ);
+
+  VLOG(5) << "AsyncPipeReader::handlerReady() this=" << this << ", fd=" << fd_;
+  assert(readCallback_ != nullptr);
+
+  uint16_t numReads = 0;
+  while (readCallback_) {
+    // Get the buffer to read into.
+    void* buf = nullptr;
+    size_t buflen = 0;
+    try {
+      readCallback_->getReadBuffer(&buf, &buflen);
+    } catch (const std::exception& ex) {
+      AsyncSocketException aex(AsyncSocketException::BAD_ARGS,
+                               string("ReadCallback::getReadBuffer() "
+                                      "threw exception: ") + ex.what());
+      failRead(aex);
+      return;
+    } catch (...) {
+      AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
+                              string("ReadCallback::getReadBuffer() "
+                                     "threw non-exception type"));
+      failRead(ex);
+      return;
+    }
+    if (buf == nullptr || buflen == 0) {
+      AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
+                              string("ReadCallback::getReadBuffer() "
+                                     "returned empty buffer"));
+      failRead(ex);
+      return;
+    }
+
+    // Perform the read
+    ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
+    if (bytesRead > 0) {
+      readCallback_->readDataAvailable(bytesRead);
+      // Fall through and continue around the loop if the read
+      // completely filled the available buffer.
+      // Note that readCallback_ may have been uninstalled or changed inside
+      // readDataAvailable().
+      if (static_cast<size_t>(bytesRead) < buflen) {
+        return;
+      }
+    } else if (bytesRead < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+      // No more data to read right now.
+      return;
+    } else if (bytesRead < 0) {
+      AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
+                              "read failed", errno);
+      failRead(ex);
+      return;
+    } else {
+      assert(bytesRead == 0);
+      // EOF
+
+      unregisterHandler();
+      AsyncReader::ReadCallback* callback = readCallback_;
+      readCallback_ = nullptr;
+      callback->readEOF();
+      return;
+    }
+    // Max reads per loop?
+  }
+}
+
+
+void AsyncPipeWriter::write(unique_ptr<folly::IOBuf> buf,
+                            AsyncWriter::WriteCallback* callback) {
+  if (closed()) {
+    if (callback) {
+      AsyncSocketException ex(AsyncSocketException::NOT_OPEN,
+                              "attempt to write to closed pipe");
+      callback->writeErr(0, ex);
+    }
+    return;
+  }
+  bool wasEmpty = (queue_.empty());
+  folly::IOBufQueue iobq;
+  iobq.append(std::move(buf));
+  std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*> p(
+    std::move(iobq), callback);
+  queue_.emplace_back(std::move(p));
+  if (wasEmpty)  {
+    handleWrite();
+  } else {
+    CHECK(!queue_.empty());
+    CHECK(isHandlerRegistered());
+  }
+}
+
+void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback,
+                                 std::unique_ptr<folly::IOBuf>&& buf,
+                                 WriteFlags) {
+  write(std::move(buf), callback);
+}
+
+void AsyncPipeWriter::closeOnEmpty() {
+  VLOG(5) << "close on empty";
+  if (queue_.empty()) {
+    closeNow();
+  } else {
+    closeOnEmpty_ = true;
+    CHECK(isHandlerRegistered());
+  }
+}
+
+void AsyncPipeWriter::closeNow() {
+  VLOG(5) << "close now";
+  if (!queue_.empty()) {
+    failAllWrites(AsyncSocketException(AsyncSocketException::NOT_OPEN,
+                                       "closed with pending writes"));
+  }
+  if (fd_ >= 0) {
+    unregisterHandler();
+    changeHandlerFD(-1);
+    if (closeCb_) {
+      closeCb_(fd_);
+    } else {
+      close(fd_);
+    }
+    fd_ = -1;
+  }
+}
+
+void AsyncPipeWriter::failAllWrites(const AsyncSocketException& ex) {
+  DestructorGuard dg(this);
+  while (!queue_.empty()) {
+    // the first entry of the queue could have had a partial write, but needs to
+    // be tracked.
+    if (queue_.front().second) {
+      queue_.front().second->writeErr(0, ex);
+    }
+    queue_.pop_front();
+  }
+}
+
+
+void AsyncPipeWriter::handlerReady(uint16_t events) noexcept {
+  CHECK(events & EventHandler::WRITE);
+
+  handleWrite();
+}
+
+void AsyncPipeWriter::handleWrite() {
+  DestructorGuard dg(this);
+  assert(!queue_.empty());
+  do {
+    auto& front = queue_.front();
+    folly::IOBufQueue &curQueue = front.first;
+    DCHECK(!curQueue.empty());
+    // someday, support writev.  The logic for partial writes is a bit complex
+    const IOBuf* head = curQueue.front();
+    CHECK(head->length());
+    ssize_t rc = folly::writeNoInt(fd_, head->data(), head->length());
+    if (rc < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        // pipe is full
+        VLOG(5) << "write blocked";
+        registerHandler(EventHandler::WRITE);
+        return;
+      } else {
+        failAllWrites(AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
+                                           "write failed", errno));
+        closeNow();
+        return;
+      }
+    } else if (rc == 0) {
+      registerHandler(EventHandler::WRITE);
+      return;
+    }
+    curQueue.trimStart(rc);
+    if (curQueue.empty()) {
+      auto cb = front.second;
+      queue_.pop_front();
+      if (cb) {
+        cb->writeSuccess();
+      }
+    } else {
+      VLOG(5) << "partial write blocked";
+    }
+  } while (!queue_.empty());
+
+  if (closeOnEmpty_) {
+    closeNow();
+  } else {
+    unregisterHandler();
+  }
+}
+
+} // folly
diff --git a/folly/io/async/AsyncPipe.h b/folly/io/async/AsyncPipe.h
new file mode 100644 (file)
index 0000000..63e73ac
--- /dev/null
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/io/async/AsyncTransport.h>
+#include <folly/io/async/EventHandler.h>
+#include <folly/io/async/DelayedDestruction.h>
+#include <folly/io/IOBufQueue.h>
+
+#include <list>
+#include <system_error>
+
+namespace folly {
+
+class AsyncSocketException;
+
+/**
+ * Read from a pipe in an async manner.
+ */
+class AsyncPipeReader : public EventHandler,
+                        public AsyncReader,
+                        public DelayedDestruction {
+ public:
+  typedef std::unique_ptr<AsyncPipeReader,
+                          folly::DelayedDestruction::Destructor> UniquePtr;
+
+  AsyncPipeReader(folly::EventBase* eventBase, int pipeFd)
+    : EventHandler(eventBase, pipeFd),
+    fd_(pipeFd) {}
+
+  /**
+   * Set the read callback and automatically install/uninstall the handler
+   * for events.
+   */
+  void setReadCB(AsyncReader::ReadCallback* callback) override {
+    if (callback == readCallback_) {
+      return;
+    }
+    readCallback_ = callback;
+    if (readCallback_ && !isHandlerRegistered()) {
+      registerHandler(EventHandler::READ | EventHandler::PERSIST);
+    } else if (!readCallback_ && isHandlerRegistered()) {
+      unregisterHandler();
+    }
+  }
+
+  /**
+   * Get the read callback
+   */
+  AsyncReader::ReadCallback* getReadCallback() const override {
+    return readCallback_;
+  }
+
+  /**
+   * Set a special hook to close the socket (otherwise, will call close())
+   */
+  void setCloseCallback(std::function<void(int)> closeCb) {
+    closeCb_ = closeCb;
+  }
+
+ private:
+  ~AsyncPipeReader();
+
+  void handlerReady(uint16_t events) noexcept override;
+  void failRead(const AsyncSocketException& ex);
+  void close();
+
+  int fd_;
+  AsyncReader::ReadCallback* readCallback_{nullptr};
+  std::function<void(int)> closeCb_;
+};
+
+/**
+ * Write to a pipe in an async manner.
+ */
+class AsyncPipeWriter : public EventHandler,
+                        public AsyncWriter,
+                        public DelayedDestruction {
+ public:
+  typedef std::unique_ptr<AsyncPipeWriter,
+                          folly::DelayedDestruction::Destructor> UniquePtr;
+
+  AsyncPipeWriter(folly::EventBase* eventBase, int pipeFd)
+    : EventHandler(eventBase, pipeFd),
+    fd_(pipeFd) {}
+
+  /**
+   * Asynchronously write the given iobuf to this pipe, and invoke the callback
+   * on success/error.
+   */
+  void write(std::unique_ptr<folly::IOBuf> iob,
+             AsyncWriter::WriteCallback* wcb = nullptr);
+
+  /**
+   * Set a special hook to close the socket (otherwise, will call close())
+   */
+  void setCloseCallback(std::function<void(int)> closeCb) {
+    closeCb_ = closeCb;
+  }
+
+  /**
+   * Returns true if the pipe is closed
+   */
+  bool closed() const {
+    return (fd_ < 0 || closeOnEmpty_);
+  }
+
+  /**
+   * Notify the pipe to close as soon as all pending writes complete
+   */
+  void closeOnEmpty();
+
+  /**
+   * Close the pipe immediately, and fail all pending writes
+   */
+  void closeNow();
+
+  /**
+   * Return true if there are currently writes pending (eg: the pipe is blocked
+   * for writing)
+   */
+  bool hasPendingWrites() const {
+    return !queue_.empty();
+  }
+
+  // AsyncWriter methods
+  void write(folly::AsyncWriter::WriteCallback* callback, const void* buf,
+             size_t bytes, WriteFlags flags = WriteFlags::NONE) override {
+    writeChain(callback, IOBuf::wrapBuffer(buf, bytes), flags);
+  }
+  void writev(folly::AsyncWriter::WriteCallback*, const iovec*,
+              size_t, WriteFlags = WriteFlags::NONE) override {
+    throw std::runtime_error("writev is not supported. Please use writeChain.");
+  }
+  void writeChain(folly::AsyncWriter::WriteCallback* callback,
+                  std::unique_ptr<folly::IOBuf>&& buf,
+                  WriteFlags flags = WriteFlags::NONE) override;
+
+ private:
+  void handlerReady(uint16_t events) noexcept override;
+  void handleWrite();
+  void failAllWrites(const AsyncSocketException& ex);
+
+  int fd_;
+  std::list<std::pair<folly::IOBufQueue, AsyncWriter::WriteCallback*>> queue_;
+  bool closeOnEmpty_{false};
+  std::function<void(int)> closeCb_;
+
+  ~AsyncPipeWriter() {
+    closeNow();
+  }
+};
+
+} // folly
index b45ed96d5fc3c4c459dfecc575a2e928ae021f45..e4a6c29b32b997ebb4236414ec0315ca77f0b7b9 100644 (file)
@@ -331,12 +331,8 @@ class AsyncTransport : public DelayedDestruction, public AsyncSocketBase {
   virtual ~AsyncTransport() = default;
 };
 
-// Transitional intermediate interface. This is deprecated.
-// Wrapper around folly::AsyncTransport, that includes read/write callbacks
-class AsyncTransportWrapper : virtual public AsyncTransport {
+class AsyncReader {
  public:
-  typedef std::unique_ptr<AsyncTransportWrapper, Destructor> UniquePtr;
-
   class ReadCallback {
    public:
     virtual ~ReadCallback() = default;
@@ -453,6 +449,16 @@ class AsyncTransportWrapper : virtual public AsyncTransport {
     virtual void readErr(const AsyncSocketException& ex) noexcept = 0;
   };
 
+  // Read methods that aren't part of AsyncTransport.
+  virtual void setReadCB(ReadCallback* callback) = 0;
+  virtual ReadCallback* getReadCallback() const = 0;
+
+ protected:
+  virtual ~AsyncReader() = default;
+};
+
+class AsyncWriter {
+ public:
   class WriteCallback {
    public:
     virtual ~WriteCallback() = default;
@@ -480,10 +486,7 @@ class AsyncTransportWrapper : virtual public AsyncTransport {
                           const AsyncSocketException& ex) noexcept = 0;
   };
 
-  // Read/write methods that aren't part of AsyncTransport
-  virtual void setReadCB(ReadCallback* callback) = 0;
-  virtual ReadCallback* getReadCallback() const = 0;
-
+  // Write methods that aren't part of AsyncTransport
   virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
                      WriteFlags flags = WriteFlags::NONE) = 0;
   virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
@@ -491,6 +494,32 @@ class AsyncTransportWrapper : virtual public AsyncTransport {
   virtual void writeChain(WriteCallback* callback,
                           std::unique_ptr<IOBuf>&& buf,
                           WriteFlags flags = WriteFlags::NONE) = 0;
+
+ protected:
+  virtual ~AsyncWriter() = default;
+};
+
+// Transitional intermediate interface. This is deprecated.
+// Wrapper around folly::AsyncTransport, that includes read/write callbacks
+class AsyncTransportWrapper : virtual public AsyncTransport,
+                              virtual public AsyncReader,
+                              virtual public AsyncWriter {
+ public:
+  using UniquePtr = std::unique_ptr<AsyncTransportWrapper, Destructor>;
+
+  // Alias for inherited members from AsyncReader and AsyncWriter
+  // to keep compatibility.
+  using ReadCallback    = AsyncReader::ReadCallback;
+  using WriteCallback   = AsyncWriter::WriteCallback;
+  virtual void setReadCB(ReadCallback* callback) override = 0;
+  virtual ReadCallback* getReadCallback() const override = 0;
+  virtual void write(WriteCallback* callback, const void* buf, size_t bytes,
+                     WriteFlags flags = WriteFlags::NONE) override = 0;
+  virtual void writev(WriteCallback* callback, const iovec* vec, size_t count,
+                      WriteFlags flags = WriteFlags::NONE) override = 0;
+  virtual void writeChain(WriteCallback* callback,
+                          std::unique_ptr<IOBuf>&& buf,
+                          WriteFlags flags = WriteFlags::NONE) override = 0;
 };
 
 } // folly
index ce61f64effee3f658c59bd58ff698d240bb538ed..ed03085d94bb632ca519f1912467949dbb8943db 100644 (file)
@@ -256,8 +256,6 @@ clarity, we don't reuse the same fd as a socket to receive signals.
 
 ### AsyncPipe
 
-TODO: not currently open souce
-
 Async reads/writes to a unix pipe, to send data between processes.
 Why don't you just use AsyncSocket for now?
 
@@ -353,4 +351,4 @@ Some best practices we've found:
    notify of overload, such as timeouts, or CPU usage.  For sync
    systems, you are almost always limited by the number of threads.
    For more details see [No Time for
-   Asynchrony](https://www.usenix.org/legacy/event/hotos09/tech/full_papers/aguilera/aguilera.pdf)
\ No newline at end of file
+   Asynchrony](https://www.usenix.org/legacy/event/hotos09/tech/full_papers/aguilera/aguilera.pdf)
diff --git a/folly/io/async/test/AsyncPipeTest.cpp b/folly/io/async/test/AsyncPipeTest.cpp
new file mode 100644 (file)
index 0000000..750e9aa
--- /dev/null
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/io/async/AsyncPipe.h>
+#include <folly/io/async/EventBase.h>
+#include <folly/Memory.h>
+#include <gtest/gtest.h>
+
+#include <fcntl.h>
+
+using namespace testing;
+
+namespace {
+
+class TestReadCallback : public folly::AsyncReader::ReadCallback {
+ public:
+  void readDataAvailable(size_t len) noexcept override {
+    readBuffer_.postallocate(len);
+  }
+
+  void getReadBuffer(void** bufReturn, size_t* lenReturn) noexcept override {
+    auto res = readBuffer_.preallocate(4000, 65000);
+    *bufReturn = res.first;
+    *lenReturn = res.second;
+  }
+
+  void readEOF() noexcept override {}
+
+  void readErr(const folly::AsyncSocketException&) noexcept override {
+    error_ = true;
+  }
+
+  std::string getData() {
+    auto buf = readBuffer_.move();
+    buf->coalesce();
+    return std::string((char *)buf->data(), buf->length());
+  }
+
+  folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
+  bool error_{false};
+};
+
+class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
+ public:
+  void writeSuccess() noexcept override { writes_++; }
+
+  void writeErr(size_t, const folly::AsyncSocketException&) noexcept override {
+    error_ = true;
+  }
+
+  uint32_t writes_{0};
+  bool error_{false};
+};
+
+class AsyncPipeTest: public Test {
+ public:
+  void SetUp() override {
+    int rc = pipe(pipeFds_);
+    EXPECT_EQ(rc, 0);
+
+    EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0);
+    EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0);
+    reader_ = folly::make_unique<folly::AsyncPipeReader,
+                                 folly::DelayedDestruction::Destructor>(
+      &eventBase_, pipeFds_[0]);
+    writer_ = folly::make_unique<folly::AsyncPipeWriter,
+                                 folly::DelayedDestruction::Destructor>(
+      &eventBase_, pipeFds_[1]);
+  }
+
+ protected:
+  folly::EventBase eventBase_;
+  int pipeFds_[2];
+  folly::AsyncPipeReader::UniquePtr reader_;
+  folly::AsyncPipeWriter::UniquePtr writer_;
+  TestReadCallback readCallback_;
+  TestWriteCallback writeCallback_;
+};
+
+std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
+  auto buf = folly::IOBuf::copyBuffer(data.c_str(), data.length());
+  return buf;
+}
+
+} // anonymous namespace
+
+
+TEST_F(AsyncPipeTest, simple) {
+  reader_->setReadCB(&readCallback_);
+  writer_->write(getBuf("hello"), &writeCallback_);
+  writer_->closeOnEmpty();
+  eventBase_.loop();
+  EXPECT_EQ(readCallback_.getData(), "hello");
+  EXPECT_FALSE(readCallback_.error_);
+  EXPECT_EQ(writeCallback_.writes_, 1);
+  EXPECT_FALSE(writeCallback_.error_);
+}
+
+TEST_F(AsyncPipeTest, blocked_writes) {
+  uint32_t writeAttempts = 0;
+  do {
+    ++writeAttempts;
+    writer_->write(getBuf("hello"), &writeCallback_);
+  } while (writeCallback_.writes_ == writeAttempts);
+  // there is one blocked write
+  writer_->closeOnEmpty();
+
+  reader_->setReadCB(&readCallback_);
+
+  eventBase_.loop();
+  std::string expected;
+  for (uint32_t i = 0; i < writeAttempts; i++) {
+    expected += "hello";
+  }
+  EXPECT_EQ(readCallback_.getData(), expected);
+  EXPECT_FALSE(readCallback_.error_);
+  EXPECT_EQ(writeCallback_.writes_, writeAttempts);
+  EXPECT_FALSE(writeCallback_.error_);
+}
+
+TEST_F(AsyncPipeTest, writeOnClose) {
+  reader_->setReadCB(&readCallback_);
+  writer_->write(getBuf("hello"), &writeCallback_);
+  writer_->closeOnEmpty();
+  writer_->write(getBuf("hello"), &writeCallback_);
+  eventBase_.loop();
+  EXPECT_EQ(readCallback_.getData(), "hello");
+  EXPECT_FALSE(readCallback_.error_);
+  EXPECT_EQ(writeCallback_.writes_, 1);
+  EXPECT_TRUE(writeCallback_.error_);
+}