Add SO_ZEROCOPY support
[folly.git] / folly / io / async / test / ZeroCopyBenchmark.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Benchmark.h>
18
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBufQueue.h>
22 #include <folly/io/async/AsyncServerSocket.h>
23 #include <folly/io/async/AsyncSocket.h>
24 #include <folly/io/async/EventBase.h>
25
26 #include <folly/portability/GFlags.h>
27
28 using namespace folly;
29
30 class TestAsyncSocket {
31  public:
32   explicit TestAsyncSocket(
33       folly::EventBase* evb,
34       int numLoops,
35       size_t bufferSize,
36       bool zeroCopy)
37       : evb_(evb),
38         numLoops_(numLoops),
39         sock_(new folly::AsyncSocket(evb)),
40         callback_(this),
41         client_(true) {
42     setBufferSize(bufferSize);
43     setZeroCopy(zeroCopy);
44   }
45
46   explicit TestAsyncSocket(
47       folly::EventBase* evb,
48       int fd,
49       int numLoops,
50       size_t bufferSize,
51       bool zeroCopy)
52       : evb_(evb),
53         numLoops_(numLoops),
54         sock_(new folly::AsyncSocket(evb, fd)),
55         callback_(this),
56         client_(false) {
57     setBufferSize(bufferSize);
58     setZeroCopy(zeroCopy);
59     // enable reads
60     if (sock_) {
61       sock_->setReadCB(&callback_);
62     }
63   }
64
65   ~TestAsyncSocket() {
66     clearBuffers();
67   }
68
69   void connect(const folly::SocketAddress& remote) {
70     if (sock_) {
71       sock_->connect(&callback_, remote);
72     }
73   }
74
75  private:
76   void setZeroCopy(bool enable) {
77     zeroCopy_ = enable;
78     if (sock_) {
79       sock_->setZeroCopy(zeroCopy_);
80     }
81   }
82
83   void setBufferSize(size_t bufferSize) {
84     clearBuffers();
85     bufferSize_ = bufferSize;
86
87     readBuffer_ = new char[bufferSize_];
88   }
89
90   class Callback : public folly::AsyncSocket::ReadCallback,
91                    public folly::AsyncSocket::ConnectCallback {
92    public:
93     explicit Callback(TestAsyncSocket* parent) : parent_(parent) {}
94
95     void connectSuccess() noexcept override {
96       parent_->sock_->setReadCB(this);
97       parent_->onConnected();
98     }
99
100     void connectErr(const folly::AsyncSocketException& ex) noexcept override {
101       LOG(ERROR) << "Connect error: " << ex.what();
102       parent_->onDataFinish(folly::exception_wrapper(ex));
103     }
104
105     void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
106       parent_->getReadBuffer(bufReturn, lenReturn);
107     }
108
109     void readDataAvailable(size_t len) noexcept override {
110       parent_->readDataAvailable(len);
111     }
112
113     void readEOF() noexcept override {
114       parent_->onDataFinish(folly::exception_wrapper());
115     }
116
117     void readErr(const folly::AsyncSocketException& ex) noexcept override {
118       parent_->onDataFinish(folly::exception_wrapper(ex));
119     }
120
121    private:
122     TestAsyncSocket* parent_{nullptr};
123   };
124
125   void clearBuffers() {
126     if (readBuffer_) {
127       delete[] readBuffer_;
128     }
129   }
130
131   void getReadBuffer(void** bufReturn, size_t* lenReturn) {
132     *bufReturn = readBuffer_ + readOffset_;
133     *lenReturn = bufferSize_ - readOffset_;
134   }
135
136   void readDataAvailable(size_t len) noexcept {
137     readOffset_ += len;
138     if (readOffset_ == bufferSize_) {
139       readOffset_ = 0;
140       onDataReady();
141     }
142   }
143
144   void onConnected() {
145     setZeroCopy(zeroCopy_);
146     writeBuffer();
147   }
148
149   void onDataReady() {
150     currLoop_++;
151     if (client_ && currLoop_ >= numLoops_) {
152       evb_->terminateLoopSoon();
153       return;
154     }
155     writeBuffer();
156   }
157
158   void onDataFinish(folly::exception_wrapper) {
159     if (client_) {
160       evb_->terminateLoopSoon();
161     }
162   }
163
164   bool writeBuffer() {
165     writeBuffer_ =
166         folly::IOBuf::takeOwnership(::malloc(bufferSize_), bufferSize_);
167
168     if (sock_ && writeBuffer_) {
169       sock_->writeChain(
170           nullptr,
171           std::move(writeBuffer_),
172           zeroCopy_ ? WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE);
173     }
174
175     return true;
176   }
177
178   folly::EventBase* evb_;
179   int numLoops_{0};
180   int currLoop_{0};
181   bool zeroCopy_{false};
182
183   folly::AsyncSocket::UniquePtr sock_;
184   Callback callback_;
185
186   size_t bufferSize_{0};
187   size_t readOffset_{0};
188   char* readBuffer_{nullptr};
189   std::unique_ptr<folly::IOBuf> writeBuffer_;
190
191   bool client_;
192 };
193
194 class TestServer : public folly::AsyncServerSocket::AcceptCallback {
195  public:
196   explicit TestServer(
197       folly::EventBase* evb,
198       int numLoops,
199       size_t bufferSize,
200       bool zeroCopy)
201       : evb_(evb),
202         numLoops_(numLoops),
203         bufferSize_(bufferSize),
204         zeroCopy_(zeroCopy) {}
205
206   void addCallbackToServerSocket(folly::AsyncServerSocket& sock) {
207     sock.addAcceptCallback(this, evb_);
208   }
209
210   void connectionAccepted(
211       int fd,
212       const folly::SocketAddress& /* unused */) noexcept override {
213     auto client = std::make_shared<TestAsyncSocket>(
214         evb_, fd, numLoops_, bufferSize_, zeroCopy_);
215     clients_[client.get()] = client;
216   }
217
218   void acceptError(const std::exception&) noexcept override {}
219
220  private:
221   folly::EventBase* evb_;
222   int numLoops_;
223   size_t bufferSize_;
224   bool zeroCopy_;
225   std::unique_ptr<TestAsyncSocket> client_;
226   std::unordered_map<TestAsyncSocket*, std::shared_ptr<TestAsyncSocket>>
227       clients_;
228 };
229
230 class Test {
231  public:
232   explicit Test(int numLoops, bool zeroCopy, size_t bufferSize)
233       : numLoops_(numLoops),
234         zeroCopy_(zeroCopy),
235         bufferSize_(bufferSize),
236         client_(new TestAsyncSocket(&evb_, numLoops_, bufferSize_, zeroCopy)),
237         listenSock_(new folly::AsyncServerSocket(&evb_)),
238         server_(&evb_, numLoops_, bufferSize_, zeroCopy) {
239     if (listenSock_) {
240       server_.addCallbackToServerSocket(*listenSock_);
241     }
242   }
243
244   void run() {
245     evb_.runInEventBaseThread([this]() {
246
247       if (listenSock_) {
248         listenSock_->bind(0);
249         listenSock_->setZeroCopy(zeroCopy_);
250         listenSock_->listen(10);
251         listenSock_->startAccepting();
252
253         connectOne();
254       }
255     });
256
257     evb_.loopForever();
258   }
259
260  private:
261   void connectOne() {
262     SocketAddress addr = listenSock_->getAddress();
263     client_->connect(addr);
264   }
265
266   int numLoops_;
267   bool zeroCopy_;
268   size_t bufferSize_;
269
270   EventBase evb_;
271   std::unique_ptr<TestAsyncSocket> client_;
272   folly::AsyncServerSocket::UniquePtr listenSock_;
273   TestServer server_;
274 };
275
276 void runClient(
277     const std::string& host,
278     uint16_t port,
279     int numLoops,
280     bool zeroCopy,
281     size_t bufferSize) {
282   LOG(INFO) << "Running client. host = " << host << " port = " << port
283             << " numLoops = " << numLoops << " zeroCopy = " << zeroCopy
284             << " bufferSize = " << bufferSize;
285
286   EventBase evb;
287   std::unique_ptr<TestAsyncSocket> client(
288       new TestAsyncSocket(&evb, numLoops, bufferSize, zeroCopy));
289   SocketAddress addr(host, port);
290   evb.runInEventBaseThread([&]() { client->connect(addr); });
291
292   evb.loopForever();
293 }
294
295 void runServer(uint16_t port, int numLoops, bool zeroCopy, size_t bufferSize) {
296   LOG(INFO) << "Running server. port = " << port << " numLoops = " << numLoops
297             << " zeroCopy = " << zeroCopy << " bufferSize = " << bufferSize;
298
299   EventBase evb;
300   folly::AsyncServerSocket::UniquePtr listenSock(
301       new folly::AsyncServerSocket(&evb));
302   TestServer server(&evb, numLoops, bufferSize, zeroCopy);
303
304   server.addCallbackToServerSocket(*listenSock);
305
306   evb.runInEventBaseThread([&]() {
307     listenSock->bind(port);
308     listenSock->setZeroCopy(zeroCopy);
309     listenSock->listen(10);
310     listenSock->startAccepting();
311   });
312
313   evb.loopForever();
314 }
315
316 static auto constexpr kMaxLoops = 200000;
317
318 void zeroCopyOn(unsigned /* unused */, size_t bufferSize) {
319   Test test(kMaxLoops, true, bufferSize);
320   test.run();
321 }
322
323 void zeroCopyOff(unsigned /* unused */, size_t bufferSize) {
324   Test test(kMaxLoops, false, bufferSize);
325   test.run();
326 }
327
328 BENCHMARK_PARAM(zeroCopyOn, 4096);
329 BENCHMARK_PARAM(zeroCopyOff, 4096);
330 BENCHMARK_DRAW_LINE()
331 BENCHMARK_PARAM(zeroCopyOn, 8192);
332 BENCHMARK_PARAM(zeroCopyOff, 8192);
333 BENCHMARK_DRAW_LINE()
334 BENCHMARK_PARAM(zeroCopyOn, 16384);
335 BENCHMARK_PARAM(zeroCopyOff, 16384);
336 BENCHMARK_DRAW_LINE()
337 BENCHMARK_PARAM(zeroCopyOn, 32768);
338 BENCHMARK_PARAM(zeroCopyOff, 32768);
339 BENCHMARK_DRAW_LINE()
340 BENCHMARK_PARAM(zeroCopyOn, 65536);
341 BENCHMARK_PARAM(zeroCopyOff, 65536);
342 BENCHMARK_DRAW_LINE()
343 BENCHMARK_PARAM(zeroCopyOn, 131072);
344 BENCHMARK_PARAM(zeroCopyOff, 131072);
345 BENCHMARK_DRAW_LINE()
346 BENCHMARK_PARAM(zeroCopyOn, 262144);
347 BENCHMARK_PARAM(zeroCopyOff, 262144);
348 BENCHMARK_DRAW_LINE()
349 BENCHMARK_PARAM(zeroCopyOn, 524288);
350 BENCHMARK_PARAM(zeroCopyOff, 524288);
351 BENCHMARK_DRAW_LINE()
352 BENCHMARK_PARAM(zeroCopyOn, 1048576);
353 BENCHMARK_PARAM(zeroCopyOff, 1048576);
354 BENCHMARK_DRAW_LINE()
355
356 DEFINE_bool(client, false, "client mode");
357 DEFINE_bool(server, false, "server mode");
358 DEFINE_bool(zeroCopy, false, "use zerocopy");
359 DEFINE_int32(numLoops, kMaxLoops, "number of loops");
360 DEFINE_int32(bufferSize, 524288, "buffer size");
361 DEFINE_int32(port, 33130, "port");
362 DEFINE_string(host, "::1", "host");
363
364 int main(int argc, char** argv) {
365   gflags::ParseCommandLineFlags(&argc, &argv, true);
366
367   if (FLAGS_client) {
368     runClient(
369         FLAGS_host,
370         FLAGS_port,
371         FLAGS_numLoops,
372         FLAGS_zeroCopy,
373         FLAGS_bufferSize);
374   } else if (FLAGS_server) {
375     runServer(FLAGS_port, FLAGS_numLoops, FLAGS_zeroCopy, FLAGS_bufferSize);
376   } else {
377     runBenchmarks();
378   }
379 }