Remove the zerocopy write threshold support, add support for ENOBUFS
[folly.git] / folly / io / async / test / ZeroCopyBenchmark.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Benchmark.h>
18
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBufQueue.h>
22 #include <folly/io/async/AsyncServerSocket.h>
23 #include <folly/io/async/AsyncSocket.h>
24 #include <folly/io/async/EventBase.h>
25
26 #include <folly/portability/GFlags.h>
27
28 using namespace folly;
29
30 class TestAsyncSocket {
31  public:
32   explicit TestAsyncSocket(
33       folly::EventBase* evb,
34       int numLoops,
35       size_t bufferSize,
36       bool zeroCopy)
37       : evb_(evb),
38         numLoops_(numLoops),
39         sock_(new folly::AsyncSocket(evb)),
40         callback_(this),
41         client_(true) {
42     setBufferSize(bufferSize);
43     setZeroCopy(zeroCopy);
44   }
45
46   explicit TestAsyncSocket(
47       folly::EventBase* evb,
48       int fd,
49       int numLoops,
50       size_t bufferSize,
51       bool zeroCopy)
52       : evb_(evb),
53         numLoops_(numLoops),
54         sock_(new folly::AsyncSocket(evb, fd)),
55         callback_(this),
56         client_(false) {
57     setBufferSize(bufferSize);
58     setZeroCopy(zeroCopy);
59     // enable reads
60     if (sock_) {
61       sock_->setReadCB(&callback_);
62     }
63   }
64
65   ~TestAsyncSocket() {
66     clearBuffers();
67   }
68
69   void connect(const folly::SocketAddress& remote) {
70     if (sock_) {
71       sock_->connect(&callback_, remote);
72     }
73   }
74
75  private:
76   void setZeroCopy(bool enable) {
77     zeroCopy_ = enable;
78     if (sock_) {
79       sock_->setZeroCopy(zeroCopy_);
80     }
81   }
82
83   void setBufferSize(size_t bufferSize) {
84     clearBuffers();
85     bufferSize_ = bufferSize;
86
87     readBuffer_ = new char[bufferSize_];
88   }
89
90   class Callback : public folly::AsyncSocket::ReadCallback,
91                    public folly::AsyncSocket::ConnectCallback {
92    public:
93     explicit Callback(TestAsyncSocket* parent) : parent_(parent) {}
94
95     void connectSuccess() noexcept override {
96       parent_->sock_->setReadCB(this);
97       parent_->onConnected();
98     }
99
100     void connectErr(const folly::AsyncSocketException& ex) noexcept override {
101       LOG(ERROR) << "Connect error: " << ex.what();
102       parent_->onDataFinish(folly::exception_wrapper(ex));
103     }
104
105     void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
106       parent_->getReadBuffer(bufReturn, lenReturn);
107     }
108
109     void readDataAvailable(size_t len) noexcept override {
110       parent_->readDataAvailable(len);
111     }
112
113     void readEOF() noexcept override {
114       parent_->onDataFinish(folly::exception_wrapper());
115     }
116
117     void readErr(const folly::AsyncSocketException& ex) noexcept override {
118       parent_->onDataFinish(folly::exception_wrapper(ex));
119     }
120
121    private:
122     TestAsyncSocket* parent_{nullptr};
123   };
124
125   void clearBuffers() {
126     if (readBuffer_) {
127       delete[] readBuffer_;
128     }
129   }
130
131   void getReadBuffer(void** bufReturn, size_t* lenReturn) {
132     *bufReturn = readBuffer_ + readOffset_;
133     *lenReturn = bufferSize_ - readOffset_;
134   }
135
136   void readDataAvailable(size_t len) noexcept {
137     readOffset_ += len;
138     if (readOffset_ == bufferSize_) {
139       readOffset_ = 0;
140       onDataReady();
141     }
142   }
143
144   void onConnected() {
145     setZeroCopy(zeroCopy_);
146     writeBuffer();
147   }
148
149   void onDataReady() {
150     currLoop_++;
151     if (client_ && currLoop_ >= numLoops_) {
152       evb_->terminateLoopSoon();
153       return;
154     }
155     writeBuffer();
156   }
157
158   void onDataFinish(folly::exception_wrapper) {
159     if (client_) {
160       evb_->terminateLoopSoon();
161     }
162   }
163
164   bool writeBuffer() {
165     // use calloc to make sure the memory is touched
166     // if the memory is just malloc'd, running the zeroCopyOn
167     // and the zeroCopyOff back to back on a system that does not support
168     // zerocopy leads to the second test being much slower
169     writeBuffer_ =
170         folly::IOBuf::takeOwnership(::calloc(1, bufferSize_), bufferSize_);
171
172     if (sock_ && writeBuffer_) {
173       sock_->writeChain(
174           nullptr,
175           std::move(writeBuffer_),
176           zeroCopy_ ? WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE);
177     }
178
179     return true;
180   }
181
182   folly::EventBase* evb_;
183   int numLoops_{0};
184   int currLoop_{0};
185   bool zeroCopy_{false};
186
187   folly::AsyncSocket::UniquePtr sock_;
188   Callback callback_;
189
190   size_t bufferSize_{0};
191   size_t readOffset_{0};
192   char* readBuffer_{nullptr};
193   std::unique_ptr<folly::IOBuf> writeBuffer_;
194
195   bool client_;
196 };
197
198 class TestServer : public folly::AsyncServerSocket::AcceptCallback {
199  public:
200   explicit TestServer(
201       folly::EventBase* evb,
202       int numLoops,
203       size_t bufferSize,
204       bool zeroCopy)
205       : evb_(evb),
206         numLoops_(numLoops),
207         bufferSize_(bufferSize),
208         zeroCopy_(zeroCopy) {}
209
210   void addCallbackToServerSocket(folly::AsyncServerSocket& sock) {
211     sock.addAcceptCallback(this, evb_);
212   }
213
214   void connectionAccepted(
215       int fd,
216       const folly::SocketAddress& /* unused */) noexcept override {
217     auto client = std::make_shared<TestAsyncSocket>(
218         evb_, fd, numLoops_, bufferSize_, zeroCopy_);
219     clients_[client.get()] = client;
220   }
221
222   void acceptError(const std::exception&) noexcept override {}
223
224  private:
225   folly::EventBase* evb_;
226   int numLoops_;
227   size_t bufferSize_;
228   bool zeroCopy_;
229   std::unique_ptr<TestAsyncSocket> client_;
230   std::unordered_map<TestAsyncSocket*, std::shared_ptr<TestAsyncSocket>>
231       clients_;
232 };
233
234 class Test {
235  public:
236   explicit Test(int numLoops, bool zeroCopy, size_t bufferSize)
237       : numLoops_(numLoops),
238         zeroCopy_(zeroCopy),
239         bufferSize_(bufferSize),
240         client_(new TestAsyncSocket(&evb_, numLoops_, bufferSize_, zeroCopy)),
241         listenSock_(new folly::AsyncServerSocket(&evb_)),
242         server_(&evb_, numLoops_, bufferSize_, zeroCopy) {
243     if (listenSock_) {
244       server_.addCallbackToServerSocket(*listenSock_);
245     }
246   }
247
248   void run() {
249     evb_.runInEventBaseThread([this]() {
250
251       if (listenSock_) {
252         listenSock_->bind(0);
253         listenSock_->setZeroCopy(zeroCopy_);
254         listenSock_->listen(10);
255         listenSock_->startAccepting();
256
257         connectOne();
258       }
259     });
260
261     evb_.loopForever();
262   }
263
264  private:
265   void connectOne() {
266     SocketAddress addr = listenSock_->getAddress();
267     client_->connect(addr);
268   }
269
270   int numLoops_;
271   bool zeroCopy_;
272   size_t bufferSize_;
273
274   EventBase evb_;
275   std::unique_ptr<TestAsyncSocket> client_;
276   folly::AsyncServerSocket::UniquePtr listenSock_;
277   TestServer server_;
278 };
279
280 void runClient(
281     const std::string& host,
282     uint16_t port,
283     int numLoops,
284     bool zeroCopy,
285     size_t bufferSize) {
286   LOG(INFO) << "Running client. host = " << host << " port = " << port
287             << " numLoops = " << numLoops << " zeroCopy = " << zeroCopy
288             << " bufferSize = " << bufferSize;
289
290   EventBase evb;
291   std::unique_ptr<TestAsyncSocket> client(
292       new TestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy));
293   SocketAddress addr(host, port);
294   evb.runInEventBaseThread([&]() { client->connect(addr); });
295
296   evb.loopForever();
297 }
298
299 void runServer(uint16_t port, int numLoops, bool zeroCopy, size_t bufferSize) {
300   LOG(INFO) << "Running server. port = " << port << " numLoops = " << numLoops
301             << " zeroCopy = " << zeroCopy << " bufferSize = " << bufferSize;
302
303   EventBase evb;
304   folly::AsyncServerSocket::UniquePtr listenSock(
305       new folly::AsyncServerSocket(&evb));
306   TestServer server(&evb, numLoops, bufferSize, zeroCopy);
307
308   server.addCallbackToServerSocket(*listenSock);
309
310   evb.runInEventBaseThread([&]() {
311     listenSock->bind(port);
312     listenSock->setZeroCopy(zeroCopy);
313     listenSock->listen(10);
314     listenSock->startAccepting();
315   });
316
317   evb.loopForever();
318 }
319
320 static auto constexpr kMaxLoops = 200000;
321
322 void zeroCopyOn(unsigned /* unused */, size_t bufferSize) {
323   Test test(kMaxLoops, true, bufferSize);
324   test.run();
325 }
326
327 void zeroCopyOff(unsigned /* unused */, size_t bufferSize) {
328   Test test(kMaxLoops, false, bufferSize);
329   test.run();
330 }
331
332 BENCHMARK_PARAM(zeroCopyOn, 4096);
333 BENCHMARK_PARAM(zeroCopyOff, 4096);
334 BENCHMARK_DRAW_LINE()
335 BENCHMARK_PARAM(zeroCopyOn, 8192);
336 BENCHMARK_PARAM(zeroCopyOff, 8192);
337 BENCHMARK_DRAW_LINE()
338 BENCHMARK_PARAM(zeroCopyOn, 16384);
339 BENCHMARK_PARAM(zeroCopyOff, 16384);
340 BENCHMARK_DRAW_LINE()
341 BENCHMARK_PARAM(zeroCopyOn, 32768);
342 BENCHMARK_PARAM(zeroCopyOff, 32768);
343 BENCHMARK_DRAW_LINE()
344 BENCHMARK_PARAM(zeroCopyOn, 65536);
345 BENCHMARK_PARAM(zeroCopyOff, 65536);
346 BENCHMARK_DRAW_LINE()
347 BENCHMARK_PARAM(zeroCopyOn, 131072);
348 BENCHMARK_PARAM(zeroCopyOff, 131072);
349 BENCHMARK_DRAW_LINE()
350 BENCHMARK_PARAM(zeroCopyOn, 262144);
351 BENCHMARK_PARAM(zeroCopyOff, 262144);
352 BENCHMARK_DRAW_LINE()
353 BENCHMARK_PARAM(zeroCopyOn, 524288);
354 BENCHMARK_PARAM(zeroCopyOff, 524288);
355 BENCHMARK_DRAW_LINE()
356 BENCHMARK_PARAM(zeroCopyOn, 1048576);
357 BENCHMARK_PARAM(zeroCopyOff, 1048576);
358 BENCHMARK_DRAW_LINE()
359
360 DEFINE_bool(client, false, "client mode");
361 DEFINE_bool(server, false, "server mode");
362 DEFINE_bool(zeroCopy, false, "use zerocopy");
363 DEFINE_int32(numLoops, kMaxLoops, "number of loops");
364 DEFINE_int32(bufferSize, 524288, "buffer size");
365 DEFINE_int32(port, 33130, "port");
366 DEFINE_string(host, "::1", "host");
367
368 int main(int argc, char** argv) {
369   gflags::ParseCommandLineFlags(&argc, &argv, true);
370
371   if (FLAGS_client) {
372     runClient(
373         FLAGS_host,
374         FLAGS_port,
375         FLAGS_numLoops,
376         FLAGS_zeroCopy,
377         FLAGS_bufferSize);
378   } else if (FLAGS_server) {
379     runServer(FLAGS_port, FLAGS_numLoops, FLAGS_zeroCopy, FLAGS_bufferSize);
380   } else {
381     runBenchmarks();
382   }
383 }