folly: build with -Wunused-parameter
[folly.git] / folly / io / async / AsyncSocket.cpp
index 167a49bfdd3b105ec2804128ae9151de4fff02c5..a8fb13e0425f25fce7b658fa8321e37a464b8617 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
 #include <folly/io/async/AsyncSocket.h>
 
 #include <folly/io/async/EventBase.h>
+#include <folly/io/async/EventHandler.h>
 #include <folly/SocketAddress.h>
 #include <folly/io/IOBuf.h>
 
 #include <errno.h>
 #include <limits.h>
 #include <unistd.h>
+#include <thread>
 #include <fcntl.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <netinet/tcp.h>
+#include <boost/preprocessor/control/if.hpp>
 
 using std::string;
 using std::unique_ptr;
@@ -37,15 +40,13 @@ namespace folly {
 
 // static members initializers
 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
-const folly::SocketAddress AsyncSocket::anyAddress =
-  folly::SocketAddress("0.0.0.0", 0);
 
 const AsyncSocketException socketClosedLocallyEx(
     AsyncSocketException::END_OF_FILE, "socket closed locally");
 const AsyncSocketException socketShutdownForWritesEx(
     AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
 
-// TODO: It might help performance to provide a version of WriteRequest that
+// TODO: It might help performance to provide a version of BytesWriteRequest that
 // users could derive from, so we can avoid the extra allocation for each call
 // to write()/writev().  We could templatize TFramedAsyncChannel just like the
 // protocols are currently templatized for transports.
@@ -54,139 +55,139 @@ const AsyncSocketException socketShutdownForWritesEx(
 // storage space, and only our internal version would allocate it at the end of
 // the WriteRequest.
 
-/**
- * A WriteRequest object tracks information about a pending write() or writev()
- * operation.
+/* The default WriteRequest implementation, used for write(), writev() and
+ * writeChain()
  *
- * A new WriteRequest operation is allocated on the heap for all write
+ * A new BytesWriteRequest operation is allocated on the heap for all write
  * operations that cannot be completed immediately.
  */
-class AsyncSocket::WriteRequest {
+class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
  public:
-  static WriteRequest* newRequest(WriteCallback* callback,
-                                  const iovec* ops,
-                                  uint32_t opCount,
-                                  unique_ptr<IOBuf>&& ioBuf,
-                                  WriteFlags flags) {
+  static BytesWriteRequest* newRequest(AsyncSocket* socket,
+                                       WriteCallback* callback,
+                                       const iovec* ops,
+                                       uint32_t opCount,
+                                       uint32_t partialWritten,
+                                       uint32_t bytesWritten,
+                                       unique_ptr<IOBuf>&& ioBuf,
+                                       WriteFlags flags) {
     assert(opCount > 0);
     // Since we put a variable size iovec array at the end
-    // of each WriteRequest, we have to manually allocate the memory.
-    void* buf = malloc(sizeof(WriteRequest) +
+    // of each BytesWriteRequest, we have to manually allocate the memory.
+    void* buf = malloc(sizeof(BytesWriteRequest) +
                        (opCount * sizeof(struct iovec)));
     if (buf == nullptr) {
       throw std::bad_alloc();
     }
 
-    return new(buf) WriteRequest(callback, ops, opCount, std::move(ioBuf),
-                                 flags);
+    return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
+                                      partialWritten, bytesWritten,
+                                      std::move(ioBuf), flags);
   }
 
-  void destroy() {
-    this->~WriteRequest();
+  void destroy() override {
+    this->~BytesWriteRequest();
     free(this);
   }
 
-  bool cork() const {
-    return isSet(flags_, WriteFlags::CORK);
-  }
-
-  WriteFlags flags() const {
-    return flags_;
-  }
-
-  WriteRequest* getNext() const {
-    return next_;
-  }
-
-  WriteCallback* getCallback() const {
-    return callback_;
-  }
-
-  uint32_t getBytesWritten() const {
-    return bytesWritten_;
-  }
-
-  const struct iovec* getOps() const {
-    assert(opCount_ > opIndex_);
-    return writeOps_ + opIndex_;
+  bool performWrite() override {
+    WriteFlags writeFlags = flags_;
+    if (getNext() != nullptr) {
+      writeFlags = writeFlags | WriteFlags::CORK;
+    }
+    bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
+                                          &opsWritten_, &partialBytes_);
+    return bytesWritten_ >= 0;
   }
 
-  uint32_t getOpCount() const {
-    assert(opCount_ > opIndex_);
-    return opCount_ - opIndex_;
+  bool isComplete() override {
+    return opsWritten_ == getOpCount();
   }
 
-  void consume(uint32_t wholeOps, uint32_t partialBytes,
-               uint32_t totalBytesWritten) {
-    // Advance opIndex_ forward by wholeOps
-    opIndex_ += wholeOps;
+  void consume() override {
+    // Advance opIndex_ forward by opsWritten_
+    opIndex_ += opsWritten_;
     assert(opIndex_ < opCount_);
 
     // If we've finished writing any IOBufs, release them
     if (ioBuf_) {
-      for (uint32_t i = wholeOps; i != 0; --i) {
+      for (uint32_t i = opsWritten_; i != 0; --i) {
         assert(ioBuf_);
         ioBuf_ = ioBuf_->pop();
       }
     }
 
-    // Move partialBytes forward into the current iovec buffer
+    // Move partialBytes_ forward into the current iovec buffer
     struct iovec* currentOp = writeOps_ + opIndex_;
-    assert((partialBytes < currentOp->iov_len) || (currentOp->iov_len == 0));
+    assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
     currentOp->iov_base =
-      reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes;
-    currentOp->iov_len -= partialBytes;
+      reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
+    currentOp->iov_len -= partialBytes_;
 
-    // Increment the bytesWritten_ count by totalBytesWritten
-    bytesWritten_ += totalBytesWritten;
-  }
-
-  void append(WriteRequest* next) {
-    assert(next_ == nullptr);
-    next_ = next;
+    // Increment the totalBytesWritten_ count by bytesWritten_;
+    totalBytesWritten_ += bytesWritten_;
   }
 
  private:
-  WriteRequest(WriteCallback* callback,
-               const struct iovec* ops,
-               uint32_t opCount,
-               unique_ptr<IOBuf>&& ioBuf,
-               WriteFlags flags)
-    : next_(nullptr)
-    , callback_(callback)
-    , bytesWritten_(0)
+  BytesWriteRequest(AsyncSocket* socket,
+                    WriteCallback* callback,
+                    const struct iovec* ops,
+                    uint32_t opCount,
+                    uint32_t partialBytes,
+                    uint32_t bytesWritten,
+                    unique_ptr<IOBuf>&& ioBuf,
+                    WriteFlags flags)
+    : AsyncSocket::WriteRequest(socket, callback)
     , opCount_(opCount)
     , opIndex_(0)
     , flags_(flags)
-    , ioBuf_(std::move(ioBuf)) {
+    , ioBuf_(std::move(ioBuf))
+    , opsWritten_(0)
+    , partialBytes_(partialBytes)
+    , bytesWritten_(bytesWritten) {
     memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
   }
 
-  // Private destructor, to ensure callers use destroy()
-  ~WriteRequest() {}
+  // private destructor, to ensure callers use destroy()
+  ~BytesWriteRequest() override = default;
+
+  const struct iovec* getOps() const {
+    assert(opCount_ > opIndex_);
+    return writeOps_ + opIndex_;
+  }
+
+  uint32_t getOpCount() const {
+    assert(opCount_ > opIndex_);
+    return opCount_ - opIndex_;
+  }
 
-  WriteRequest* next_;          ///< pointer to next WriteRequest
-  WriteCallback* callback_;     ///< completion callback
-  uint32_t bytesWritten_;       ///< bytes written
   uint32_t opCount_;            ///< number of entries in writeOps_
   uint32_t opIndex_;            ///< current index into writeOps_
   WriteFlags flags_;            ///< set for WriteFlags
   unique_ptr<IOBuf> ioBuf_;     ///< underlying IOBuf, or nullptr if N/A
+
+  // for consume(), how much we wrote on the last write
+  uint32_t opsWritten_;         ///< complete ops written
+  uint32_t partialBytes_;       ///< partial bytes of incomplete op written
+  ssize_t bytesWritten_;        ///< bytes written altogether
+
   struct iovec writeOps_[];     ///< write operation(s) list
 };
 
 AsyncSocket::AsyncSocket()
   : eventBase_(nullptr)
   , writeTimeout_(this, nullptr)
-  , ioHandler_(this, nullptr) {
-  VLOG(5) << "new AsyncSocket(" << ")";
+  , ioHandler_(this, nullptr)
+  , immediateReadHandler_(this) {
+  VLOG(5) << "new AsyncSocket()";
   init();
 }
 
 AsyncSocket::AsyncSocket(EventBase* evb)
   : eventBase_(evb)
   , writeTimeout_(this, evb)
-  , ioHandler_(this, evb) {
+  , ioHandler_(this, evb)
+  , immediateReadHandler_(this) {
   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
   init();
 }
@@ -194,11 +195,7 @@ AsyncSocket::AsyncSocket(EventBase* evb)
 AsyncSocket::AsyncSocket(EventBase* evb,
                            const folly::SocketAddress& address,
                            uint32_t connectTimeout)
-  : eventBase_(evb)
-  , writeTimeout_(this, evb)
-  , ioHandler_(this, evb) {
-  VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
-  init();
+  : AsyncSocket(evb) {
   connect(nullptr, address, connectTimeout);
 }
 
@@ -206,22 +203,20 @@ AsyncSocket::AsyncSocket(EventBase* evb,
                            const std::string& ip,
                            uint16_t port,
                            uint32_t connectTimeout)
-  : eventBase_(evb)
-  , writeTimeout_(this, evb)
-  , ioHandler_(this, evb) {
-  VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
-  init();
+  : AsyncSocket(evb) {
   connect(nullptr, ip, port, connectTimeout);
 }
 
 AsyncSocket::AsyncSocket(EventBase* evb, int fd)
   : eventBase_(evb)
   , writeTimeout_(this, evb)
-  , ioHandler_(this, evb, fd) {
+  , ioHandler_(this, evb, fd)
+  , immediateReadHandler_(this) {
   VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
           << fd << ")";
   init();
   fd_ = fd;
+  setCloseOnExec();
   state_ = StateEnum::ESTABLISHED;
 }
 
@@ -280,6 +275,12 @@ int AsyncSocket::detachFd() {
   return fd;
 }
 
+const folly::SocketAddress& AsyncSocket::anyAddress() {
+  static const folly::SocketAddress anyAddress =
+    folly::SocketAddress("0.0.0.0", 0);
+  return anyAddress;
+}
+
 void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
   if (shutdownSocketSet_ == newSS) {
     return;
@@ -293,6 +294,15 @@ void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
   }
 }
 
+void AsyncSocket::setCloseOnExec() {
+  int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
+  if (rv != 0) {
+    throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
+                               withAddr("failed to set close-on-exec flag"),
+                               errno);
+  }
+}
+
 void AsyncSocket::connect(ConnectCallback* callback,
                            const folly::SocketAddress& address,
                            int timeout,
@@ -308,6 +318,10 @@ void AsyncSocket::connect(ConnectCallback* callback,
     return invalidState(callback);
   }
 
+  connectStartTime_ = std::chrono::steady_clock::now();
+  // Make connect end time at least >= connectStartTime.
+  connectEndTime_ = connectStartTime_;
+
   assert(fd_ == -1);
   state_ = StateEnum::CONNECTING;
   connectCallback_ = callback;
@@ -331,14 +345,7 @@ void AsyncSocket::connect(ConnectCallback* callback,
     }
     ioHandler_.changeHandlerFD(fd_);
 
-    // Set the FD_CLOEXEC flag so that the socket will be closed if the program
-    // later forks and execs.
-    int rv = fcntl(fd_, F_SETFD, FD_CLOEXEC);
-    if (rv != 0) {
-      throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
-                                withAddr("failed to set close-on-exec flag"),
-                                errno);
-    }
+    setCloseOnExec();
 
     // Put the socket in non-blocking mode
     int flags = fcntl(fd_, F_GETFL, 0);
@@ -346,7 +353,7 @@ void AsyncSocket::connect(ConnectCallback* callback,
       throw AsyncSocketException(AsyncSocketException::INTERNAL_ERROR,
                                 withAddr("failed to get socket flags"), errno);
     }
-    rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
+    int rv = fcntl(fd_, F_SETFL, flags | O_NONBLOCK);
     if (rv == -1) {
       throw AsyncSocketException(
           AsyncSocketException::INTERNAL_ERROR,
@@ -376,7 +383,7 @@ void AsyncSocket::connect(ConnectCallback* callback,
             << ", fd=" << fd_ << ", host=" << address.describe().c_str();
 
     // bind the socket
-    if (bindAddr != anyAddress) {
+    if (bindAddr != anyAddress()) {
       int one = 1;
       if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
         doClose();
@@ -460,10 +467,7 @@ void AsyncSocket::connect(ConnectCallback* callback,
   assert(readCallback_ == nullptr);
   assert(writeReqHead_ == nullptr);
   state_ = StateEnum::ESTABLISHED;
-  if (callback) {
-    connectCallback_ = nullptr;
-    callback->connectSuccess();
-  }
+  invokeConnectSuccess();
 }
 
 void AsyncSocket::connect(ConnectCallback* callback,
@@ -525,6 +529,12 @@ void AsyncSocket::setReadCB(ReadCallback *callback) {
     return;
   }
 
+  /* We are removing a read callback */
+  if (callback == nullptr &&
+      immediateReadHandler_.isLoopCallbackScheduled()) {
+    immediateReadHandler_.cancelLoopCallback();
+  }
+
   if (shutdownFlags_ & SHUT_READ) {
     // Reads have already been shut down on this socket.
     //
@@ -602,21 +612,22 @@ void AsyncSocket::write(WriteCallback* callback,
   iovec op;
   op.iov_base = const_cast<void*>(buf);
   op.iov_len = bytes;
-  writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
+  writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
 }
 
 void AsyncSocket::writev(WriteCallback* callback,
                           const iovec* vec,
                           size_t count,
                           WriteFlags flags) {
-  writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
+  writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
 }
 
 void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
                               WriteFlags flags) {
+  constexpr size_t kSmallSizeMax = 64;
   size_t count = buf->countChainElements();
-  if (count <= 64) {
-    iovec vec[count];
+  if (count <= kSmallSizeMax) {
+    iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
     writeChainImpl(callback, vec, count, std::move(buf), flags);
   } else {
     iovec* vec = new iovec[count];
@@ -627,21 +638,8 @@ void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
 
 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
-  const IOBuf* head = buf.get();
-  const IOBuf* next = head;
-  unsigned i = 0;
-  do {
-    vec[i].iov_base = const_cast<uint8_t *>(next->data());
-    vec[i].iov_len = next->length();
-    // IOBuf can get confused by empty iovec buffers, so increment the
-    // output pointer only if the iovec buffer is non-empty.  We could
-    // end the loop with i < count, but that's ok.
-    if (vec[i].iov_len != 0) {
-      i++;
-    }
-    next = next->next();
-  } while (next != head);
-  writeImpl(callback, vec, i, std::move(buf), flags);
+  size_t veclen = buf->fillIov(vec, count);
+  writeImpl(callback, vec, veclen, std::move(buf), flags);
 }
 
 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
@@ -690,7 +688,11 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
           callback->writeSuccess();
         }
         return;
-      } // else { continue writing the next writeReq }
+      } else { // continue writing the next writeReq
+        if (bufferCallback_) {
+          bufferCallback_->onEgressBuffered();
+        }
+      }
       mustRegister = true;
     }
   } else if (!connecting()) {
@@ -701,16 +703,16 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
   // Create a new WriteRequest to add to the queue
   WriteRequest* req;
   try {
-    req = WriteRequest::newRequest(callback, vec + countWritten,
-                                   count - countWritten, std::move(ioBuf),
-                                   flags);
+    req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
+                                        count - countWritten, partialWritten,
+                                        bytesWritten, std::move(ioBuf), flags);
   } catch (const std::exception& ex) {
     // we mainly expect to catch std::bad_alloc here
     AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
         withAddr(string("failed to append new WriteRequest: ") + ex.what()));
     return failWrite(__func__, callback, bytesWritten, tex);
   }
-  req->consume(0, partialWritten, bytesWritten);
+  req->consume();
   if (writeReqTail_ == nullptr) {
     assert(writeReqHead_ == nullptr);
     writeReqHead_ = writeReqTail_ = req;
@@ -739,6 +741,17 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
   }
 }
 
+void AsyncSocket::writeRequest(WriteRequest* req) {
+  if (writeReqTail_ == nullptr) {
+    assert(writeReqHead_ == nullptr);
+    writeReqHead_ = writeReqTail_ = req;
+    req->start();
+  } else {
+    writeReqTail_->append(req);
+    writeReqTail_ = req;
+  }
+}
+
 void AsyncSocket::close() {
   VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
           << ", state=" << state_ << ", shutdownFlags="
@@ -821,16 +834,16 @@ void AsyncSocket::closeNow() {
         }
       }
 
+      if (immediateReadHandler_.isLoopCallbackScheduled()) {
+        immediateReadHandler_.cancelLoopCallback();
+      }
+
       if (fd_ >= 0) {
         ioHandler_.changeHandlerFD(-1);
         doClose();
       }
 
-      if (connectCallback_) {
-        ConnectCallback* callback = connectCallback_;
-        connectCallback_ = nullptr;
-        callback->connectErr(socketClosedLocallyEx);
-      }
+      invokeConnectErr(socketClosedLocallyEx);
 
       failAllWrites(socketClosedLocallyEx);
 
@@ -1053,7 +1066,10 @@ bool AsyncSocket::isDetachable() const {
 }
 
 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
-  address->setFromLocalAddress(fd_);
+  if (!localAddr_.isInitialized()) {
+    localAddr_.setFromLocalAddress(fd_);
+  }
+  *address = localAddr_;
 }
 
 void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
@@ -1186,6 +1202,37 @@ int AsyncSocket::setTCPProfile(int profd) {
   return 0;
 }
 
+void AsyncSocket::setPersistentCork(bool cork) {
+  if (setCork(cork) == 0) {
+    persistentCork_ = cork;
+  }
+}
+
+int AsyncSocket::setCork(bool cork) {
+#ifdef TCP_CORK
+  if (fd_ < 0) {
+    VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
+            << this << "(stats=" << state_ << ")";
+    return EINVAL;
+  }
+
+  if (corked_ == cork) {
+    return 0;
+  }
+
+  int flag = cork ? 1 : 0;
+  if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
+    int errnoCopy = errno;
+    VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
+            << this << "(fd=" << fd_ << ", state=" << state_ << "):"
+            << folly::errnoStr(errnoCopy);
+    return errnoCopy;
+  }
+  corked_ = cork;
+#endif
+  return 0;
+}
+
 void AsyncSocket::ioReady(uint16_t events) noexcept {
   VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
           << ", events=" << std::hex << events << ", state=" << state_;
@@ -1221,8 +1268,18 @@ void AsyncSocket::ioReady(uint16_t events) noexcept {
   }
 }
 
-ssize_t AsyncSocket::performRead(void* buf, size_t buflen) {
-  ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT);
+ssize_t AsyncSocket::performRead(void** buf,
+                                 size_t* buflen,
+                                 size_t* /* offset */) {
+  VLOG(5) << "AsyncSocket::performRead() this=" << this
+          << ", buf=" << *buf << ", buflen=" << *buflen;
+
+  int recvFlags = 0;
+  if (peek_) {
+    recvFlags |= MSG_PEEK;
+  }
+
+  ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
   if (bytes < 0) {
     if (errno == EAGAIN || errno == EWOULDBLOCK) {
       // No more data to read right now.
@@ -1236,6 +1293,12 @@ ssize_t AsyncSocket::performRead(void* buf, size_t buflen) {
   }
 }
 
+void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
+  // no matter what, buffer should be preapared for non-ssl socket
+  CHECK(readCallback_);
+  readCallback_->getReadBuffer(buf, buflen);
+}
+
 void AsyncSocket::handleRead() noexcept {
   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
           << ", state=" << state_;
@@ -1267,9 +1330,10 @@ void AsyncSocket::handleRead() noexcept {
   while (readCallback_ && eventBase_ == originalEventBase) {
     // Get the buffer to read into.
     void* buf = nullptr;
-    size_t buflen = 0;
+    size_t buflen = 0, offset = 0;
     try {
-      readCallback_->getReadBuffer(&buf, &buflen);
+      prepareReadBuffer(&buf, &buflen);
+      VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
     } catch (const AsyncSocketException& ex) {
       return failRead(__func__, ex);
     } catch (const std::exception& ex) {
@@ -1284,7 +1348,7 @@ void AsyncSocket::handleRead() noexcept {
                              "non-exception type");
       return failRead(__func__, ex);
     }
-    if (buf == nullptr || buflen == 0) {
+    if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
       AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
                              "ReadCallback::getReadBuffer() returned "
                              "empty buffer");
@@ -1292,9 +1356,23 @@ void AsyncSocket::handleRead() noexcept {
     }
 
     // Perform the read
-    ssize_t bytesRead = performRead(buf, buflen);
+    ssize_t bytesRead = performRead(&buf, &buflen, &offset);
+    VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
+            << bytesRead << " bytes";
     if (bytesRead > 0) {
-      readCallback_->readDataAvailable(bytesRead);
+      if (!isBufferMovable_) {
+        readCallback_->readDataAvailable(bytesRead);
+      } else {
+        CHECK(kOpenSslModeMoveBufferOwnership);
+        VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
+                << "buf=" << buf << ", " << bytesRead << "/" << buflen
+                << ", offset=" << offset;
+        auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
+        readBuf->trimStart(offset);
+        readBuf->trimEnd(buflen - offset - bytesRead);
+        readCallback_->readBufferAvailable(std::move(readBuf));
+      }
+
       // Fall through and continue around the loop if the read
       // completely filled the available buffer.
       // Note that readCallback_ may have been uninstalled or changed inside
@@ -1306,11 +1384,13 @@ void AsyncSocket::handleRead() noexcept {
         // No more data to read right now.
         return;
     } else if (bytesRead == READ_ERROR) {
+      readErr_ = READ_ERROR;
       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
                              withAddr("recv() failed"), errno);
       return failRead(__func__, ex);
     } else {
       assert(bytesRead == READ_EOF);
+      readErr_ = READ_EOF;
       // EOF
       shutdownFlags_ |= SHUT_READ;
       if (!updateEventRegistration(0, EventHandler::READ)) {
@@ -1326,6 +1406,11 @@ void AsyncSocket::handleRead() noexcept {
       return;
     }
     if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
+      if (readCallback_ != nullptr) {
+        // We might still have data in the socket.
+        // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
+        scheduleImmediateRead();
+      }
       return;
     }
   }
@@ -1360,20 +1445,11 @@ void AsyncSocket::handleWrite() noexcept {
   // (See the comment in handleRead() explaining how this can happen.)
   EventBase* originalEventBase = eventBase_;
   while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
-    uint32_t countWritten;
-    uint32_t partialWritten;
-    WriteFlags writeFlags = writeReqHead_->flags();
-    if (writeReqHead_->getNext() != nullptr) {
-      writeFlags = writeFlags | WriteFlags::CORK;
-    }
-    int bytesWritten = performWrite(writeReqHead_->getOps(),
-                                    writeReqHead_->getOpCount(),
-                                    writeFlags, &countWritten, &partialWritten);
-    if (bytesWritten < 0) {
+    if (!writeReqHead_->performWrite()) {
       AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
                              withAddr("writev() failed"), errno);
       return failWrite(__func__, ex);
-    } else if (countWritten == writeReqHead_->getOpCount()) {
+    } else if (writeReqHead_->isComplete()) {
       // We finished this request
       WriteRequest* req = writeReqHead_;
       writeReqHead_ = req->getNext();
@@ -1438,7 +1514,10 @@ void AsyncSocket::handleWrite() noexcept {
       // We'll continue around the loop, trying to write another request
     } else {
       // Partial write.
-      writeReqHead_->consume(countWritten, partialWritten, bytesWritten);
+      if (bufferCallback_) {
+        bufferCallback_->onEgressBuffered();
+      }
+      writeReqHead_->consume();
       // Stop after a partial write; it's highly likely that a subsequent write
       // attempt will just return EAGAIN.
       //
@@ -1461,6 +1540,9 @@ void AsyncSocket::handleWrite() noexcept {
       return;
     }
   }
+  if (!writeReqHead_ && bufferCallback_) {
+    bufferCallback_->onEgressBufferCleared();
+  }
 }
 
 void AsyncSocket::checkForImmediateRead() noexcept {
@@ -1578,13 +1660,7 @@ void AsyncSocket::handleConnect() noexcept {
   // callbacks (since the callbacks may call detachEventBase()).
   EventBase* originalEventBase = eventBase_;
 
-  // Call the connect callback.
-  if (connectCallback_) {
-    ConnectCallback* callback = connectCallback_;
-    connectCallback_ = nullptr;
-    callback->connectSuccess();
-  }
-
+  invokeConnectSuccess();
   // Note that the connect callback may have changed our state.
   // (set or unset the read callback, called write(), closed the socket, etc.)
   // The following code needs to handle these situations correctly.
@@ -1766,12 +1842,7 @@ void AsyncSocket::finishFail() {
 
   AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
                          withAddr("socket closing after error"));
-  if (connectCallback_) {
-    ConnectCallback* callback = connectCallback_;
-    connectCallback_ = nullptr;
-    callback->connectErr(ex);
-  }
-
+  invokeConnectErr(ex);
   failAllWrites(ex);
 
   if (readCallback_) {
@@ -1797,12 +1868,7 @@ void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) {
                << ex.what();
   startFail();
 
-  if (connectCallback_ != nullptr) {
-    ConnectCallback* callback = connectCallback_;
-    connectCallback_ = nullptr;
-    callback->connectErr(ex);
-  }
-
+  invokeConnectErr(ex);
   finishFail();
 }
 
@@ -1836,7 +1902,7 @@ void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
     WriteRequest* req = writeReqHead_;
     writeReqHead_ = req->getNext();
     WriteCallback* callback = req->getCallback();
-    uint32_t bytesWritten = req->getBytesWritten();
+    uint32_t bytesWritten = req->getTotalBytesWritten();
     req->destroy();
     if (callback) {
       callback->writeErr(bytesWritten, ex);
@@ -1873,7 +1939,7 @@ void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
     writeReqHead_ = req->getNext();
     WriteCallback* callback = req->getCallback();
     if (callback) {
-      callback->writeErr(req->getBytesWritten(), ex);
+      callback->writeErr(req->getTotalBytesWritten(), ex);
     }
     req->destroy();
   }
@@ -1892,6 +1958,7 @@ void AsyncSocket::invalidState(ConnectCallback* callback) {
 
   AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
                          "connect() called with socket in invalid state");
+  connectEndTime_ = std::chrono::steady_clock::now();
   if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
     if (callback) {
       callback->connectErr(ex);
@@ -1908,6 +1975,24 @@ void AsyncSocket::invalidState(ConnectCallback* callback) {
   }
 }
 
+void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
+  connectEndTime_ = std::chrono::steady_clock::now();
+  if (connectCallback_) {
+    ConnectCallback* callback = connectCallback_;
+    connectCallback_ = nullptr;
+    callback->connectErr(ex);
+  }
+}
+
+void AsyncSocket::invokeConnectSuccess() {
+  connectEndTime_ = std::chrono::steady_clock::now();
+  if (connectCallback_) {
+    ConnectCallback* callback = connectCallback_;
+    connectCallback_ = nullptr;
+    callback->connectSuccess();
+  }
+}
+
 void AsyncSocket::invalidState(ReadCallback* callback) {
   VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
              << "): setReadCallback(" << callback
@@ -1979,4 +2064,8 @@ std::string AsyncSocket::withAddr(const std::string& s) {
   return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
 }
 
+void AsyncSocket::setBufferCallback(BufferCallback* cb) {
+  bufferCallback_ = cb;
+}
+
 } // folly