Adds writer test case for RCU
[folly.git] / folly / io / async / AsyncPipe.cpp
index b4263346917849e7ec7d8b9b3dc57a190849543c..13581f4b4e2191c845051f0690a4390b4f7912dd 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2014-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -62,36 +62,57 @@ void AsyncPipeReader::handlerReady(uint16_t events) noexcept {
   assert(readCallback_ != nullptr);
 
   while (readCallback_) {
+    // - What API does callback support?
+    const auto movable = readCallback_->isBufferMovable(); // noexcept
+
     // Get the buffer to read into.
     void* buf = nullptr;
     size_t buflen = 0;
-    try {
-      readCallback_->getReadBuffer(&buf, &buflen);
-    } catch (const std::exception& ex) {
-      AsyncSocketException aex(AsyncSocketException::BAD_ARGS,
-                               string("ReadCallback::getReadBuffer() "
-                                      "threw exception: ") + ex.what());
-      failRead(aex);
-      return;
-    } catch (...) {
-      AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
-                              string("ReadCallback::getReadBuffer() "
-                                     "threw non-exception type"));
-      failRead(ex);
-      return;
-    }
-    if (buf == nullptr || buflen == 0) {
-      AsyncSocketException ex(AsyncSocketException::INVALID_STATE,
-                              string("ReadCallback::getReadBuffer() "
-                                     "returned empty buffer"));
-      failRead(ex);
-      return;
+    std::unique_ptr<IOBuf> ioBuf;
+
+    if (movable) {
+      ioBuf = IOBuf::create(readCallback_->maxBufferSize());
+      buf = ioBuf->writableBuffer();
+      buflen = ioBuf->capacity();
+    } else {
+      try {
+        readCallback_->getReadBuffer(&buf, &buflen);
+      } catch (const std::exception& ex) {
+        AsyncSocketException aex(
+            AsyncSocketException::BAD_ARGS,
+            string("ReadCallback::getReadBuffer() "
+                   "threw exception: ") +
+                ex.what());
+        failRead(aex);
+        return;
+      } catch (...) {
+        AsyncSocketException aex(
+            AsyncSocketException::BAD_ARGS,
+            string("ReadCallback::getReadBuffer() "
+                   "threw non-exception type"));
+        failRead(aex);
+        return;
+      }
+      if (buf == nullptr || buflen == 0) {
+        AsyncSocketException aex(
+            AsyncSocketException::INVALID_STATE,
+            string("ReadCallback::getReadBuffer() "
+                   "returned empty buffer"));
+        failRead(aex);
+        return;
+      }
     }
 
     // Perform the read
     ssize_t bytesRead = folly::readNoInt(fd_, buf, buflen);
+
     if (bytesRead > 0) {
-      readCallback_->readDataAvailable(bytesRead);
+      if (movable) {
+        ioBuf->append(uint64_t(bytesRead));
+        readCallback_->readBufferAvailable(std::move(ioBuf));
+      } else {
+        readCallback_->readDataAvailable(size_t(bytesRead));
+      }
       // Fall through and continue around the loop if the read
       // completely filled the available buffer.
       // Note that readCallback_ may have been uninstalled or changed inside
@@ -148,8 +169,7 @@ void AsyncPipeWriter::write(unique_ptr<folly::IOBuf> buf,
 
 void AsyncPipeWriter::writeChain(folly::AsyncWriter::WriteCallback* callback,
                                  std::unique_ptr<folly::IOBuf>&& buf,
-                                 WriteFlags,
-                                 BufferCallback*) {
+                                 WriteFlags) {
   write(std::move(buf), callback);
 }
 
@@ -227,7 +247,7 @@ void AsyncPipeWriter::handleWrite() {
       registerHandler(EventHandler::WRITE);
       return;
     }
-    curQueue.trimStart(rc);
+    curQueue.trimStart(size_t(rc));
     if (curQueue.empty()) {
       auto cb = front.second;
       queue_.pop_front();
@@ -246,4 +266,4 @@ void AsyncPipeWriter::handleWrite() {
   }
 }
 
-} // folly
+} // namespace folly