From 8f4003a8d3b8d95215238d3fd52f85e87d591c0e Mon Sep 17 00:00:00 2001 From: Dan Melnic Date: Thu, 26 Oct 2017 08:37:29 -0700 Subject: [PATCH] Disable zerocopy if we're notified about deferred copies, add a isZeroCopyWriteInProgress method, replace pair with a proper struct Summary: Add zeroWriteDone callback Reviewed By: djwatson Differential Revision: D6097129 fbshipit-source-id: b82a942557680c3a7a3be8f81ee6f2886e99e165 --- folly/io/async/AsyncSocket.cpp | 63 ++++++++++++++++++++-------------- folly/io/async/AsyncSocket.h | 31 +++++++++++------ folly/portability/Sockets.h | 4 +++ 3 files changed, 61 insertions(+), 37 deletions(-) diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp index 86cf3b77..b8349687 100644 --- a/folly/io/async/AsyncSocket.cpp +++ b/folly/io/async/AsyncSocket.cpp @@ -114,16 +114,16 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest { if (bytesWritten_) { if (socket_->isZeroCopyRequest(writeFlags)) { if (isComplete()) { - socket_->addZeroCopyBuff(std::move(ioBuf_)); + socket_->addZeroCopyBuf(std::move(ioBuf_)); } else { - socket_->addZeroCopyBuff(ioBuf_.get()); + socket_->addZeroCopyBuf(ioBuf_.get()); } } else { // this happens if at least one of the prev requests were sent // with zero copy but not the last one if (isComplete() && socket_->getZeroCopy() && - socket_->containsZeroCopyBuff(ioBuf_.get())) { - socket_->setZeroCopyBuff(std::move(ioBuf_)); + socket_->containsZeroCopyBuf(ioBuf_.get())) { + socket_->setZeroCopyBuf(std::move(ioBuf_)); } } } @@ -891,46 +891,45 @@ void AsyncSocket::adjustZeroCopyFlags( } } -void AsyncSocket::addZeroCopyBuff(std::unique_ptr&& buf) { +void AsyncSocket::addZeroCopyBuf(std::unique_ptr&& buf) { uint32_t id = getNextZeroCopyBuffId(); folly::IOBuf* ptr = buf.get(); idZeroCopyBufPtrMap_[id] = ptr; - auto& p = idZeroCopyBufPtrToBufMap_[ptr]; - p.first++; - CHECK(p.second.get() == nullptr); - p.second = std::move(buf); + auto& p = idZeroCopyBufInfoMap_[ptr]; + p.count_++; + CHECK(p.buf_.get() == nullptr); + p.buf_ = std::move(buf); } -void AsyncSocket::addZeroCopyBuff(folly::IOBuf* ptr) { +void AsyncSocket::addZeroCopyBuf(folly::IOBuf* ptr) { uint32_t id = getNextZeroCopyBuffId(); idZeroCopyBufPtrMap_[id] = ptr; - idZeroCopyBufPtrToBufMap_[ptr].first++; + idZeroCopyBufInfoMap_[ptr].count_++; } -void AsyncSocket::releaseZeroCopyBuff(uint32_t id) { +void AsyncSocket::releaseZeroCopyBuf(uint32_t id) { auto iter = idZeroCopyBufPtrMap_.find(id); CHECK(iter != idZeroCopyBufPtrMap_.end()); auto ptr = iter->second; - auto iter1 = idZeroCopyBufPtrToBufMap_.find(ptr); - CHECK(iter1 != idZeroCopyBufPtrToBufMap_.end()); - if (0 == --iter1->second.first) { - idZeroCopyBufPtrToBufMap_.erase(iter1); + auto iter1 = idZeroCopyBufInfoMap_.find(ptr); + CHECK(iter1 != idZeroCopyBufInfoMap_.end()); + if (0 == --iter1->second.count_) { + idZeroCopyBufInfoMap_.erase(iter1); } } -void AsyncSocket::setZeroCopyBuff(std::unique_ptr&& buf) { +void AsyncSocket::setZeroCopyBuf(std::unique_ptr&& buf) { folly::IOBuf* ptr = buf.get(); - auto& p = idZeroCopyBufPtrToBufMap_[ptr]; - CHECK(p.second.get() == nullptr); + auto& p = idZeroCopyBufInfoMap_[ptr]; + CHECK(p.buf_.get() == nullptr); - p.second = std::move(buf); + p.buf_ = std::move(buf); } -bool AsyncSocket::containsZeroCopyBuff(folly::IOBuf* ptr) { - return ( - idZeroCopyBufPtrToBufMap_.find(ptr) != idZeroCopyBufPtrToBufMap_.end()); +bool AsyncSocket::containsZeroCopyBuf(folly::IOBuf* ptr) { + return (idZeroCopyBufInfoMap_.find(ptr) != idZeroCopyBufInfoMap_.end()); } bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const { @@ -953,9 +952,16 @@ void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) { reinterpret_cast(CMSG_DATA(&cmsg)); uint32_t hi = serr->ee_data; uint32_t lo = serr->ee_info; + // disable zero copy if the buffer was actually copied + if ((serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) && zeroCopyEnabled_) { + VLOG(2) << "AsyncSocket::processZeroCopyMsg(): setting " + << "zeroCopyEnabled_ = false due to SO_EE_CODE_ZEROCOPY_COPIED " + << "on " << fd_; + zeroCopyEnabled_ = false; + } for (uint32_t i = lo; i <= hi; i++) { - releaseZeroCopyBuff(i); + releaseZeroCopyBuf(i); } #endif } @@ -1052,7 +1058,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, } else if (countWritten == count) { // done, add the whole buffer if (isZeroCopyRequest(flags)) { - addZeroCopyBuff(std::move(ioBuf)); + addZeroCopyBuf(std::move(ioBuf)); } // We successfully wrote everything. // Invoke the callback and return. @@ -1063,7 +1069,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, } else { // continue writing the next writeReq // add just the ptr if (isZeroCopyRequest(flags)) { - addZeroCopyBuff(ioBuf.get()); + addZeroCopyBuf(ioBuf.get()); } if (bufferCallback_) { bufferCallback_->onEgressBuffered(); @@ -1509,6 +1515,11 @@ void AsyncSocket::cachePeerAddress() const { } } +bool AsyncSocket::isZeroCopyWriteInProgress() const noexcept { + eventBase_->dcheckIsInEventBaseThread(); + return (!idZeroCopyBufPtrMap_.empty()); +} + void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const { cacheLocalAddress(); *address = localAddr_; diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h index 35a2149b..5687e6fa 100644 --- a/folly/io/async/AsyncSocket.h +++ b/folly/io/async/AsyncSocket.h @@ -808,6 +808,12 @@ class AsyncSocket : virtual public AsyncTransportWrapper { */ void cacheAddresses(); + /** + * Returns true if there is any zero copy write in progress + * Needs to be called from within the socket's EVB thread + */ + bool isZeroCopyWriteInProgress() const noexcept; + /** * writeReturn is the total number of bytes written, or WRITE_ERROR on error. * If no data has been written, 0 is returned. @@ -1157,22 +1163,25 @@ class AsyncSocket : virtual public AsyncTransportWrapper { const iovec* vec, uint32_t count, folly::WriteFlags& flags); - void addZeroCopyBuff(std::unique_ptr&& buf); - void addZeroCopyBuff(folly::IOBuf* ptr); - void setZeroCopyBuff(std::unique_ptr&& buf); - bool containsZeroCopyBuff(folly::IOBuf* ptr); - void releaseZeroCopyBuff(uint32_t id); + void addZeroCopyBuf(std::unique_ptr&& buf); + void addZeroCopyBuf(folly::IOBuf* ptr); + void setZeroCopyBuf(std::unique_ptr&& buf); + bool containsZeroCopyBuf(folly::IOBuf* ptr); + void releaseZeroCopyBuf(uint32_t id); // a folly::IOBuf can be used in multiple partial requests - // so we keep a map that maps a buffer id to a raw folly::IOBuf ptr - // and one more map that adds a ref count for a folly::IOBuf that is either + // there is a that maps a buffer id to a raw folly::IOBuf ptr + // and another one that adds a ref count for a folly::IOBuf that is either // the original ptr or nullptr uint32_t zeroCopyBuffId_{0}; + + struct IOBufInfo { + uint32_t count_{0}; + std::unique_ptr buf_; + }; + std::unordered_map idZeroCopyBufPtrMap_; - std::unordered_map< - folly::IOBuf*, - std::pair>> - idZeroCopyBufPtrToBufMap_; + std::unordered_map idZeroCopyBufInfoMap_; StateEnum state_; ///< StateEnum describing current state uint8_t shutdownFlags_; ///< Shutdown state (ShutdownFlags) diff --git a/folly/portability/Sockets.h b/folly/portability/Sockets.h index 314029ea..b78ddaba 100755 --- a/folly/portability/Sockets.h +++ b/folly/portability/Sockets.h @@ -35,6 +35,10 @@ #define SO_EE_ORIGIN_ZEROCOPY 5 #endif +#ifndef SO_EE_CODE_ZEROCOPY_COPIED +#define SO_EE_CODE_ZEROCOPY_COPIED 1 +#endif + #ifndef SO_ZEROCOPY #define SO_ZEROCOPY 60 #endif -- 2.34.1