Disable zerocopy if we're notified about deferred copies, add a isZeroCopyWriteInProg...
authorDan Melnic <dmm@fb.com>
Thu, 26 Oct 2017 15:37:29 +0000 (08:37 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Thu, 26 Oct 2017 15:51:01 +0000 (08:51 -0700)
Summary: Add zeroWriteDone callback

Reviewed By: djwatson

Differential Revision: D6097129

fbshipit-source-id: b82a942557680c3a7a3be8f81ee6f2886e99e165

folly/io/async/AsyncSocket.cpp
folly/io/async/AsyncSocket.h
folly/portability/Sockets.h

index 86cf3b7..b834968 100644 (file)
@@ -114,16 +114,16 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
     if (bytesWritten_) {
       if (socket_->isZeroCopyRequest(writeFlags)) {
         if (isComplete()) {
-          socket_->addZeroCopyBuff(std::move(ioBuf_));
+          socket_->addZeroCopyBuf(std::move(ioBuf_));
         } else {
-          socket_->addZeroCopyBuff(ioBuf_.get());
+          socket_->addZeroCopyBuf(ioBuf_.get());
         }
       } else {
         // this happens if at least one of the prev requests were sent
         // with zero copy but not the last one
         if (isComplete() && socket_->getZeroCopy() &&
-            socket_->containsZeroCopyBuff(ioBuf_.get())) {
-          socket_->setZeroCopyBuff(std::move(ioBuf_));
+            socket_->containsZeroCopyBuf(ioBuf_.get())) {
+          socket_->setZeroCopyBuf(std::move(ioBuf_));
         }
       }
     }
@@ -891,46 +891,45 @@ void AsyncSocket::adjustZeroCopyFlags(
   }
 }
 
-void AsyncSocket::addZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) {
+void AsyncSocket::addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
   uint32_t id = getNextZeroCopyBuffId();
   folly::IOBuf* ptr = buf.get();
 
   idZeroCopyBufPtrMap_[id] = ptr;
-  auto& p = idZeroCopyBufPtrToBufMap_[ptr];
-  p.first++;
-  CHECK(p.second.get() == nullptr);
-  p.second = std::move(buf);
+  auto& p = idZeroCopyBufInfoMap_[ptr];
+  p.count_++;
+  CHECK(p.buf_.get() == nullptr);
+  p.buf_ = std::move(buf);
 }
 
-void AsyncSocket::addZeroCopyBuff(folly::IOBuf* ptr) {
+void AsyncSocket::addZeroCopyBuf(folly::IOBuf* ptr) {
   uint32_t id = getNextZeroCopyBuffId();
   idZeroCopyBufPtrMap_[id] = ptr;
 
-  idZeroCopyBufPtrToBufMap_[ptr].first++;
+  idZeroCopyBufInfoMap_[ptr].count_++;
 }
 
-void AsyncSocket::releaseZeroCopyBuff(uint32_t id) {
+void AsyncSocket::releaseZeroCopyBuf(uint32_t id) {
   auto iter = idZeroCopyBufPtrMap_.find(id);
   CHECK(iter != idZeroCopyBufPtrMap_.end());
   auto ptr = iter->second;
-  auto iter1 = idZeroCopyBufPtrToBufMap_.find(ptr);
-  CHECK(iter1 != idZeroCopyBufPtrToBufMap_.end());
-  if (0 == --iter1->second.first) {
-    idZeroCopyBufPtrToBufMap_.erase(iter1);
+  auto iter1 = idZeroCopyBufInfoMap_.find(ptr);
+  CHECK(iter1 != idZeroCopyBufInfoMap_.end());
+  if (0 == --iter1->second.count_) {
+    idZeroCopyBufInfoMap_.erase(iter1);
   }
 }
 
-void AsyncSocket::setZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf) {
+void AsyncSocket::setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
   folly::IOBuf* ptr = buf.get();
-  auto& p = idZeroCopyBufPtrToBufMap_[ptr];
-  CHECK(p.second.get() == nullptr);
+  auto& p = idZeroCopyBufInfoMap_[ptr];
+  CHECK(p.buf_.get() == nullptr);
 
-  p.second = std::move(buf);
+  p.buf_ = std::move(buf);
 }
 
-bool AsyncSocket::containsZeroCopyBuff(folly::IOBuf* ptr) {
-  return (
-      idZeroCopyBufPtrToBufMap_.find(ptr) != idZeroCopyBufPtrToBufMap_.end());
+bool AsyncSocket::containsZeroCopyBuf(folly::IOBuf* ptr) {
+  return (idZeroCopyBufInfoMap_.find(ptr) != idZeroCopyBufInfoMap_.end());
 }
 
 bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const {
@@ -953,9 +952,16 @@ void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) {
       reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
   uint32_t hi = serr->ee_data;
   uint32_t lo = serr->ee_info;
+  // disable zero copy if the buffer was actually copied
+  if ((serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) && zeroCopyEnabled_) {
+    VLOG(2) << "AsyncSocket::processZeroCopyMsg(): setting "
+            << "zeroCopyEnabled_ = false due to SO_EE_CODE_ZEROCOPY_COPIED "
+            << "on " << fd_;
+    zeroCopyEnabled_ = false;
+  }
 
   for (uint32_t i = lo; i <= hi; i++) {
-    releaseZeroCopyBuff(i);
+    releaseZeroCopyBuf(i);
   }
 #endif
 }
@@ -1052,7 +1058,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
       } else if (countWritten == count) {
         // done, add the whole buffer
         if (isZeroCopyRequest(flags)) {
-          addZeroCopyBuff(std::move(ioBuf));
+          addZeroCopyBuf(std::move(ioBuf));
         }
         // We successfully wrote everything.
         // Invoke the callback and return.
@@ -1063,7 +1069,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
       } else { // continue writing the next writeReq
         // add just the ptr
         if (isZeroCopyRequest(flags)) {
-          addZeroCopyBuff(ioBuf.get());
+          addZeroCopyBuf(ioBuf.get());
         }
         if (bufferCallback_) {
           bufferCallback_->onEgressBuffered();
@@ -1509,6 +1515,11 @@ void AsyncSocket::cachePeerAddress() const {
   }
 }
 
+bool AsyncSocket::isZeroCopyWriteInProgress() const noexcept {
+  eventBase_->dcheckIsInEventBaseThread();
+  return (!idZeroCopyBufPtrMap_.empty());
+}
+
 void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
   cacheLocalAddress();
   *address = localAddr_;
index 35a2149..5687e6f 100644 (file)
@@ -808,6 +808,12 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
    */
   void cacheAddresses();
 
+  /**
+   * Returns true if there is any zero copy write in progress
+   * Needs to be called from within the socket's EVB thread
+   */
+  bool isZeroCopyWriteInProgress() const noexcept;
+
   /**
    * writeReturn is the total number of bytes written, or WRITE_ERROR on error.
    * If no data has been written, 0 is returned.
@@ -1157,22 +1163,25 @@ class AsyncSocket : virtual public AsyncTransportWrapper {
       const iovec* vec,
       uint32_t count,
       folly::WriteFlags& flags);
-  void addZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf);
-  void addZeroCopyBuff(folly::IOBuf* ptr);
-  void setZeroCopyBuff(std::unique_ptr<folly::IOBuf>&& buf);
-  bool containsZeroCopyBuff(folly::IOBuf* ptr);
-  void releaseZeroCopyBuff(uint32_t id);
+  void addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+  void addZeroCopyBuf(folly::IOBuf* ptr);
+  void setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf);
+  bool containsZeroCopyBuf(folly::IOBuf* ptr);
+  void releaseZeroCopyBuf(uint32_t id);
 
   // a folly::IOBuf can be used in multiple partial requests
-  // so we keep a map that maps a buffer id to a raw folly::IOBuf ptr
-  // and one more map that adds a ref count for a folly::IOBuf that is either
+  // there is a that maps a buffer id to a raw folly::IOBuf ptr
+  // and another one that adds a ref count for a folly::IOBuf that is either
   // the original ptr or nullptr
   uint32_t zeroCopyBuffId_{0};
+
+  struct IOBufInfo {
+    uint32_t count_{0};
+    std::unique_ptr<folly::IOBuf> buf_;
+  };
+
   std::unordered_map<uint32_t, folly::IOBuf*> idZeroCopyBufPtrMap_;
-  std::unordered_map<
-      folly::IOBuf*,
-      std::pair<uint32_t, std::unique_ptr<folly::IOBuf>>>
-      idZeroCopyBufPtrToBufMap_;
+  std::unordered_map<folly::IOBuf*, IOBufInfo> idZeroCopyBufInfoMap_;
 
   StateEnum state_;                      ///< StateEnum describing current state
   uint8_t shutdownFlags_;                ///< Shutdown state (ShutdownFlags)
index 314029e..b78ddab 100755 (executable)
 #define SO_EE_ORIGIN_ZEROCOPY 5
 #endif
 
+#ifndef SO_EE_CODE_ZEROCOPY_COPIED
+#define SO_EE_CODE_ZEROCOPY_COPIED 1
+#endif
+
 #ifndef SO_ZEROCOPY
 #define SO_ZEROCOPY 60
 #endif