Fix zerocopy AsyncSocket memory leaks
[folly.git] / folly / io / async / test / ZeroCopyBenchmark.cpp
index f7dd422dad3e10f029272000e5828f62c6d16c35..ce734703ca2f13835a9ffcdb458900b855798625 100644 (file)
  */
 
 #include <folly/Benchmark.h>
-
-#include <folly/ExceptionWrapper.h>
-#include <folly/SocketAddress.h>
-#include <folly/io/IOBufQueue.h>
-#include <folly/io/async/AsyncServerSocket.h>
-#include <folly/io/async/AsyncSocket.h>
-#include <folly/io/async/EventBase.h>
-
+#include <folly/io/async/test/ZeroCopy.h>
 #include <folly/portability/GFlags.h>
 
 using namespace folly;
-
-class TestAsyncSocket {
- public:
-  explicit TestAsyncSocket(
-      folly::EventBase* evb,
-      int numLoops,
-      size_t bufferSize,
-      bool zeroCopy)
-      : evb_(evb),
-        numLoops_(numLoops),
-        sock_(new folly::AsyncSocket(evb)),
-        callback_(this),
-        client_(true) {
-    setBufferSize(bufferSize);
-    setZeroCopy(zeroCopy);
-  }
-
-  explicit TestAsyncSocket(
-      folly::EventBase* evb,
-      int fd,
-      int numLoops,
-      size_t bufferSize,
-      bool zeroCopy)
-      : evb_(evb),
-        numLoops_(numLoops),
-        sock_(new folly::AsyncSocket(evb, fd)),
-        callback_(this),
-        client_(false) {
-    setBufferSize(bufferSize);
-    setZeroCopy(zeroCopy);
-    // enable reads
-    if (sock_) {
-      sock_->setReadCB(&callback_);
-    }
-  }
-
-  ~TestAsyncSocket() {
-    clearBuffers();
-  }
-
-  void connect(const folly::SocketAddress& remote) {
-    if (sock_) {
-      sock_->connect(&callback_, remote);
-    }
-  }
-
- private:
-  void setZeroCopy(bool enable) {
-    zeroCopy_ = enable;
-    if (sock_) {
-      sock_->setZeroCopy(zeroCopy_);
-    }
-  }
-
-  void setBufferSize(size_t bufferSize) {
-    clearBuffers();
-    bufferSize_ = bufferSize;
-
-    readBuffer_ = new char[bufferSize_];
-  }
-
-  class Callback : public folly::AsyncSocket::ReadCallback,
-                   public folly::AsyncSocket::ConnectCallback {
-   public:
-    explicit Callback(TestAsyncSocket* parent) : parent_(parent) {}
-
-    void connectSuccess() noexcept override {
-      parent_->sock_->setReadCB(this);
-      parent_->onConnected();
-    }
-
-    void connectErr(const folly::AsyncSocketException& ex) noexcept override {
-      LOG(ERROR) << "Connect error: " << ex.what();
-      parent_->onDataFinish(folly::exception_wrapper(ex));
-    }
-
-    void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
-      parent_->getReadBuffer(bufReturn, lenReturn);
-    }
-
-    void readDataAvailable(size_t len) noexcept override {
-      parent_->readDataAvailable(len);
-    }
-
-    void readEOF() noexcept override {
-      parent_->onDataFinish(folly::exception_wrapper());
-    }
-
-    void readErr(const folly::AsyncSocketException& ex) noexcept override {
-      parent_->onDataFinish(folly::exception_wrapper(ex));
-    }
-
-   private:
-    TestAsyncSocket* parent_{nullptr};
-  };
-
-  void clearBuffers() {
-    if (readBuffer_) {
-      delete[] readBuffer_;
-    }
-  }
-
-  void getReadBuffer(void** bufReturn, size_t* lenReturn) {
-    *bufReturn = readBuffer_ + readOffset_;
-    *lenReturn = bufferSize_ - readOffset_;
-  }
-
-  void readDataAvailable(size_t len) noexcept {
-    readOffset_ += len;
-    if (readOffset_ == bufferSize_) {
-      readOffset_ = 0;
-      onDataReady();
-    }
-  }
-
-  void onConnected() {
-    setZeroCopy(zeroCopy_);
-    writeBuffer();
-  }
-
-  void onDataReady() {
-    currLoop_++;
-    if (client_ && currLoop_ >= numLoops_) {
-      evb_->terminateLoopSoon();
-      return;
-    }
-    writeBuffer();
-  }
-
-  void onDataFinish(folly::exception_wrapper) {
-    if (client_) {
-      evb_->terminateLoopSoon();
-    }
-  }
-
-  bool writeBuffer() {
-    // use calloc to make sure the memory is touched
-    // if the memory is just malloc'd, running the zeroCopyOn
-    // and the zeroCopyOff back to back on a system that does not support
-    // zerocopy leads to the second test being much slower
-    writeBuffer_ =
-        folly::IOBuf::takeOwnership(::calloc(1, bufferSize_), bufferSize_);
-
-    if (sock_ && writeBuffer_) {
-      sock_->writeChain(
-          nullptr,
-          std::move(writeBuffer_),
-          zeroCopy_ ? WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE);
-    }
-
-    return true;
-  }
-
-  folly::EventBase* evb_;
-  int numLoops_{0};
-  int currLoop_{0};
-  bool zeroCopy_{false};
-
-  folly::AsyncSocket::UniquePtr sock_;
-  Callback callback_;
-
-  size_t bufferSize_{0};
-  size_t readOffset_{0};
-  char* readBuffer_{nullptr};
-  std::unique_ptr<folly::IOBuf> writeBuffer_;
-
-  bool client_;
-};
-
-class TestServer : public folly::AsyncServerSocket::AcceptCallback {
- public:
-  explicit TestServer(
-      folly::EventBase* evb,
-      int numLoops,
-      size_t bufferSize,
-      bool zeroCopy)
-      : evb_(evb),
-        numLoops_(numLoops),
-        bufferSize_(bufferSize),
-        zeroCopy_(zeroCopy) {}
-
-  void addCallbackToServerSocket(folly::AsyncServerSocket& sock) {
-    sock.addAcceptCallback(this, evb_);
-  }
-
-  void connectionAccepted(
-      int fd,
-      const folly::SocketAddress& /* unused */) noexcept override {
-    auto client = std::make_shared<TestAsyncSocket>(
-        evb_, fd, numLoops_, bufferSize_, zeroCopy_);
-    clients_[client.get()] = client;
-  }
-
-  void acceptError(const std::exception&) noexcept override {}
-
- private:
-  folly::EventBase* evb_;
-  int numLoops_;
-  size_t bufferSize_;
-  bool zeroCopy_;
-  std::unique_ptr<TestAsyncSocket> client_;
-  std::unordered_map<TestAsyncSocket*, std::shared_ptr<TestAsyncSocket>>
-      clients_;
-};
-
-class Test {
- public:
-  explicit Test(int numLoops, bool zeroCopy, size_t bufferSize)
-      : numLoops_(numLoops),
-        zeroCopy_(zeroCopy),
-        bufferSize_(bufferSize),
-        client_(new TestAsyncSocket(&evb_, numLoops_, bufferSize_, zeroCopy)),
-        listenSock_(new folly::AsyncServerSocket(&evb_)),
-        server_(&evb_, numLoops_, bufferSize_, zeroCopy) {
-    if (listenSock_) {
-      server_.addCallbackToServerSocket(*listenSock_);
-    }
-  }
-
-  void run() {
-    evb_.runInEventBaseThread([this]() {
-
-      if (listenSock_) {
-        listenSock_->bind(0);
-        listenSock_->setZeroCopy(zeroCopy_);
-        listenSock_->listen(10);
-        listenSock_->startAccepting();
-
-        connectOne();
-      }
-    });
-
-    evb_.loopForever();
-  }
-
- private:
-  void connectOne() {
-    SocketAddress addr = listenSock_->getAddress();
-    client_->connect(addr);
-  }
-
-  int numLoops_;
-  bool zeroCopy_;
-  size_t bufferSize_;
-
-  EventBase evb_;
-  std::unique_ptr<TestAsyncSocket> client_;
-  folly::AsyncServerSocket::UniquePtr listenSock_;
-  TestServer server_;
-};
-
+namespace {
 void runClient(
     const std::string& host,
     uint16_t port,
@@ -288,8 +31,8 @@ void runClient(
             << " bufferSize = " << bufferSize;
 
   EventBase evb;
-  std::unique_ptr<TestAsyncSocket> client(
-      new TestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy));
+  std::unique_ptr<ZeroCopyTestAsyncSocket> client(
+      new ZeroCopyTestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy));
   SocketAddress addr(host, port);
   evb.runInEventBaseThread([&]() { client->connect(addr); });
 
@@ -303,7 +46,7 @@ void runServer(uint16_t port, int numLoops, bool zeroCopy, size_t bufferSize) {
   EventBase evb;
   folly::AsyncServerSocket::UniquePtr listenSock(
       new folly::AsyncServerSocket(&evb));
-  TestServer server(&evb, numLoops, bufferSize, zeroCopy);
+  ZeroCopyTestServer server(&evb, numLoops, bufferSize, zeroCopy);
 
   server.addCallbackToServerSocket(*listenSock);
 
@@ -316,16 +59,17 @@ void runServer(uint16_t port, int numLoops, bool zeroCopy, size_t bufferSize) {
 
   evb.loopForever();
 }
+} // namespace
 
-static auto constexpr kMaxLoops = 200000;
+static auto constexpr kMaxLoops = 20000;
 
 void zeroCopyOn(unsigned /* unused */, size_t bufferSize) {
-  Test test(kMaxLoops, true, bufferSize);
+  ZeroCopyTest test(kMaxLoops, true, bufferSize);
   test.run();
 }
 
 void zeroCopyOff(unsigned /* unused */, size_t bufferSize) {
-  Test test(kMaxLoops, false, bufferSize);
+  ZeroCopyTest test(kMaxLoops, false, bufferSize);
   test.run();
 }