From 23e45679ddec0cc620ee6fedbb7891e488669bdd Mon Sep 17 00:00:00 2001 From: Dan Melnic Date: Thu, 14 Dec 2017 19:59:27 -0800 Subject: [PATCH] Fix zerocopy AsyncSocket memory leaks Summary: We currently leak ptr entries due to a missing map erase call in AsyncSocket::releaseZeroCopyBuf. Also, calling addZeroCopyBuf when bytesWritten == 0 will cause an extra id to be allocated. This will delay the deletion of the last IOBuf sent via zerocopy. The number of buffers can accumulate over time if the same bytesWritten == 0 situation is encountered. Found when running the nbd-server zero downtime upgrade during heavy fio traffic. Add an AsyncSocket::processZeroCopyWriteInProgress so we can drain the zerocopy msg error queue even after we set the ReadCallback to nullptr. Reviewed By: djwatson Differential Revision: D6552982 fbshipit-source-id: 3d2fdca84ec3b5fc46c3bed06c0c9ede66ed565a --- folly/io/async/AsyncSocket.cpp | 17 +- folly/io/async/AsyncSocket.h | 6 + folly/io/async/test/ZeroCopy.cpp | 52 ++++ folly/io/async/test/ZeroCopy.h | 260 ++++++++++++++++++++ folly/io/async/test/ZeroCopyBenchmark.cpp | 274 +--------------------- folly/io/async/test/ZeroCopyTest.cpp | 29 +++ 6 files changed, 371 insertions(+), 267 deletions(-) create mode 100644 folly/io/async/test/ZeroCopy.cpp create mode 100644 folly/io/async/test/ZeroCopy.h create mode 100644 folly/io/async/test/ZeroCopyTest.cpp diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp index 2d1e76cb..883ee1d7 100644 --- a/folly/io/async/AsyncSocket.cpp +++ b/folly/io/async/AsyncSocket.cpp @@ -907,6 +907,8 @@ void AsyncSocket::releaseZeroCopyBuf(uint32_t id) { if (0 == --iter1->second.count_) { idZeroCopyBufInfoMap_.erase(iter1); } + + idZeroCopyBufPtrMap_.erase(iter); } void AsyncSocket::setZeroCopyBuf(std::unique_ptr&& buf) { @@ -1046,7 +1048,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, return failWrite(__func__, callback, 0, ex); } else if (countWritten == count) { 
// done, add the whole buffer - if (isZeroCopyRequest(flags)) { + if (countWritten && isZeroCopyRequest(flags)) { addZeroCopyBuf(std::move(ioBuf)); } // We successfully wrote everything. @@ -1057,7 +1059,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, return; } else { // continue writing the next writeReq // add just the ptr - if (isZeroCopyRequest(flags)) { + if (bytesWritten && isZeroCopyRequest(flags)) { addZeroCopyBuf(ioBuf.get()); } if (bufferCallback_) { @@ -1805,6 +1807,17 @@ void AsyncSocket::handleErrMessages() noexcept { #endif // FOLLY_HAVE_MSG_ERRQUEUE } +bool AsyncSocket::processZeroCopyWriteInProgress() noexcept { + eventBase_->dcheckIsInEventBaseThread(); + if (idZeroCopyBufPtrMap_.empty()) { + return true; + } + + handleErrMessages(); + + return idZeroCopyBufPtrMap_.empty(); +} + void AsyncSocket::handleRead() noexcept { VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_ << ", state=" << state_; diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h index 17980bb7..44ae5968 100644 --- a/folly/io/async/AsyncSocket.h +++ b/folly/io/async/AsyncSocket.h @@ -809,6 +809,12 @@ class AsyncSocket : virtual public AsyncTransportWrapper { */ bool isZeroCopyWriteInProgress() const noexcept; + /** + * Tries to process the msg error queue + * And returns true if there are no more zero copy writes in progress + */ + bool processZeroCopyWriteInProgress() noexcept; + /** * writeReturn is the total number of bytes written, or WRITE_ERROR on error. * If no data has been written, 0 is returned. diff --git a/folly/io/async/test/ZeroCopy.cpp b/folly/io/async/test/ZeroCopy.cpp new file mode 100644 index 00000000..c06badd5 --- /dev/null +++ b/folly/io/async/test/ZeroCopy.cpp @@ -0,0 +1,52 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace folly { + +// ZeroCopyTest +ZeroCopyTest::ZeroCopyTest(int numLoops, bool zeroCopy, size_t bufferSize) + : numLoops_(numLoops), + zeroCopy_(zeroCopy), + bufferSize_(bufferSize), + client_( + new ZeroCopyTestAsyncSocket(&evb_, numLoops_, bufferSize_, zeroCopy)), + listenSock_(new folly::AsyncServerSocket(&evb_)), + server_(&evb_, numLoops_, bufferSize_, zeroCopy) { + if (listenSock_) { + server_.addCallbackToServerSocket(*listenSock_); + } +} + +bool ZeroCopyTest::run() { + evb_.runInEventBaseThread([this]() { + if (listenSock_) { + listenSock_->bind(0); + listenSock_->setZeroCopy(zeroCopy_); + listenSock_->listen(10); + listenSock_->startAccepting(); + + connectOne(); + } + }); + + evb_.loopForever(); + + return !client_->isZeroCopyWriteInProgress(); +} + +} // namespace folly diff --git a/folly/io/async/test/ZeroCopy.h b/folly/io/async/test/ZeroCopy.h new file mode 100644 index 00000000..3273fbb3 --- /dev/null +++ b/folly/io/async/test/ZeroCopy.h @@ -0,0 +1,260 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace folly { + +class ZeroCopyTestAsyncSocket { + public: + explicit ZeroCopyTestAsyncSocket( + folly::EventBase* evb, + int numLoops, + size_t bufferSize, + bool zeroCopy) + : evb_(evb), + numLoops_(numLoops), + sock_(new folly::AsyncSocket(evb)), + callback_(this), + client_(true) { + setBufferSize(bufferSize); + setZeroCopy(zeroCopy); + } + + explicit ZeroCopyTestAsyncSocket( + folly::EventBase* evb, + int fd, + int numLoops, + size_t bufferSize, + bool zeroCopy) + : evb_(evb), + numLoops_(numLoops), + sock_(new folly::AsyncSocket(evb, fd)), + callback_(this), + client_(false) { + setBufferSize(bufferSize); + setZeroCopy(zeroCopy); + // enable reads + if (sock_) { + sock_->setReadCB(&callback_); + } + } + + ~ZeroCopyTestAsyncSocket() { + clearBuffers(); + } + + void connect(const folly::SocketAddress& remote) { + if (sock_) { + sock_->connect(&callback_, remote); + } + } + + bool isZeroCopyWriteInProgress() const { + return sock_->isZeroCopyWriteInProgress(); + } + + private: + void setZeroCopy(bool enable) { + zeroCopy_ = enable; + if (sock_) { + sock_->setZeroCopy(zeroCopy_); + } + } + + void setBufferSize(size_t bufferSize) { + clearBuffers(); + bufferSize_ = bufferSize; + + readBuffer_ = new char[bufferSize_]; + } + + class Callback : public folly::AsyncSocket::ReadCallback, + public folly::AsyncSocket::ConnectCallback { + public: + explicit Callback(ZeroCopyTestAsyncSocket* parent) : parent_(parent) {} + + void connectSuccess() noexcept override { + parent_->sock_->setReadCB(this); + parent_->onConnected(); + } + + void connectErr(const folly::AsyncSocketException& ex) noexcept override { + LOG(ERROR) << "Connect error: " << ex.what(); + parent_->onDataFinish(folly::exception_wrapper(ex)); + } + + void getReadBuffer(void** bufReturn, size_t* lenReturn) override { 
+ parent_->getReadBuffer(bufReturn, lenReturn); + } + + void readDataAvailable(size_t len) noexcept override { + parent_->readDataAvailable(len); + } + + void readEOF() noexcept override { + parent_->onDataFinish(folly::exception_wrapper()); + } + + void readErr(const folly::AsyncSocketException& ex) noexcept override { + parent_->onDataFinish(folly::exception_wrapper(ex)); + } + + private: + ZeroCopyTestAsyncSocket* parent_{nullptr}; + }; + + void clearBuffers() { + if (readBuffer_) { + delete[] readBuffer_; + } + } + + void getReadBuffer(void** bufReturn, size_t* lenReturn) { + *bufReturn = readBuffer_ + readOffset_; + *lenReturn = bufferSize_ - readOffset_; + } + + void readDataAvailable(size_t len) noexcept { + readOffset_ += len; + if (readOffset_ == bufferSize_) { + readOffset_ = 0; + onDataReady(); + } + } + + void onConnected() { + setZeroCopy(zeroCopy_); + writeBuffer(); + } + + void onDataReady() { + currLoop_++; + if (client_ && currLoop_ >= numLoops_) { + evb_->runInLoop( + [this] { evb_->terminateLoopSoon(); }, false /*thisIteration*/); + return; + } + writeBuffer(); + } + + void onDataFinish(folly::exception_wrapper) { + if (client_) { + evb_->terminateLoopSoon(); + } + } + + bool writeBuffer() { + // use calloc to make sure the memory is touched + // if the memory is just malloc'd, running the zeroCopyOn + // and the zeroCopyOff back to back on a system that does not support + // zerocopy leads to the second test being much slower + writeBuffer_ = + folly::IOBuf::takeOwnership(::calloc(1, bufferSize_), bufferSize_); + + if (sock_ && writeBuffer_) { + sock_->writeChain( + nullptr, + std::move(writeBuffer_), + zeroCopy_ ? 
WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE); + } + + return true; + } + + folly::EventBase* evb_; + int numLoops_{0}; + int currLoop_{0}; + bool zeroCopy_{false}; + + folly::AsyncSocket::UniquePtr sock_; + Callback callback_; + + size_t bufferSize_{0}; + size_t readOffset_{0}; + char* readBuffer_{nullptr}; + std::unique_ptr writeBuffer_; + + bool client_; +}; + +class ZeroCopyTestServer : public folly::AsyncServerSocket::AcceptCallback { + public: + explicit ZeroCopyTestServer( + folly::EventBase* evb, + int numLoops, + size_t bufferSize, + bool zeroCopy) + : evb_(evb), + numLoops_(numLoops), + bufferSize_(bufferSize), + zeroCopy_(zeroCopy) {} + + void addCallbackToServerSocket(folly::AsyncServerSocket& sock) { + sock.addAcceptCallback(this, evb_); + } + + void connectionAccepted( + int fd, + const folly::SocketAddress& /* unused */) noexcept override { + auto client = std::make_shared( + evb_, fd, numLoops_, bufferSize_, zeroCopy_); + clients_[client.get()] = client; + } + + void acceptError(const std::exception&) noexcept override {} + + private: + folly::EventBase* evb_; + int numLoops_; + size_t bufferSize_; + bool zeroCopy_; + std::unique_ptr client_; + std::unordered_map< + ZeroCopyTestAsyncSocket*, + std::shared_ptr> + clients_; +}; + +class ZeroCopyTest { + public: + explicit ZeroCopyTest(int numLoops, bool zeroCopy, size_t bufferSize); + bool run(); + + private: + void connectOne() { + SocketAddress addr = listenSock_->getAddress(); + client_->connect(addr); + } + + int numLoops_; + bool zeroCopy_; + size_t bufferSize_; + + EventBase evb_; + std::unique_ptr client_; + folly::AsyncServerSocket::UniquePtr listenSock_; + ZeroCopyTestServer server_; +}; + +} // namespace folly diff --git a/folly/io/async/test/ZeroCopyBenchmark.cpp b/folly/io/async/test/ZeroCopyBenchmark.cpp index f7dd422d..ce734703 100644 --- a/folly/io/async/test/ZeroCopyBenchmark.cpp +++ b/folly/io/async/test/ZeroCopyBenchmark.cpp @@ -15,268 +15,11 @@ */ #include - -#include -#include 
-#include -#include -#include -#include - +#include #include using namespace folly; - -class TestAsyncSocket { - public: - explicit TestAsyncSocket( - folly::EventBase* evb, - int numLoops, - size_t bufferSize, - bool zeroCopy) - : evb_(evb), - numLoops_(numLoops), - sock_(new folly::AsyncSocket(evb)), - callback_(this), - client_(true) { - setBufferSize(bufferSize); - setZeroCopy(zeroCopy); - } - - explicit TestAsyncSocket( - folly::EventBase* evb, - int fd, - int numLoops, - size_t bufferSize, - bool zeroCopy) - : evb_(evb), - numLoops_(numLoops), - sock_(new folly::AsyncSocket(evb, fd)), - callback_(this), - client_(false) { - setBufferSize(bufferSize); - setZeroCopy(zeroCopy); - // enable reads - if (sock_) { - sock_->setReadCB(&callback_); - } - } - - ~TestAsyncSocket() { - clearBuffers(); - } - - void connect(const folly::SocketAddress& remote) { - if (sock_) { - sock_->connect(&callback_, remote); - } - } - - private: - void setZeroCopy(bool enable) { - zeroCopy_ = enable; - if (sock_) { - sock_->setZeroCopy(zeroCopy_); - } - } - - void setBufferSize(size_t bufferSize) { - clearBuffers(); - bufferSize_ = bufferSize; - - readBuffer_ = new char[bufferSize_]; - } - - class Callback : public folly::AsyncSocket::ReadCallback, - public folly::AsyncSocket::ConnectCallback { - public: - explicit Callback(TestAsyncSocket* parent) : parent_(parent) {} - - void connectSuccess() noexcept override { - parent_->sock_->setReadCB(this); - parent_->onConnected(); - } - - void connectErr(const folly::AsyncSocketException& ex) noexcept override { - LOG(ERROR) << "Connect error: " << ex.what(); - parent_->onDataFinish(folly::exception_wrapper(ex)); - } - - void getReadBuffer(void** bufReturn, size_t* lenReturn) override { - parent_->getReadBuffer(bufReturn, lenReturn); - } - - void readDataAvailable(size_t len) noexcept override { - parent_->readDataAvailable(len); - } - - void readEOF() noexcept override { - parent_->onDataFinish(folly::exception_wrapper()); - } - - void 
readErr(const folly::AsyncSocketException& ex) noexcept override { - parent_->onDataFinish(folly::exception_wrapper(ex)); - } - - private: - TestAsyncSocket* parent_{nullptr}; - }; - - void clearBuffers() { - if (readBuffer_) { - delete[] readBuffer_; - } - } - - void getReadBuffer(void** bufReturn, size_t* lenReturn) { - *bufReturn = readBuffer_ + readOffset_; - *lenReturn = bufferSize_ - readOffset_; - } - - void readDataAvailable(size_t len) noexcept { - readOffset_ += len; - if (readOffset_ == bufferSize_) { - readOffset_ = 0; - onDataReady(); - } - } - - void onConnected() { - setZeroCopy(zeroCopy_); - writeBuffer(); - } - - void onDataReady() { - currLoop_++; - if (client_ && currLoop_ >= numLoops_) { - evb_->terminateLoopSoon(); - return; - } - writeBuffer(); - } - - void onDataFinish(folly::exception_wrapper) { - if (client_) { - evb_->terminateLoopSoon(); - } - } - - bool writeBuffer() { - // use calloc to make sure the memory is touched - // if the memory is just malloc'd, running the zeroCopyOn - // and the zeroCopyOff back to back on a system that does not support - // zerocopy leads to the second test being much slower - writeBuffer_ = - folly::IOBuf::takeOwnership(::calloc(1, bufferSize_), bufferSize_); - - if (sock_ && writeBuffer_) { - sock_->writeChain( - nullptr, - std::move(writeBuffer_), - zeroCopy_ ? 
WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE); - } - - return true; - } - - folly::EventBase* evb_; - int numLoops_{0}; - int currLoop_{0}; - bool zeroCopy_{false}; - - folly::AsyncSocket::UniquePtr sock_; - Callback callback_; - - size_t bufferSize_{0}; - size_t readOffset_{0}; - char* readBuffer_{nullptr}; - std::unique_ptr writeBuffer_; - - bool client_; -}; - -class TestServer : public folly::AsyncServerSocket::AcceptCallback { - public: - explicit TestServer( - folly::EventBase* evb, - int numLoops, - size_t bufferSize, - bool zeroCopy) - : evb_(evb), - numLoops_(numLoops), - bufferSize_(bufferSize), - zeroCopy_(zeroCopy) {} - - void addCallbackToServerSocket(folly::AsyncServerSocket& sock) { - sock.addAcceptCallback(this, evb_); - } - - void connectionAccepted( - int fd, - const folly::SocketAddress& /* unused */) noexcept override { - auto client = std::make_shared( - evb_, fd, numLoops_, bufferSize_, zeroCopy_); - clients_[client.get()] = client; - } - - void acceptError(const std::exception&) noexcept override {} - - private: - folly::EventBase* evb_; - int numLoops_; - size_t bufferSize_; - bool zeroCopy_; - std::unique_ptr client_; - std::unordered_map> - clients_; -}; - -class Test { - public: - explicit Test(int numLoops, bool zeroCopy, size_t bufferSize) - : numLoops_(numLoops), - zeroCopy_(zeroCopy), - bufferSize_(bufferSize), - client_(new TestAsyncSocket(&evb_, numLoops_, bufferSize_, zeroCopy)), - listenSock_(new folly::AsyncServerSocket(&evb_)), - server_(&evb_, numLoops_, bufferSize_, zeroCopy) { - if (listenSock_) { - server_.addCallbackToServerSocket(*listenSock_); - } - } - - void run() { - evb_.runInEventBaseThread([this]() { - - if (listenSock_) { - listenSock_->bind(0); - listenSock_->setZeroCopy(zeroCopy_); - listenSock_->listen(10); - listenSock_->startAccepting(); - - connectOne(); - } - }); - - evb_.loopForever(); - } - - private: - void connectOne() { - SocketAddress addr = listenSock_->getAddress(); - client_->connect(addr); - } 
- - int numLoops_; - bool zeroCopy_; - size_t bufferSize_; - - EventBase evb_; - std::unique_ptr client_; - folly::AsyncServerSocket::UniquePtr listenSock_; - TestServer server_; -}; - +namespace { void runClient( const std::string& host, uint16_t port, @@ -288,8 +31,8 @@ void runClient( << " bufferSize = " << bufferSize; EventBase evb; - std::unique_ptr client( - new TestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy)); + std::unique_ptr client( + new ZeroCopyTestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy)); SocketAddress addr(host, port); evb.runInEventBaseThread([&]() { client->connect(addr); }); @@ -303,7 +46,7 @@ void runServer(uint16_t port, int numLoops, bool zeroCopy, size_t bufferSize) { EventBase evb; folly::AsyncServerSocket::UniquePtr listenSock( new folly::AsyncServerSocket(&evb)); - TestServer server(&evb, numLoops, bufferSize, zeroCopy); + ZeroCopyTestServer server(&evb, numLoops, bufferSize, zeroCopy); server.addCallbackToServerSocket(*listenSock); @@ -316,16 +59,17 @@ void runServer(uint16_t port, int numLoops, bool zeroCopy, size_t bufferSize) { evb.loopForever(); } +} // namespace -static auto constexpr kMaxLoops = 200000; +static auto constexpr kMaxLoops = 20000; void zeroCopyOn(unsigned /* unused */, size_t bufferSize) { - Test test(kMaxLoops, true, bufferSize); + ZeroCopyTest test(kMaxLoops, true, bufferSize); test.run(); } void zeroCopyOff(unsigned /* unused */, size_t bufferSize) { - Test test(kMaxLoops, false, bufferSize); + ZeroCopyTest test(kMaxLoops, false, bufferSize); test.run(); } diff --git a/folly/io/async/test/ZeroCopyTest.cpp b/folly/io/async/test/ZeroCopyTest.cpp new file mode 100644 index 00000000..11fe9206 --- /dev/null +++ b/folly/io/async/test/ZeroCopyTest.cpp @@ -0,0 +1,29 @@ +/* + * Copyright 2017 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +using namespace testing; +using namespace folly; + +static auto constexpr kMaxLoops = 20; +static auto constexpr kBufferSize = 4096; + +TEST(ZeroCopyTest, zero_copy_in_progress) { + ZeroCopyTest test(kMaxLoops, true, kBufferSize); + CHECK(test.run()); +} -- 2.34.1