Change kDefaultZeroCopyThreshold to 0 to avoid a regression and avoid a failure while...
[folly.git] / folly / io / async / test / ZeroCopyBenchmark.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Benchmark.h>
18
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBufQueue.h>
22 #include <folly/io/async/AsyncServerSocket.h>
23 #include <folly/io/async/AsyncSocket.h>
24 #include <folly/io/async/EventBase.h>
25
26 #include <folly/portability/GFlags.h>
27
28 using namespace folly;
29
30 static constexpr auto const kZeroCopyThreshold = 4096;
31
32 class TestAsyncSocket {
33  public:
34   explicit TestAsyncSocket(
35       folly::EventBase* evb,
36       int numLoops,
37       size_t bufferSize,
38       bool zeroCopy)
39       : evb_(evb),
40         numLoops_(numLoops),
41         sock_(new folly::AsyncSocket(evb)),
42         callback_(this),
43         client_(true) {
44     setBufferSize(bufferSize);
45     setZeroCopy(zeroCopy);
46   }
47
48   explicit TestAsyncSocket(
49       folly::EventBase* evb,
50       int fd,
51       int numLoops,
52       size_t bufferSize,
53       bool zeroCopy)
54       : evb_(evb),
55         numLoops_(numLoops),
56         sock_(new folly::AsyncSocket(evb, fd)),
57         callback_(this),
58         client_(false) {
59     setBufferSize(bufferSize);
60     setZeroCopy(zeroCopy);
61     // enable reads
62     if (sock_) {
63       sock_->setReadCB(&callback_);
64     }
65   }
66
67   ~TestAsyncSocket() {
68     clearBuffers();
69   }
70
71   void connect(const folly::SocketAddress& remote) {
72     if (sock_) {
73       sock_->connect(&callback_, remote);
74     }
75   }
76
77  private:
78   void setZeroCopy(bool enable) {
79     zeroCopy_ = enable;
80     if (sock_) {
81       sock_->setZeroCopy(zeroCopy_);
82       if (zeroCopy_) {
83         sock_->setZeroCopyWriteChainThreshold(kZeroCopyThreshold);
84       }
85     }
86   }
87
88   void setBufferSize(size_t bufferSize) {
89     clearBuffers();
90     bufferSize_ = bufferSize;
91
92     readBuffer_ = new char[bufferSize_];
93   }
94
95   class Callback : public folly::AsyncSocket::ReadCallback,
96                    public folly::AsyncSocket::ConnectCallback {
97    public:
98     explicit Callback(TestAsyncSocket* parent) : parent_(parent) {}
99
100     void connectSuccess() noexcept override {
101       parent_->sock_->setReadCB(this);
102       parent_->onConnected();
103     }
104
105     void connectErr(const folly::AsyncSocketException& ex) noexcept override {
106       LOG(ERROR) << "Connect error: " << ex.what();
107       parent_->onDataFinish(folly::exception_wrapper(ex));
108     }
109
110     void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
111       parent_->getReadBuffer(bufReturn, lenReturn);
112     }
113
114     void readDataAvailable(size_t len) noexcept override {
115       parent_->readDataAvailable(len);
116     }
117
118     void readEOF() noexcept override {
119       parent_->onDataFinish(folly::exception_wrapper());
120     }
121
122     void readErr(const folly::AsyncSocketException& ex) noexcept override {
123       parent_->onDataFinish(folly::exception_wrapper(ex));
124     }
125
126    private:
127     TestAsyncSocket* parent_{nullptr};
128   };
129
130   void clearBuffers() {
131     if (readBuffer_) {
132       delete[] readBuffer_;
133     }
134   }
135
136   void getReadBuffer(void** bufReturn, size_t* lenReturn) {
137     *bufReturn = readBuffer_ + readOffset_;
138     *lenReturn = bufferSize_ - readOffset_;
139   }
140
141   void readDataAvailable(size_t len) noexcept {
142     readOffset_ += len;
143     if (readOffset_ == bufferSize_) {
144       readOffset_ = 0;
145       onDataReady();
146     }
147   }
148
149   void onConnected() {
150     setZeroCopy(zeroCopy_);
151     writeBuffer();
152   }
153
154   void onDataReady() {
155     currLoop_++;
156     if (client_ && currLoop_ >= numLoops_) {
157       evb_->terminateLoopSoon();
158       return;
159     }
160     writeBuffer();
161   }
162
163   void onDataFinish(folly::exception_wrapper) {
164     if (client_) {
165       evb_->terminateLoopSoon();
166     }
167   }
168
169   bool writeBuffer() {
170     // use calloc to make sure the memory is touched
171     // if the memory is just malloc'd, running the zeroCopyOn
172     // and the zeroCopyOff back to back on a system that does not support
173     // zerocopy leads to the second test being much slower
174     writeBuffer_ =
175         folly::IOBuf::takeOwnership(::calloc(1, bufferSize_), bufferSize_);
176
177     if (sock_ && writeBuffer_) {
178       sock_->writeChain(
179           nullptr,
180           std::move(writeBuffer_),
181           zeroCopy_ ? WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE);
182     }
183
184     return true;
185   }
186
187   folly::EventBase* evb_;
188   int numLoops_{0};
189   int currLoop_{0};
190   bool zeroCopy_{false};
191
192   folly::AsyncSocket::UniquePtr sock_;
193   Callback callback_;
194
195   size_t bufferSize_{0};
196   size_t readOffset_{0};
197   char* readBuffer_{nullptr};
198   std::unique_ptr<folly::IOBuf> writeBuffer_;
199
200   bool client_;
201 };
202
203 class TestServer : public folly::AsyncServerSocket::AcceptCallback {
204  public:
205   explicit TestServer(
206       folly::EventBase* evb,
207       int numLoops,
208       size_t bufferSize,
209       bool zeroCopy)
210       : evb_(evb),
211         numLoops_(numLoops),
212         bufferSize_(bufferSize),
213         zeroCopy_(zeroCopy) {}
214
215   void addCallbackToServerSocket(folly::AsyncServerSocket& sock) {
216     sock.addAcceptCallback(this, evb_);
217   }
218
219   void connectionAccepted(
220       int fd,
221       const folly::SocketAddress& /* unused */) noexcept override {
222     auto client = std::make_shared<TestAsyncSocket>(
223         evb_, fd, numLoops_, bufferSize_, zeroCopy_);
224     clients_[client.get()] = client;
225   }
226
227   void acceptError(const std::exception&) noexcept override {}
228
229  private:
230   folly::EventBase* evb_;
231   int numLoops_;
232   size_t bufferSize_;
233   bool zeroCopy_;
234   std::unique_ptr<TestAsyncSocket> client_;
235   std::unordered_map<TestAsyncSocket*, std::shared_ptr<TestAsyncSocket>>
236       clients_;
237 };
238
239 class Test {
240  public:
241   explicit Test(int numLoops, bool zeroCopy, size_t bufferSize)
242       : numLoops_(numLoops),
243         zeroCopy_(zeroCopy),
244         bufferSize_(bufferSize),
245         client_(new TestAsyncSocket(&evb_, numLoops_, bufferSize_, zeroCopy)),
246         listenSock_(new folly::AsyncServerSocket(&evb_)),
247         server_(&evb_, numLoops_, bufferSize_, zeroCopy) {
248     if (listenSock_) {
249       server_.addCallbackToServerSocket(*listenSock_);
250     }
251   }
252
253   void run() {
254     evb_.runInEventBaseThread([this]() {
255
256       if (listenSock_) {
257         listenSock_->bind(0);
258         listenSock_->setZeroCopy(zeroCopy_);
259         listenSock_->listen(10);
260         listenSock_->startAccepting();
261
262         connectOne();
263       }
264     });
265
266     evb_.loopForever();
267   }
268
269  private:
270   void connectOne() {
271     SocketAddress addr = listenSock_->getAddress();
272     client_->connect(addr);
273   }
274
275   int numLoops_;
276   bool zeroCopy_;
277   size_t bufferSize_;
278
279   EventBase evb_;
280   std::unique_ptr<TestAsyncSocket> client_;
281   folly::AsyncServerSocket::UniquePtr listenSock_;
282   TestServer server_;
283 };
284
285 void runClient(
286     const std::string& host,
287     uint16_t port,
288     int numLoops,
289     bool zeroCopy,
290     size_t bufferSize) {
291   LOG(INFO) << "Running client. host = " << host << " port = " << port
292             << " numLoops = " << numLoops << " zeroCopy = " << zeroCopy
293             << " bufferSize = " << bufferSize;
294
295   EventBase evb;
296   std::unique_ptr<TestAsyncSocket> client(
297       new TestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy));
298   SocketAddress addr(host, port);
299   evb.runInEventBaseThread([&]() { client->connect(addr); });
300
301   evb.loopForever();
302 }
303
304 void runServer(uint16_t port, int numLoops, bool zeroCopy, size_t bufferSize) {
305   LOG(INFO) << "Running server. port = " << port << " numLoops = " << numLoops
306             << " zeroCopy = " << zeroCopy << " bufferSize = " << bufferSize;
307
308   EventBase evb;
309   folly::AsyncServerSocket::UniquePtr listenSock(
310       new folly::AsyncServerSocket(&evb));
311   TestServer server(&evb, numLoops, bufferSize, zeroCopy);
312
313   server.addCallbackToServerSocket(*listenSock);
314
315   evb.runInEventBaseThread([&]() {
316     listenSock->bind(port);
317     listenSock->setZeroCopy(zeroCopy);
318     listenSock->listen(10);
319     listenSock->startAccepting();
320   });
321
322   evb.loopForever();
323 }
324
325 static auto constexpr kMaxLoops = 200000;
326
327 void zeroCopyOn(unsigned /* unused */, size_t bufferSize) {
328   Test test(kMaxLoops, true, bufferSize);
329   test.run();
330 }
331
332 void zeroCopyOff(unsigned /* unused */, size_t bufferSize) {
333   Test test(kMaxLoops, false, bufferSize);
334   test.run();
335 }
336
337 BENCHMARK_PARAM(zeroCopyOn, 4096);
338 BENCHMARK_PARAM(zeroCopyOff, 4096);
339 BENCHMARK_DRAW_LINE()
340 BENCHMARK_PARAM(zeroCopyOn, 8192);
341 BENCHMARK_PARAM(zeroCopyOff, 8192);
342 BENCHMARK_DRAW_LINE()
343 BENCHMARK_PARAM(zeroCopyOn, 16384);
344 BENCHMARK_PARAM(zeroCopyOff, 16384);
345 BENCHMARK_DRAW_LINE()
346 BENCHMARK_PARAM(zeroCopyOn, 32768);
347 BENCHMARK_PARAM(zeroCopyOff, 32768);
348 BENCHMARK_DRAW_LINE()
349 BENCHMARK_PARAM(zeroCopyOn, 65536);
350 BENCHMARK_PARAM(zeroCopyOff, 65536);
351 BENCHMARK_DRAW_LINE()
352 BENCHMARK_PARAM(zeroCopyOn, 131072);
353 BENCHMARK_PARAM(zeroCopyOff, 131072);
354 BENCHMARK_DRAW_LINE()
355 BENCHMARK_PARAM(zeroCopyOn, 262144);
356 BENCHMARK_PARAM(zeroCopyOff, 262144);
357 BENCHMARK_DRAW_LINE()
358 BENCHMARK_PARAM(zeroCopyOn, 524288);
359 BENCHMARK_PARAM(zeroCopyOff, 524288);
360 BENCHMARK_DRAW_LINE()
361 BENCHMARK_PARAM(zeroCopyOn, 1048576);
362 BENCHMARK_PARAM(zeroCopyOff, 1048576);
363 BENCHMARK_DRAW_LINE()
364
365 DEFINE_bool(client, false, "client mode");
366 DEFINE_bool(server, false, "server mode");
367 DEFINE_bool(zeroCopy, false, "use zerocopy");
368 DEFINE_int32(numLoops, kMaxLoops, "number of loops");
369 DEFINE_int32(bufferSize, 524288, "buffer size");
370 DEFINE_int32(port, 33130, "port");
371 DEFINE_string(host, "::1", "host");
372
373 int main(int argc, char** argv) {
374   gflags::ParseCommandLineFlags(&argc, &argv, true);
375
376   if (FLAGS_client) {
377     runClient(
378         FLAGS_host,
379         FLAGS_port,
380         FLAGS_numLoops,
381         FLAGS_zeroCopy,
382         FLAGS_bufferSize);
383   } else if (FLAGS_server) {
384     runServer(FLAGS_port, FLAGS_numLoops, FLAGS_zeroCopy, FLAGS_bufferSize);
385   } else {
386     runBenchmarks();
387   }
388 }