927d74f7d3b35dfac42ebe17bff4745078e0eec1
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/RWSpinLock.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/Random.h>
23
24 #include <folly/io/IOBuf.h>
25 #include <folly/io/async/test/AsyncSocketTest.h>
26 #include <folly/io/async/test/Util.h>
27 #include <folly/portability/Unistd.h>
28 #include <folly/test/SocketAddressTestHelper.h>
29
30 #include <gtest/gtest.h>
31 #include <boost/scoped_array.hpp>
32 #include <iostream>
33 #include <fcntl.h>
34 #include <poll.h>
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <netinet/tcp.h>
38 #include <thread>
39
40 using namespace boost;
41
42 using std::string;
43 using std::vector;
44 using std::min;
45 using std::cerr;
46 using std::endl;
47 using std::unique_ptr;
48 using std::chrono::milliseconds;
49 using boost::scoped_array;
50
51 using namespace folly;
52
53 class DelayedWrite: public AsyncTimeout {
54  public:
55   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
56       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
57       bool cork, bool lastWrite = false):
58     AsyncTimeout(socket->getEventBase()),
59     socket_(socket),
60     bufs_(std::move(bufs)),
61     wcb_(wcb),
62     cork_(cork),
63     lastWrite_(lastWrite) {}
64
65  private:
66   void timeoutExpired() noexcept override {
67     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
68     socket_->writeChain(wcb_, std::move(bufs_), flags);
69     if (lastWrite_) {
70       socket_->shutdownWrite();
71     }
72   }
73
74   std::shared_ptr<AsyncSocket> socket_;
75   unique_ptr<IOBuf> bufs_;
76   AsyncTransportWrapper::WriteCallback* wcb_;
77   bool cork_;
78   bool lastWrite_;
79 };
80
81 ///////////////////////////////////////////////////////////////////////////
82 // connect() tests
83 ///////////////////////////////////////////////////////////////////////////
84
85 /**
86  * Test connecting to a server
87  */
88 TEST(AsyncSocketTest, Connect) {
89   // Start listening on a local port
90   TestServer server;
91
92   // Connect using a AsyncSocket
93   EventBase evb;
94   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
95   ConnCallback cb;
96   socket->connect(&cb, server.getAddress(), 30);
97
98   evb.loop();
99
100   CHECK_EQ(cb.state, STATE_SUCCEEDED);
101   EXPECT_LE(0, socket->getConnectTime().count());
102   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
103 }
104
105 /**
106  * Test connecting to a server that isn't listening
107  */
108 TEST(AsyncSocketTest, ConnectRefused) {
109   EventBase evb;
110
111   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
112
113   // Hopefully nothing is actually listening on this address
114   folly::SocketAddress addr("127.0.0.1", 65535);
115   ConnCallback cb;
116   socket->connect(&cb, addr, 30);
117
118   evb.loop();
119
120   CHECK_EQ(cb.state, STATE_FAILED);
121   CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
122   EXPECT_LE(0, socket->getConnectTime().count());
123   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
124 }
125
126 /**
127  * Test connection timeout
128  */
129 TEST(AsyncSocketTest, ConnectTimeout) {
130   EventBase evb;
131
132   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
133
134   // Try connecting to server that won't respond.
135   //
136   // This depends somewhat on the network where this test is run.
137   // Hopefully this IP will be routable but unresponsive.
138   // (Alternatively, we could try listening on a local raw socket, but that
139   // normally requires root privileges.)
140   auto host =
141       SocketAddressTestHelper::isIPv6Enabled() ?
142       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
143       SocketAddressTestHelper::isIPv4Enabled() ?
144       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
145       nullptr;
146   SocketAddress addr(host, 65535);
147   ConnCallback cb;
148   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
149
150   evb.loop();
151
152   CHECK_EQ(cb.state, STATE_FAILED);
153   CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
154
155   // Verify that we can still get the peer address after a timeout.
156   // Use case is if the client was created from a client pool, and we want
157   // to log which peer failed.
158   folly::SocketAddress peer;
159   socket->getPeerAddress(&peer);
160   CHECK_EQ(peer, addr);
161   EXPECT_LE(0, socket->getConnectTime().count());
162   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
163 }
164
165 /**
166  * Test writing immediately after connecting, without waiting for connect
167  * to finish.
168  */
169 TEST(AsyncSocketTest, ConnectAndWrite) {
170   TestServer server;
171
172   // connect()
173   EventBase evb;
174   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
175   ConnCallback ccb;
176   socket->connect(&ccb, server.getAddress(), 30);
177
178   // write()
179   char buf[128];
180   memset(buf, 'a', sizeof(buf));
181   WriteCallback wcb;
182   socket->write(&wcb, buf, sizeof(buf));
183
184   // Loop.  We don't bother accepting on the server socket yet.
185   // The kernel should be able to buffer the write request so it can succeed.
186   evb.loop();
187
188   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
189   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
190
191   // Make sure the server got a connection and received the data
192   socket->close();
193   server.verifyConnection(buf, sizeof(buf));
194
195   ASSERT_TRUE(socket->isClosedBySelf());
196   ASSERT_FALSE(socket->isClosedByPeer());
197   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
198 }
199
200 /**
201  * Test connecting using a nullptr connect callback.
202  */
203 TEST(AsyncSocketTest, ConnectNullCallback) {
204   TestServer server;
205
206   // connect()
207   EventBase evb;
208   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
209   socket->connect(nullptr, server.getAddress(), 30);
210
211   // write some data, just so we have some way of verifing
212   // that the socket works correctly after connecting
213   char buf[128];
214   memset(buf, 'a', sizeof(buf));
215   WriteCallback wcb;
216   socket->write(&wcb, buf, sizeof(buf));
217
218   evb.loop();
219
220   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
221
222   // Make sure the server got a connection and received the data
223   socket->close();
224   server.verifyConnection(buf, sizeof(buf));
225
226   ASSERT_TRUE(socket->isClosedBySelf());
227   ASSERT_FALSE(socket->isClosedByPeer());
228 }
229
230 /**
231  * Test calling both write() and close() immediately after connecting, without
232  * waiting for connect to finish.
233  *
234  * This exercises the STATE_CONNECTING_CLOSING code.
235  */
236 TEST(AsyncSocketTest, ConnectWriteAndClose) {
237   TestServer server;
238
239   // connect()
240   EventBase evb;
241   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
242   ConnCallback ccb;
243   socket->connect(&ccb, server.getAddress(), 30);
244
245   // write()
246   char buf[128];
247   memset(buf, 'a', sizeof(buf));
248   WriteCallback wcb;
249   socket->write(&wcb, buf, sizeof(buf));
250
251   // close()
252   socket->close();
253
254   // Loop.  We don't bother accepting on the server socket yet.
255   // The kernel should be able to buffer the write request so it can succeed.
256   evb.loop();
257
258   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
259   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
260
261   // Make sure the server got a connection and received the data
262   server.verifyConnection(buf, sizeof(buf));
263
264   ASSERT_TRUE(socket->isClosedBySelf());
265   ASSERT_FALSE(socket->isClosedByPeer());
266 }
267
268 /**
269  * Test calling close() immediately after connect()
270  */
271 TEST(AsyncSocketTest, ConnectAndClose) {
272   TestServer server;
273
274   // Connect using a AsyncSocket
275   EventBase evb;
276   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
277   ConnCallback ccb;
278   socket->connect(&ccb, server.getAddress(), 30);
279
280   // Hopefully the connect didn't succeed immediately.
281   // If it did, we can't exercise the close-while-connecting code path.
282   if (ccb.state == STATE_SUCCEEDED) {
283     LOG(INFO) << "connect() succeeded immediately; aborting test "
284                        "of close-during-connect behavior";
285     return;
286   }
287
288   socket->close();
289
290   // Loop, although there shouldn't be anything to do.
291   evb.loop();
292
293   // Make sure the connection was aborted
294   CHECK_EQ(ccb.state, STATE_FAILED);
295
296   ASSERT_TRUE(socket->isClosedBySelf());
297   ASSERT_FALSE(socket->isClosedByPeer());
298 }
299
300 /**
301  * Test calling closeNow() immediately after connect()
302  *
303  * This should be identical to the normal close behavior.
304  */
305 TEST(AsyncSocketTest, ConnectAndCloseNow) {
306   TestServer server;
307
308   // Connect using a AsyncSocket
309   EventBase evb;
310   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
311   ConnCallback ccb;
312   socket->connect(&ccb, server.getAddress(), 30);
313
314   // Hopefully the connect didn't succeed immediately.
315   // If it did, we can't exercise the close-while-connecting code path.
316   if (ccb.state == STATE_SUCCEEDED) {
317     LOG(INFO) << "connect() succeeded immediately; aborting test "
318                        "of closeNow()-during-connect behavior";
319     return;
320   }
321
322   socket->closeNow();
323
324   // Loop, although there shouldn't be anything to do.
325   evb.loop();
326
327   // Make sure the connection was aborted
328   CHECK_EQ(ccb.state, STATE_FAILED);
329
330   ASSERT_TRUE(socket->isClosedBySelf());
331   ASSERT_FALSE(socket->isClosedByPeer());
332 }
333
334 /**
335  * Test calling both write() and closeNow() immediately after connecting,
336  * without waiting for connect to finish.
337  *
338  * This should abort the pending write.
339  */
340 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
341   TestServer server;
342
343   // connect()
344   EventBase evb;
345   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
346   ConnCallback ccb;
347   socket->connect(&ccb, server.getAddress(), 30);
348
349   // Hopefully the connect didn't succeed immediately.
350   // If it did, we can't exercise the close-while-connecting code path.
351   if (ccb.state == STATE_SUCCEEDED) {
352     LOG(INFO) << "connect() succeeded immediately; aborting test "
353                        "of write-during-connect behavior";
354     return;
355   }
356
357   // write()
358   char buf[128];
359   memset(buf, 'a', sizeof(buf));
360   WriteCallback wcb;
361   socket->write(&wcb, buf, sizeof(buf));
362
363   // close()
364   socket->closeNow();
365
366   // Loop, although there shouldn't be anything to do.
367   evb.loop();
368
369   CHECK_EQ(ccb.state, STATE_FAILED);
370   CHECK_EQ(wcb.state, STATE_FAILED);
371
372   ASSERT_TRUE(socket->isClosedBySelf());
373   ASSERT_FALSE(socket->isClosedByPeer());
374 }
375
376 /**
377  * Test installing a read callback immediately, before connect() finishes.
378  */
379 TEST(AsyncSocketTest, ConnectAndRead) {
380   TestServer server;
381
382   // connect()
383   EventBase evb;
384   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
385   ConnCallback ccb;
386   socket->connect(&ccb, server.getAddress(), 30);
387
388   ReadCallback rcb;
389   socket->setReadCB(&rcb);
390
391   // Even though we haven't looped yet, we should be able to accept
392   // the connection and send data to it.
393   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
394   uint8_t buf[128];
395   memset(buf, 'a', sizeof(buf));
396   acceptedSocket->write(buf, sizeof(buf));
397   acceptedSocket->flush();
398   acceptedSocket->close();
399
400   // Loop, although there shouldn't be anything to do.
401   evb.loop();
402
403   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
404   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
405   CHECK_EQ(rcb.buffers.size(), 1);
406   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
407   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
408
409   ASSERT_FALSE(socket->isClosedBySelf());
410   ASSERT_FALSE(socket->isClosedByPeer());
411 }
412
413 /**
414  * Test installing a read callback and then closing immediately before the
415  * connect attempt finishes.
416  */
417 TEST(AsyncSocketTest, ConnectReadAndClose) {
418   TestServer server;
419
420   // connect()
421   EventBase evb;
422   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
423   ConnCallback ccb;
424   socket->connect(&ccb, server.getAddress(), 30);
425
426   // Hopefully the connect didn't succeed immediately.
427   // If it did, we can't exercise the close-while-connecting code path.
428   if (ccb.state == STATE_SUCCEEDED) {
429     LOG(INFO) << "connect() succeeded immediately; aborting test "
430                        "of read-during-connect behavior";
431     return;
432   }
433
434   ReadCallback rcb;
435   socket->setReadCB(&rcb);
436
437   // close()
438   socket->close();
439
440   // Loop, although there shouldn't be anything to do.
441   evb.loop();
442
443   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
444   CHECK_EQ(rcb.buffers.size(), 0);
445   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
446
447   ASSERT_TRUE(socket->isClosedBySelf());
448   ASSERT_FALSE(socket->isClosedByPeer());
449 }
450
451 /**
452  * Test both writing and installing a read callback immediately,
453  * before connect() finishes.
454  */
455 TEST(AsyncSocketTest, ConnectWriteAndRead) {
456   TestServer server;
457
458   // connect()
459   EventBase evb;
460   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
461   ConnCallback ccb;
462   socket->connect(&ccb, server.getAddress(), 30);
463
464   // write()
465   char buf1[128];
466   memset(buf1, 'a', sizeof(buf1));
467   WriteCallback wcb;
468   socket->write(&wcb, buf1, sizeof(buf1));
469
470   // set a read callback
471   ReadCallback rcb;
472   socket->setReadCB(&rcb);
473
474   // Even though we haven't looped yet, we should be able to accept
475   // the connection and send data to it.
476   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
477   uint8_t buf2[128];
478   memset(buf2, 'b', sizeof(buf2));
479   acceptedSocket->write(buf2, sizeof(buf2));
480   acceptedSocket->flush();
481
482   // shut down the write half of acceptedSocket, so that the AsyncSocket
483   // will stop reading and we can break out of the event loop.
484   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
485
486   // Loop
487   evb.loop();
488
489   // Make sure the connect succeeded
490   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
491
492   // Make sure the AsyncSocket read the data written by the accepted socket
493   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
494   CHECK_EQ(rcb.buffers.size(), 1);
495   CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
496   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
497
498   // Close the AsyncSocket so we'll see EOF on acceptedSocket
499   socket->close();
500
501   // Make sure the accepted socket saw the data written by the AsyncSocket
502   uint8_t readbuf[sizeof(buf1)];
503   acceptedSocket->readAll(readbuf, sizeof(readbuf));
504   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
505   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
506   CHECK_EQ(bytesRead, 0);
507
508   ASSERT_FALSE(socket->isClosedBySelf());
509   ASSERT_TRUE(socket->isClosedByPeer());
510 }
511
512 /**
513  * Test writing to the socket then shutting down writes before the connect
514  * attempt finishes.
515  */
516 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
517   TestServer server;
518
519   // connect()
520   EventBase evb;
521   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
522   ConnCallback ccb;
523   socket->connect(&ccb, server.getAddress(), 30);
524
525   // Hopefully the connect didn't succeed immediately.
526   // If it did, we can't exercise the write-while-connecting code path.
527   if (ccb.state == STATE_SUCCEEDED) {
528     LOG(INFO) << "connect() succeeded immediately; skipping test";
529     return;
530   }
531
532   // Ask to write some data
533   char wbuf[128];
534   memset(wbuf, 'a', sizeof(wbuf));
535   WriteCallback wcb;
536   socket->write(&wcb, wbuf, sizeof(wbuf));
537   socket->shutdownWrite();
538
539   // Shutdown writes
540   socket->shutdownWrite();
541
542   // Even though we haven't looped yet, we should be able to accept
543   // the connection.
544   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
545
546   // Since the connection is still in progress, there should be no data to
547   // read yet.  Verify that the accepted socket is not readable.
548   struct pollfd fds[1];
549   fds[0].fd = acceptedSocket->getSocketFD();
550   fds[0].events = POLLIN;
551   fds[0].revents = 0;
552   int rc = poll(fds, 1, 0);
553   CHECK_EQ(rc, 0);
554
555   // Write data to the accepted socket
556   uint8_t acceptedWbuf[192];
557   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
558   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
559   acceptedSocket->flush();
560
561   // Loop
562   evb.loop();
563
564   // The loop should have completed the connection, written the queued data,
565   // and shutdown writes on the socket.
566   //
567   // Check that the connection was completed successfully and that the write
568   // callback succeeded.
569   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
570   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
571
572   // Check that we can read the data that was written to the socket, and that
573   // we see an EOF, since its socket was half-shutdown.
574   uint8_t readbuf[sizeof(wbuf)];
575   acceptedSocket->readAll(readbuf, sizeof(readbuf));
576   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
577   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
578   CHECK_EQ(bytesRead, 0);
579
580   // Close the accepted socket.  This will cause it to see EOF
581   // and uninstall the read callback when we loop next.
582   acceptedSocket->close();
583
584   // Install a read callback, then loop again.
585   ReadCallback rcb;
586   socket->setReadCB(&rcb);
587   evb.loop();
588
589   // This loop should have read the data and seen the EOF
590   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
591   CHECK_EQ(rcb.buffers.size(), 1);
592   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
593   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
594                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
595
596   ASSERT_FALSE(socket->isClosedBySelf());
597   ASSERT_FALSE(socket->isClosedByPeer());
598 }
599
600 /**
601  * Test reading, writing, and shutting down writes before the connect attempt
602  * finishes.
603  */
604 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
605   TestServer server;
606
607   // connect()
608   EventBase evb;
609   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
610   ConnCallback ccb;
611   socket->connect(&ccb, server.getAddress(), 30);
612
613   // Hopefully the connect didn't succeed immediately.
614   // If it did, we can't exercise the write-while-connecting code path.
615   if (ccb.state == STATE_SUCCEEDED) {
616     LOG(INFO) << "connect() succeeded immediately; skipping test";
617     return;
618   }
619
620   // Install a read callback
621   ReadCallback rcb;
622   socket->setReadCB(&rcb);
623
624   // Ask to write some data
625   char wbuf[128];
626   memset(wbuf, 'a', sizeof(wbuf));
627   WriteCallback wcb;
628   socket->write(&wcb, wbuf, sizeof(wbuf));
629
630   // Shutdown writes
631   socket->shutdownWrite();
632
633   // Even though we haven't looped yet, we should be able to accept
634   // the connection.
635   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
636
637   // Since the connection is still in progress, there should be no data to
638   // read yet.  Verify that the accepted socket is not readable.
639   struct pollfd fds[1];
640   fds[0].fd = acceptedSocket->getSocketFD();
641   fds[0].events = POLLIN;
642   fds[0].revents = 0;
643   int rc = poll(fds, 1, 0);
644   CHECK_EQ(rc, 0);
645
646   // Write data to the accepted socket
647   uint8_t acceptedWbuf[192];
648   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
649   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
650   acceptedSocket->flush();
651   // Shutdown writes to the accepted socket.  This will cause it to see EOF
652   // and uninstall the read callback.
653   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
654
655   // Loop
656   evb.loop();
657
658   // The loop should have completed the connection, written the queued data,
659   // shutdown writes on the socket, read the data we wrote to it, and see the
660   // EOF.
661   //
662   // Check that the connection was completed successfully and that the read
663   // and write callbacks were invoked as expected.
664   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
665   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
666   CHECK_EQ(rcb.buffers.size(), 1);
667   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
668   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
669                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
670   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
671
672   // Check that we can read the data that was written to the socket, and that
673   // we see an EOF, since its socket was half-shutdown.
674   uint8_t readbuf[sizeof(wbuf)];
675   acceptedSocket->readAll(readbuf, sizeof(readbuf));
676   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
677   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
678   CHECK_EQ(bytesRead, 0);
679
680   // Fully close both sockets
681   acceptedSocket->close();
682   socket->close();
683
684   ASSERT_FALSE(socket->isClosedBySelf());
685   ASSERT_TRUE(socket->isClosedByPeer());
686 }
687
688 /**
689  * Test reading, writing, and calling shutdownWriteNow() before the
690  * connect attempt finishes.
691  */
692 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
693   TestServer server;
694
695   // connect()
696   EventBase evb;
697   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
698   ConnCallback ccb;
699   socket->connect(&ccb, server.getAddress(), 30);
700
701   // Hopefully the connect didn't succeed immediately.
702   // If it did, we can't exercise the write-while-connecting code path.
703   if (ccb.state == STATE_SUCCEEDED) {
704     LOG(INFO) << "connect() succeeded immediately; skipping test";
705     return;
706   }
707
708   // Install a read callback
709   ReadCallback rcb;
710   socket->setReadCB(&rcb);
711
712   // Ask to write some data
713   char wbuf[128];
714   memset(wbuf, 'a', sizeof(wbuf));
715   WriteCallback wcb;
716   socket->write(&wcb, wbuf, sizeof(wbuf));
717
718   // Shutdown writes immediately.
719   // This should immediately discard the data that we just tried to write.
720   socket->shutdownWriteNow();
721
722   // Verify that writeError() was invoked on the write callback.
723   CHECK_EQ(wcb.state, STATE_FAILED);
724   CHECK_EQ(wcb.bytesWritten, 0);
725
726   // Even though we haven't looped yet, we should be able to accept
727   // the connection.
728   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
729
730   // Since the connection is still in progress, there should be no data to
731   // read yet.  Verify that the accepted socket is not readable.
732   struct pollfd fds[1];
733   fds[0].fd = acceptedSocket->getSocketFD();
734   fds[0].events = POLLIN;
735   fds[0].revents = 0;
736   int rc = poll(fds, 1, 0);
737   CHECK_EQ(rc, 0);
738
739   // Write data to the accepted socket
740   uint8_t acceptedWbuf[192];
741   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
742   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
743   acceptedSocket->flush();
744   // Shutdown writes to the accepted socket.  This will cause it to see EOF
745   // and uninstall the read callback.
746   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
747
748   // Loop
749   evb.loop();
750
751   // The loop should have completed the connection, written the queued data,
752   // shutdown writes on the socket, read the data we wrote to it, and see the
753   // EOF.
754   //
755   // Check that the connection was completed successfully and that the read
756   // callback was invoked as expected.
757   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
758   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
759   CHECK_EQ(rcb.buffers.size(), 1);
760   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
761   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
762                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
763
764   // Since we used shutdownWriteNow(), it should have discarded all pending
765   // write data.  Verify we see an immediate EOF when reading from the accepted
766   // socket.
767   uint8_t readbuf[sizeof(wbuf)];
768   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
769   CHECK_EQ(bytesRead, 0);
770
771   // Fully close both sockets
772   acceptedSocket->close();
773   socket->close();
774
775   ASSERT_FALSE(socket->isClosedBySelf());
776   ASSERT_TRUE(socket->isClosedByPeer());
777 }
778
779 // Helper function for use in testConnectOptWrite()
780 // Temporarily disable the read callback
781 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
782   // Uninstall the read callback
783   socket->setReadCB(nullptr);
784   // Schedule the read callback to be reinstalled after 1ms
785   socket->getEventBase()->runInLoop(
786       std::bind(&AsyncSocket::setReadCB, socket, rcb));
787 }
788
789 /**
790  * Test connect+write, then have the connect callback perform another write.
791  *
792  * This tests interaction of the optimistic writing after connect with
793  * additional write attempts that occur in the connect callback.
794  */
795 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
796   TestServer server;
797   EventBase evb;
798   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
799
800   // connect()
801   ConnCallback ccb;
802   socket->connect(&ccb, server.getAddress(), 30);
803
804   // Hopefully the connect didn't succeed immediately.
805   // If it did, we can't exercise the optimistic write code path.
806   if (ccb.state == STATE_SUCCEEDED) {
807     LOG(INFO) << "connect() succeeded immediately; aborting test "
808                        "of optimistic write behavior";
809     return;
810   }
811
812   // Tell the connect callback to perform a write when the connect succeeds
813   WriteCallback wcb2;
814   scoped_array<char> buf2(new char[size2]);
815   memset(buf2.get(), 'b', size2);
816   if (size2 > 0) {
817     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
818     // Tell the second write callback to close the connection when it is done
819     wcb2.successCallback = [&] { socket->closeNow(); };
820   }
821
822   // Schedule one write() immediately, before the connect finishes
823   scoped_array<char> buf1(new char[size1]);
824   memset(buf1.get(), 'a', size1);
825   WriteCallback wcb1;
826   if (size1 > 0) {
827     socket->write(&wcb1, buf1.get(), size1);
828   }
829
830   if (close) {
831     // immediately perform a close, before connect() completes
832     socket->close();
833   }
834
835   // Start reading from the other endpoint after 10ms.
836   // If we're using large buffers, we have to read so that the writes don't
837   // block forever.
838   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
839   ReadCallback rcb;
840   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
841                                         acceptedSocket.get(), &rcb);
842   socket->getEventBase()->tryRunAfterDelay(
843       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
844       10);
845
846   // Loop.  We don't bother accepting on the server socket yet.
847   // The kernel should be able to buffer the write request so it can succeed.
848   evb.loop();
849
850   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
851   if (size1 > 0) {
852     CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
853   }
854   if (size2 > 0) {
855     CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
856   }
857
858   socket->close();
859
860   // Make sure the read callback received all of the data
861   size_t bytesRead = 0;
862   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
863        it != rcb.buffers.end();
864        ++it) {
865     size_t start = bytesRead;
866     bytesRead += it->length;
867     size_t end = bytesRead;
868     if (start < size1) {
869       size_t cmpLen = min(size1, end) - start;
870       CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
871     }
872     if (end > size1 && end <= size1 + size2) {
873       size_t itOffset;
874       size_t buf2Offset;
875       size_t cmpLen;
876       if (start >= size1) {
877         itOffset = 0;
878         buf2Offset = start - size1;
879         cmpLen = end - start;
880       } else {
881         itOffset = size1 - start;
882         buf2Offset = 0;
883         cmpLen = end - size1;
884       }
885       CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
886                                cmpLen),
887                         0);
888     }
889   }
890   CHECK_EQ(bytesRead, size1 + size2);
891 }
892
893 TEST(AsyncSocketTest, ConnectCallbackWrite) {
894   // Test using small writes that should both succeed immediately
895   testConnectOptWrite(100, 200);
896
897   // Test using a large buffer in the connect callback, that should block
898   const size_t largeSize = 8*1024*1024;
899   testConnectOptWrite(100, largeSize);
900
901   // Test using a large initial write
902   testConnectOptWrite(largeSize, 100);
903
904   // Test using two large buffers
905   testConnectOptWrite(largeSize, largeSize);
906
907   // Test a small write in the connect callback,
908   // but no immediate write before connect completes
909   testConnectOptWrite(0, 64);
910
911   // Test a large write in the connect callback,
912   // but no immediate write before connect completes
913   testConnectOptWrite(0, largeSize);
914
915   // Test connect, a small write, then immediately call close() before connect
916   // completes
917   testConnectOptWrite(211, 0, true);
918
919   // Test connect, a large immediate write (that will block), then immediately
920   // call close() before connect completes
921   testConnectOptWrite(largeSize, 0, true);
922 }
923
924 ///////////////////////////////////////////////////////////////////////////
925 // write() related tests
926 ///////////////////////////////////////////////////////////////////////////
927
928 /**
929  * Test writing using a nullptr callback
930  */
931 TEST(AsyncSocketTest, WriteNullCallback) {
932   TestServer server;
933
934   // connect()
935   EventBase evb;
936   std::shared_ptr<AsyncSocket> socket =
937     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
938   evb.loop(); // loop until the socket is connected
939
940   // write() with a nullptr callback
941   char buf[128];
942   memset(buf, 'a', sizeof(buf));
943   socket->write(nullptr, buf, sizeof(buf));
944
945   evb.loop(); // loop until the data is sent
946
947   // Make sure the server got a connection and received the data
948   socket->close();
949   server.verifyConnection(buf, sizeof(buf));
950
951   ASSERT_TRUE(socket->isClosedBySelf());
952   ASSERT_FALSE(socket->isClosedByPeer());
953 }
954
955 /**
956  * Test writing with a send timeout
957  */
958 TEST(AsyncSocketTest, WriteTimeout) {
959   TestServer server;
960
961   // connect()
962   EventBase evb;
963   std::shared_ptr<AsyncSocket> socket =
964     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
965   evb.loop(); // loop until the socket is connected
966
967   // write() a large chunk of data, with no-one on the other end reading
968   size_t writeLength = 8*1024*1024;
969   uint32_t timeout = 200;
970   socket->setSendTimeout(timeout);
971   scoped_array<char> buf(new char[writeLength]);
972   memset(buf.get(), 'a', writeLength);
973   WriteCallback wcb;
974   socket->write(&wcb, buf.get(), writeLength);
975
976   TimePoint start;
977   evb.loop();
978   TimePoint end;
979
980   // Make sure the write attempt timed out as requested
981   CHECK_EQ(wcb.state, STATE_FAILED);
982   CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
983
984   // Check that the write timed out within a reasonable period of time.
985   // We don't check for exactly the specified timeout, since AsyncSocket only
986   // times out when it hasn't made progress for that period of time.
987   //
988   // On linux, the first write sends a few hundred kb of data, then blocks for
989   // writability, and then unblocks again after 40ms and is able to write
990   // another smaller of data before blocking permanently.  Therefore it doesn't
991   // time out until 40ms + timeout.
992   //
993   // I haven't fully verified the cause of this, but I believe it probably
994   // occurs because the receiving end delays sending an ack for up to 40ms.
995   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
996   // the ack, it can send some more data.  However, after that point the
997   // receiver's kernel buffer is full.  This 40ms delay happens even with
998   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
999   // kernel may be automatically disabling TCP_QUICKACK after receiving some
1000   // data.
1001   //
1002   // For now, we simply check that the timeout occurred within 160ms of
1003   // the requested value.
1004   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1005 }
1006
1007 /**
1008  * Test writing to a socket that the remote endpoint has closed
1009  */
1010 TEST(AsyncSocketTest, WritePipeError) {
1011   TestServer server;
1012
1013   // connect()
1014   EventBase evb;
1015   std::shared_ptr<AsyncSocket> socket =
1016     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1017   socket->setSendTimeout(1000);
1018   evb.loop(); // loop until the socket is connected
1019
1020   // accept and immediately close the socket
1021   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1022   acceptedSocket.reset();
1023
1024   // write() a large chunk of data
1025   size_t writeLength = 8*1024*1024;
1026   scoped_array<char> buf(new char[writeLength]);
1027   memset(buf.get(), 'a', writeLength);
1028   WriteCallback wcb;
1029   socket->write(&wcb, buf.get(), writeLength);
1030
1031   evb.loop();
1032
1033   // Make sure the write failed.
1034   // It would be nice if AsyncSocketException could convey the errno value,
1035   // so that we could check for EPIPE
1036   CHECK_EQ(wcb.state, STATE_FAILED);
1037   CHECK_EQ(wcb.exception.getType(),
1038                     AsyncSocketException::INTERNAL_ERROR);
1039
1040   ASSERT_FALSE(socket->isClosedBySelf());
1041   ASSERT_FALSE(socket->isClosedByPeer());
1042 }
1043
1044 /**
1045  * Test writing a mix of simple buffers and IOBufs
1046  */
1047 TEST(AsyncSocketTest, WriteIOBuf) {
1048   TestServer server;
1049
1050   // connect()
1051   EventBase evb;
1052   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1053   ConnCallback ccb;
1054   socket->connect(&ccb, server.getAddress(), 30);
1055
1056   // Accept the connection
1057   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1058   ReadCallback rcb;
1059   acceptedSocket->setReadCB(&rcb);
1060
1061   // Write a simple buffer to the socket
1062   size_t simpleBufLength = 5;
1063   char simpleBuf[simpleBufLength];
1064   memset(simpleBuf, 'a', simpleBufLength);
1065   WriteCallback wcb;
1066   socket->write(&wcb, simpleBuf, simpleBufLength);
1067
1068   // Write a single-element IOBuf chain
1069   size_t buf1Length = 7;
1070   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1071   memset(buf1->writableData(), 'b', buf1Length);
1072   buf1->append(buf1Length);
1073   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1074   WriteCallback wcb2;
1075   socket->writeChain(&wcb2, std::move(buf1));
1076
1077   // Write a multiple-element IOBuf chain
1078   size_t buf2Length = 11;
1079   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1080   memset(buf2->writableData(), 'c', buf2Length);
1081   buf2->append(buf2Length);
1082   size_t buf3Length = 13;
1083   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1084   memset(buf3->writableData(), 'd', buf3Length);
1085   buf3->append(buf3Length);
1086   buf2->appendChain(std::move(buf3));
1087   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1088   buf2Copy->coalesce();
1089   WriteCallback wcb3;
1090   socket->writeChain(&wcb3, std::move(buf2));
1091   socket->shutdownWrite();
1092
1093   // Let the reads and writes run to completion
1094   evb.loop();
1095
1096   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1097   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1098   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1099
1100   // Make sure the reader got the right data in the right order
1101   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1102   CHECK_EQ(rcb.buffers.size(), 1);
1103   CHECK_EQ(rcb.buffers[0].length,
1104       simpleBufLength + buf1Length + buf2Length + buf3Length);
1105   CHECK_EQ(
1106       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1107   CHECK_EQ(
1108       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1109           buf1Copy->data(), buf1Copy->length()), 0);
1110   CHECK_EQ(
1111       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1112           buf2Copy->data(), buf2Copy->length()), 0);
1113
1114   acceptedSocket->close();
1115   socket->close();
1116
1117   ASSERT_TRUE(socket->isClosedBySelf());
1118   ASSERT_FALSE(socket->isClosedByPeer());
1119 }
1120
1121 TEST(AsyncSocketTest, WriteIOBufCorked) {
1122   TestServer server;
1123
1124   // connect()
1125   EventBase evb;
1126   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1127   ConnCallback ccb;
1128   socket->connect(&ccb, server.getAddress(), 30);
1129
1130   // Accept the connection
1131   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1132   ReadCallback rcb;
1133   acceptedSocket->setReadCB(&rcb);
1134
1135   // Do three writes, 100ms apart, with the "cork" flag set
1136   // on the second write.  The reader should see the first write
1137   // arrive by itself, followed by the second and third writes
1138   // arriving together.
1139   size_t buf1Length = 5;
1140   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1141   memset(buf1->writableData(), 'a', buf1Length);
1142   buf1->append(buf1Length);
1143   size_t buf2Length = 7;
1144   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1145   memset(buf2->writableData(), 'b', buf2Length);
1146   buf2->append(buf2Length);
1147   size_t buf3Length = 11;
1148   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1149   memset(buf3->writableData(), 'c', buf3Length);
1150   buf3->append(buf3Length);
1151   WriteCallback wcb1;
1152   socket->writeChain(&wcb1, std::move(buf1));
1153   WriteCallback wcb2;
1154   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1155   write2.scheduleTimeout(100);
1156   WriteCallback wcb3;
1157   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1158   write3.scheduleTimeout(140);
1159
1160   evb.loop();
1161   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1162   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1163   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1164   if (wcb3.state != STATE_SUCCEEDED) {
1165     throw(wcb3.exception);
1166   }
1167   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1168
1169   // Make sure the reader got the data with the right grouping
1170   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1171   CHECK_EQ(rcb.buffers.size(), 2);
1172   CHECK_EQ(rcb.buffers[0].length, buf1Length);
1173   CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1174
1175   acceptedSocket->close();
1176   socket->close();
1177
1178   ASSERT_TRUE(socket->isClosedBySelf());
1179   ASSERT_FALSE(socket->isClosedByPeer());
1180 }
1181
1182 /**
1183  * Test performing a zero-length write
1184  */
1185 TEST(AsyncSocketTest, ZeroLengthWrite) {
1186   TestServer server;
1187
1188   // connect()
1189   EventBase evb;
1190   std::shared_ptr<AsyncSocket> socket =
1191     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1192   evb.loop(); // loop until the socket is connected
1193
1194   auto acceptedSocket = server.acceptAsync(&evb);
1195   ReadCallback rcb;
1196   acceptedSocket->setReadCB(&rcb);
1197
1198   size_t len1 = 1024*1024;
1199   size_t len2 = 1024*1024;
1200   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1201   memset(buf.get(), 'a', len1);
1202   memset(buf.get(), 'b', len2);
1203
1204   WriteCallback wcb1;
1205   WriteCallback wcb2;
1206   WriteCallback wcb3;
1207   WriteCallback wcb4;
1208   socket->write(&wcb1, buf.get(), 0);
1209   socket->write(&wcb2, buf.get(), len1);
1210   socket->write(&wcb3, buf.get() + len1, 0);
1211   socket->write(&wcb4, buf.get() + len1, len2);
1212   socket->close();
1213
1214   evb.loop(); // loop until the data is sent
1215
1216   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1217   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1218   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1219   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1220   rcb.verifyData(buf.get(), len1 + len2);
1221
1222   ASSERT_TRUE(socket->isClosedBySelf());
1223   ASSERT_FALSE(socket->isClosedByPeer());
1224 }
1225
1226 TEST(AsyncSocketTest, ZeroLengthWritev) {
1227   TestServer server;
1228
1229   // connect()
1230   EventBase evb;
1231   std::shared_ptr<AsyncSocket> socket =
1232     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1233   evb.loop(); // loop until the socket is connected
1234
1235   auto acceptedSocket = server.acceptAsync(&evb);
1236   ReadCallback rcb;
1237   acceptedSocket->setReadCB(&rcb);
1238
1239   size_t len1 = 1024*1024;
1240   size_t len2 = 1024*1024;
1241   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1242   memset(buf.get(), 'a', len1);
1243   memset(buf.get(), 'b', len2);
1244
1245   WriteCallback wcb;
1246   size_t iovCount = 4;
1247   struct iovec iov[iovCount];
1248   iov[0].iov_base = buf.get();
1249   iov[0].iov_len = len1;
1250   iov[1].iov_base = buf.get() + len1;
1251   iov[1].iov_len = 0;
1252   iov[2].iov_base = buf.get() + len1;
1253   iov[2].iov_len = len2;
1254   iov[3].iov_base = buf.get() + len1 + len2;
1255   iov[3].iov_len = 0;
1256
1257   socket->writev(&wcb, iov, iovCount);
1258   socket->close();
1259   evb.loop(); // loop until the data is sent
1260
1261   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1262   rcb.verifyData(buf.get(), len1 + len2);
1263
1264   ASSERT_TRUE(socket->isClosedBySelf());
1265   ASSERT_FALSE(socket->isClosedByPeer());
1266 }
1267
1268 ///////////////////////////////////////////////////////////////////////////
1269 // close() related tests
1270 ///////////////////////////////////////////////////////////////////////////
1271
1272 /**
1273  * Test calling close() with pending writes when the socket is already closing.
1274  */
1275 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1276   TestServer server;
1277
1278   // connect()
1279   EventBase evb;
1280   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1281   ConnCallback ccb;
1282   socket->connect(&ccb, server.getAddress(), 30);
1283
1284   // accept the socket on the server side
1285   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1286
1287   // Loop to ensure the connect has completed
1288   evb.loop();
1289
1290   // Make sure we are connected
1291   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1292
1293   // Schedule pending writes, until several write attempts have blocked
1294   char buf[128];
1295   memset(buf, 'a', sizeof(buf));
1296   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1297   WriteCallbackVector writeCallbacks;
1298
1299   writeCallbacks.reserve(5);
1300   while (writeCallbacks.size() < 5) {
1301     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1302
1303     socket->write(wcb.get(), buf, sizeof(buf));
1304     if (wcb->state == STATE_SUCCEEDED) {
1305       // Succeeded immediately.  Keep performing more writes
1306       continue;
1307     }
1308
1309     // This write is blocked.
1310     // Have the write callback call close() when writeError() is invoked
1311     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1312     writeCallbacks.push_back(wcb);
1313   }
1314
1315   // Call closeNow() to immediately fail the pending writes
1316   socket->closeNow();
1317
1318   // Make sure writeError() was invoked on all of the pending write callbacks
1319   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1320        it != writeCallbacks.end();
1321        ++it) {
1322     CHECK_EQ((*it)->state, STATE_FAILED);
1323   }
1324
1325   ASSERT_TRUE(socket->isClosedBySelf());
1326   ASSERT_FALSE(socket->isClosedByPeer());
1327 }
1328
1329 ///////////////////////////////////////////////////////////////////////////
1330 // ImmediateRead related tests
1331 ///////////////////////////////////////////////////////////////////////////
1332
1333 /* AsyncSocket use to verify immediate read works */
1334 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1335  public:
1336   bool immediateReadCalled = false;
1337   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1338  protected:
1339   void checkForImmediateRead() noexcept override {
1340     immediateReadCalled = true;
1341     AsyncSocket::handleRead();
1342   }
1343 };
1344
1345 TEST(AsyncSocket, ConnectReadImmediateRead) {
1346   TestServer server;
1347
1348   const size_t maxBufferSz = 100;
1349   const size_t maxReadsPerEvent = 1;
1350   const size_t expectedDataSz = maxBufferSz * 3;
1351   char expectedData[expectedDataSz];
1352   memset(expectedData, 'j', expectedDataSz);
1353
1354   EventBase evb;
1355   ReadCallback rcb(maxBufferSz);
1356   AsyncSocketImmediateRead socket(&evb);
1357   socket.connect(nullptr, server.getAddress(), 30);
1358
1359   evb.loop(); // loop until the socket is connected
1360
1361   socket.setReadCB(&rcb);
1362   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1363   socket.immediateReadCalled = false;
1364
1365   auto acceptedSocket = server.acceptAsync(&evb);
1366
1367   ReadCallback rcbServer;
1368   WriteCallback wcbServer;
1369   rcbServer.dataAvailableCallback = [&]() {
1370     if (rcbServer.dataRead() == expectedDataSz) {
1371       // write back all data read
1372       rcbServer.verifyData(expectedData, expectedDataSz);
1373       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1374       acceptedSocket->close();
1375     }
1376   };
1377   acceptedSocket->setReadCB(&rcbServer);
1378
1379   // write data
1380   WriteCallback wcb1;
1381   socket.write(&wcb1, expectedData, expectedDataSz);
1382   evb.loop();
1383   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1384   rcb.verifyData(expectedData, expectedDataSz);
1385   CHECK_EQ(socket.immediateReadCalled, true);
1386
1387   ASSERT_FALSE(socket.isClosedBySelf());
1388   ASSERT_FALSE(socket.isClosedByPeer());
1389 }
1390
1391 TEST(AsyncSocket, ConnectReadUninstallRead) {
1392   TestServer server;
1393
1394   const size_t maxBufferSz = 100;
1395   const size_t maxReadsPerEvent = 1;
1396   const size_t expectedDataSz = maxBufferSz * 3;
1397   char expectedData[expectedDataSz];
1398   memset(expectedData, 'k', expectedDataSz);
1399
1400   EventBase evb;
1401   ReadCallback rcb(maxBufferSz);
1402   AsyncSocketImmediateRead socket(&evb);
1403   socket.connect(nullptr, server.getAddress(), 30);
1404
1405   evb.loop(); // loop until the socket is connected
1406
1407   socket.setReadCB(&rcb);
1408   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1409   socket.immediateReadCalled = false;
1410
1411   auto acceptedSocket = server.acceptAsync(&evb);
1412
1413   ReadCallback rcbServer;
1414   WriteCallback wcbServer;
1415   rcbServer.dataAvailableCallback = [&]() {
1416     if (rcbServer.dataRead() == expectedDataSz) {
1417       // write back all data read
1418       rcbServer.verifyData(expectedData, expectedDataSz);
1419       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1420       acceptedSocket->close();
1421     }
1422   };
1423   acceptedSocket->setReadCB(&rcbServer);
1424
1425   rcb.dataAvailableCallback = [&]() {
1426     // we read data and reset readCB
1427     socket.setReadCB(nullptr);
1428   };
1429
1430   // write data
1431   WriteCallback wcb;
1432   socket.write(&wcb, expectedData, expectedDataSz);
1433   evb.loop();
1434   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1435
1436   /* we shoud've only read maxBufferSz data since readCallback_
1437    * was reset in dataAvailableCallback */
1438   CHECK_EQ(rcb.dataRead(), maxBufferSz);
1439   CHECK_EQ(socket.immediateReadCalled, false);
1440
1441   ASSERT_FALSE(socket.isClosedBySelf());
1442   ASSERT_FALSE(socket.isClosedByPeer());
1443 }
1444
1445 // TODO:
1446 // - Test connect() and have the connect callback set the read callback
1447 // - Test connect() and have the connect callback unset the read callback
1448 // - Test reading/writing/closing/destroying the socket in the connect callback
1449 // - Test reading/writing/closing/destroying the socket in the read callback
1450 // - Test reading/writing/closing/destroying the socket in the write callback
1451 // - Test one-way shutdown behavior
1452 // - Test changing the EventBase
1453 //
1454 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1455 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1456
1457
1458 ///////////////////////////////////////////////////////////////////////////
1459 // AsyncServerSocket tests
1460 ///////////////////////////////////////////////////////////////////////////
1461 namespace {
1462 /**
1463  * Helper ConnectionEventCallback class for the test code.
1464  * It maintains counters protected by a spin lock.
1465  */
1466 class TestConnectionEventCallback :
1467   public AsyncServerSocket::ConnectionEventCallback {
1468  public:
1469   virtual void onConnectionAccepted(
1470       const int /* socket */,
1471       const SocketAddress& /* addr */) noexcept override {
1472     folly::RWSpinLock::WriteHolder holder(spinLock_);
1473     connectionAccepted_++;
1474   }
1475
1476   virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1477     folly::RWSpinLock::WriteHolder holder(spinLock_);
1478     connectionAcceptedError_++;
1479   }
1480
1481   virtual void onConnectionDropped(
1482       const int /* socket */,
1483       const SocketAddress& /* addr */) noexcept override {
1484     folly::RWSpinLock::WriteHolder holder(spinLock_);
1485     connectionDropped_++;
1486   }
1487
1488   virtual void onConnectionEnqueuedForAcceptorCallback(
1489       const int /* socket */,
1490       const SocketAddress& /* addr */) noexcept override {
1491     folly::RWSpinLock::WriteHolder holder(spinLock_);
1492     connectionEnqueuedForAcceptCallback_++;
1493   }
1494
1495   virtual void onConnectionDequeuedByAcceptorCallback(
1496       const int /* socket */,
1497       const SocketAddress& /* addr */) noexcept override {
1498     folly::RWSpinLock::WriteHolder holder(spinLock_);
1499     connectionDequeuedByAcceptCallback_++;
1500   }
1501
1502   virtual void onBackoffStarted() noexcept override {
1503     folly::RWSpinLock::WriteHolder holder(spinLock_);
1504     backoffStarted_++;
1505   }
1506
1507   virtual void onBackoffEnded() noexcept override {
1508     folly::RWSpinLock::WriteHolder holder(spinLock_);
1509     backoffEnded_++;
1510   }
1511
1512   virtual void onBackoffError() noexcept override {
1513     folly::RWSpinLock::WriteHolder holder(spinLock_);
1514     backoffError_++;
1515   }
1516
1517   unsigned int getConnectionAccepted() const {
1518     folly::RWSpinLock::ReadHolder holder(spinLock_);
1519     return connectionAccepted_;
1520   }
1521
1522   unsigned int getConnectionAcceptedError() const {
1523     folly::RWSpinLock::ReadHolder holder(spinLock_);
1524     return connectionAcceptedError_;
1525   }
1526
1527   unsigned int getConnectionDropped() const {
1528     folly::RWSpinLock::ReadHolder holder(spinLock_);
1529     return connectionDropped_;
1530   }
1531
1532   unsigned int getConnectionEnqueuedForAcceptCallback() const {
1533     folly::RWSpinLock::ReadHolder holder(spinLock_);
1534     return connectionEnqueuedForAcceptCallback_;
1535   }
1536
1537   unsigned int getConnectionDequeuedByAcceptCallback() const {
1538     folly::RWSpinLock::ReadHolder holder(spinLock_);
1539     return connectionDequeuedByAcceptCallback_;
1540   }
1541
1542   unsigned int getBackoffStarted() const {
1543     folly::RWSpinLock::ReadHolder holder(spinLock_);
1544     return backoffStarted_;
1545   }
1546
1547   unsigned int getBackoffEnded() const {
1548     folly::RWSpinLock::ReadHolder holder(spinLock_);
1549     return backoffEnded_;
1550   }
1551
1552   unsigned int getBackoffError() const {
1553     folly::RWSpinLock::ReadHolder holder(spinLock_);
1554     return backoffError_;
1555   }
1556
1557  private:
1558   mutable folly::RWSpinLock spinLock_;
1559   unsigned int connectionAccepted_{0};
1560   unsigned int connectionAcceptedError_{0};
1561   unsigned int connectionDropped_{0};
1562   unsigned int connectionEnqueuedForAcceptCallback_{0};
1563   unsigned int connectionDequeuedByAcceptCallback_{0};
1564   unsigned int backoffStarted_{0};
1565   unsigned int backoffEnded_{0};
1566   unsigned int backoffError_{0};
1567 };
1568
1569 /**
1570  * Helper AcceptCallback class for the test code
1571  * It records the callbacks that were invoked, and also supports calling
1572  * generic std::function objects in each callback.
1573  */
1574 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1575  public:
1576   enum EventType {
1577     TYPE_START,
1578     TYPE_ACCEPT,
1579     TYPE_ERROR,
1580     TYPE_STOP
1581   };
1582   struct EventInfo {
1583     EventInfo(int fd, const folly::SocketAddress& addr)
1584       : type(TYPE_ACCEPT),
1585         fd(fd),
1586         address(addr),
1587         errorMsg() {}
1588     explicit EventInfo(const std::string& msg)
1589       : type(TYPE_ERROR),
1590         fd(-1),
1591         address(),
1592         errorMsg(msg) {}
1593     explicit EventInfo(EventType et)
1594       : type(et),
1595         fd(-1),
1596         address(),
1597         errorMsg() {}
1598
1599     EventType type;
1600     int fd;  // valid for TYPE_ACCEPT
1601     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1602     string errorMsg;  // valid for TYPE_ERROR
1603   };
1604   typedef std::deque<EventInfo> EventList;
1605
1606   TestAcceptCallback()
1607     : connectionAcceptedFn_(),
1608       acceptErrorFn_(),
1609       acceptStoppedFn_(),
1610       events_() {}
1611
1612   std::deque<EventInfo>* getEvents() {
1613     return &events_;
1614   }
1615
1616   void setConnectionAcceptedFn(
1617       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1618     connectionAcceptedFn_ = fn;
1619   }
1620   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1621     acceptErrorFn_ = fn;
1622   }
1623   void setAcceptStartedFn(const std::function<void()>& fn) {
1624     acceptStartedFn_ = fn;
1625   }
1626   void setAcceptStoppedFn(const std::function<void()>& fn) {
1627     acceptStoppedFn_ = fn;
1628   }
1629
1630   void connectionAccepted(
1631       int fd, const folly::SocketAddress& clientAddr) noexcept override {
1632     events_.emplace_back(fd, clientAddr);
1633
1634     if (connectionAcceptedFn_) {
1635       connectionAcceptedFn_(fd, clientAddr);
1636     }
1637   }
1638   void acceptError(const std::exception& ex) noexcept override {
1639     events_.emplace_back(ex.what());
1640
1641     if (acceptErrorFn_) {
1642       acceptErrorFn_(ex);
1643     }
1644   }
1645   void acceptStarted() noexcept override {
1646     events_.emplace_back(TYPE_START);
1647
1648     if (acceptStartedFn_) {
1649       acceptStartedFn_();
1650     }
1651   }
1652   void acceptStopped() noexcept override {
1653     events_.emplace_back(TYPE_STOP);
1654
1655     if (acceptStoppedFn_) {
1656       acceptStoppedFn_();
1657     }
1658   }
1659
1660  private:
1661   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1662   std::function<void(const std::exception&)> acceptErrorFn_;
1663   std::function<void()> acceptStartedFn_;
1664   std::function<void()> acceptStoppedFn_;
1665
1666   std::deque<EventInfo> events_;
1667 };
1668 }
1669
1670 /**
1671  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1672  */
1673 TEST(AsyncSocketTest, ServerAcceptOptions) {
1674   EventBase eventBase;
1675
1676   // Create a server socket
1677   std::shared_ptr<AsyncServerSocket> serverSocket(
1678       AsyncServerSocket::newSocket(&eventBase));
1679   serverSocket->bind(0);
1680   serverSocket->listen(16);
1681   folly::SocketAddress serverAddress;
1682   serverSocket->getAddress(&serverAddress);
1683
1684   // Add a callback to accept one connection then stop the loop
1685   TestAcceptCallback acceptCallback;
1686   acceptCallback.setConnectionAcceptedFn(
1687       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1688         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1689       });
1690   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1691     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1692   });
1693   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1694   serverSocket->startAccepting();
1695
1696   // Connect to the server socket
1697   std::shared_ptr<AsyncSocket> socket(
1698       AsyncSocket::newSocket(&eventBase, serverAddress));
1699
1700   eventBase.loop();
1701
1702   // Verify that the server accepted a connection
1703   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1704   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1705                     TestAcceptCallback::TYPE_START);
1706   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1707                     TestAcceptCallback::TYPE_ACCEPT);
1708   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1709                     TestAcceptCallback::TYPE_STOP);
1710   int fd = acceptCallback.getEvents()->at(1).fd;
1711
1712   // The accepted connection should already be in non-blocking mode
1713   int flags = fcntl(fd, F_GETFL, 0);
1714   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1715
1716 #ifndef TCP_NOPUSH
1717   // The accepted connection should already have TCP_NODELAY set
1718   int value;
1719   socklen_t valueLength = sizeof(value);
1720   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1721   CHECK_EQ(rc, 0);
1722   CHECK_EQ(value, 1);
1723 #endif
1724 }
1725
1726 /**
1727  * Test AsyncServerSocket::removeAcceptCallback()
1728  */
1729 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1730   // Create a new AsyncServerSocket
1731   EventBase eventBase;
1732   std::shared_ptr<AsyncServerSocket> serverSocket(
1733       AsyncServerSocket::newSocket(&eventBase));
1734   serverSocket->bind(0);
1735   serverSocket->listen(16);
1736   folly::SocketAddress serverAddress;
1737   serverSocket->getAddress(&serverAddress);
1738
1739   // Add several accept callbacks
1740   TestAcceptCallback cb1;
1741   TestAcceptCallback cb2;
1742   TestAcceptCallback cb3;
1743   TestAcceptCallback cb4;
1744   TestAcceptCallback cb5;
1745   TestAcceptCallback cb6;
1746   TestAcceptCallback cb7;
1747
1748   // Test having callbacks remove other callbacks before them on the list,
1749   // after them on the list, or removing themselves.
1750   //
1751   // Have callback 2 remove callback 3 and callback 5 the first time it is
1752   // called.
1753   int cb2Count = 0;
1754   cb1.setConnectionAcceptedFn([&](int /* fd */,
1755                                   const folly::SocketAddress& /* addr */) {
1756     std::shared_ptr<AsyncSocket> sock2(
1757         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1758   });
1759   cb3.setConnectionAcceptedFn(
1760       [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1761   cb4.setConnectionAcceptedFn(
1762       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1763         std::shared_ptr<AsyncSocket> sock3(
1764             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1765       });
1766   cb5.setConnectionAcceptedFn(
1767       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1768         std::shared_ptr<AsyncSocket> sock5(
1769             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1770
1771       });
1772   cb2.setConnectionAcceptedFn(
1773       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1774         if (cb2Count == 0) {
1775           serverSocket->removeAcceptCallback(&cb3, nullptr);
1776           serverSocket->removeAcceptCallback(&cb5, nullptr);
1777         }
1778         ++cb2Count;
1779       });
1780   // Have callback 6 remove callback 4 the first time it is called,
1781   // and destroy the server socket the second time it is called
1782   int cb6Count = 0;
1783   cb6.setConnectionAcceptedFn(
1784       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1785         if (cb6Count == 0) {
1786           serverSocket->removeAcceptCallback(&cb4, nullptr);
1787           std::shared_ptr<AsyncSocket> sock6(
1788               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1789           std::shared_ptr<AsyncSocket> sock7(
1790               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1791           std::shared_ptr<AsyncSocket> sock8(
1792               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1793
1794         } else {
1795           serverSocket.reset();
1796         }
1797         ++cb6Count;
1798       });
1799   // Have callback 7 remove itself
1800   cb7.setConnectionAcceptedFn(
1801       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1802         serverSocket->removeAcceptCallback(&cb7, nullptr);
1803       });
1804
1805   serverSocket->addAcceptCallback(&cb1, nullptr);
1806   serverSocket->addAcceptCallback(&cb2, nullptr);
1807   serverSocket->addAcceptCallback(&cb3, nullptr);
1808   serverSocket->addAcceptCallback(&cb4, nullptr);
1809   serverSocket->addAcceptCallback(&cb5, nullptr);
1810   serverSocket->addAcceptCallback(&cb6, nullptr);
1811   serverSocket->addAcceptCallback(&cb7, nullptr);
1812   serverSocket->startAccepting();
1813
1814   // Make several connections to the socket
1815   std::shared_ptr<AsyncSocket> sock1(
1816       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1817   std::shared_ptr<AsyncSocket> sock4(
1818       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1819
1820   // Loop until we are stopped
1821   eventBase.loop();
1822
1823   // Check to make sure that the expected callbacks were invoked.
1824   //
1825   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1826   // the AcceptCallbacks in round-robin fashion, in the order that they were
1827   // added.  The code is implemented this way right now, but the API doesn't
1828   // explicitly require it be done this way.  If we change the code not to be
1829   // exactly round robin in the future, we can simplify the test checks here.
1830   // (We'll also need to update the termination code, since we expect cb6 to
1831   // get called twice to terminate the loop.)
1832   CHECK_EQ(cb1.getEvents()->size(), 4);
1833   CHECK_EQ(cb1.getEvents()->at(0).type,
1834                     TestAcceptCallback::TYPE_START);
1835   CHECK_EQ(cb1.getEvents()->at(1).type,
1836                     TestAcceptCallback::TYPE_ACCEPT);
1837   CHECK_EQ(cb1.getEvents()->at(2).type,
1838                     TestAcceptCallback::TYPE_ACCEPT);
1839   CHECK_EQ(cb1.getEvents()->at(3).type,
1840                     TestAcceptCallback::TYPE_STOP);
1841
1842   CHECK_EQ(cb2.getEvents()->size(), 4);
1843   CHECK_EQ(cb2.getEvents()->at(0).type,
1844                     TestAcceptCallback::TYPE_START);
1845   CHECK_EQ(cb2.getEvents()->at(1).type,
1846                     TestAcceptCallback::TYPE_ACCEPT);
1847   CHECK_EQ(cb2.getEvents()->at(2).type,
1848                     TestAcceptCallback::TYPE_ACCEPT);
1849   CHECK_EQ(cb2.getEvents()->at(3).type,
1850                     TestAcceptCallback::TYPE_STOP);
1851
1852   CHECK_EQ(cb3.getEvents()->size(), 2);
1853   CHECK_EQ(cb3.getEvents()->at(0).type,
1854                     TestAcceptCallback::TYPE_START);
1855   CHECK_EQ(cb3.getEvents()->at(1).type,
1856                     TestAcceptCallback::TYPE_STOP);
1857
1858   CHECK_EQ(cb4.getEvents()->size(), 3);
1859   CHECK_EQ(cb4.getEvents()->at(0).type,
1860                     TestAcceptCallback::TYPE_START);
1861   CHECK_EQ(cb4.getEvents()->at(1).type,
1862                     TestAcceptCallback::TYPE_ACCEPT);
1863   CHECK_EQ(cb4.getEvents()->at(2).type,
1864                     TestAcceptCallback::TYPE_STOP);
1865
1866   CHECK_EQ(cb5.getEvents()->size(), 2);
1867   CHECK_EQ(cb5.getEvents()->at(0).type,
1868                     TestAcceptCallback::TYPE_START);
1869   CHECK_EQ(cb5.getEvents()->at(1).type,
1870                     TestAcceptCallback::TYPE_STOP);
1871
1872   CHECK_EQ(cb6.getEvents()->size(), 4);
1873   CHECK_EQ(cb6.getEvents()->at(0).type,
1874                     TestAcceptCallback::TYPE_START);
1875   CHECK_EQ(cb6.getEvents()->at(1).type,
1876                     TestAcceptCallback::TYPE_ACCEPT);
1877   CHECK_EQ(cb6.getEvents()->at(2).type,
1878                     TestAcceptCallback::TYPE_ACCEPT);
1879   CHECK_EQ(cb6.getEvents()->at(3).type,
1880                     TestAcceptCallback::TYPE_STOP);
1881
1882   CHECK_EQ(cb7.getEvents()->size(), 3);
1883   CHECK_EQ(cb7.getEvents()->at(0).type,
1884                     TestAcceptCallback::TYPE_START);
1885   CHECK_EQ(cb7.getEvents()->at(1).type,
1886                     TestAcceptCallback::TYPE_ACCEPT);
1887   CHECK_EQ(cb7.getEvents()->at(2).type,
1888                     TestAcceptCallback::TYPE_STOP);
1889 }
1890
1891 /**
1892  * Test AsyncServerSocket::removeAcceptCallback()
1893  */
1894 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1895   // Create a new AsyncServerSocket
1896   EventBase eventBase;
1897   std::shared_ptr<AsyncServerSocket> serverSocket(
1898       AsyncServerSocket::newSocket(&eventBase));
1899   serverSocket->bind(0);
1900   serverSocket->listen(16);
1901   folly::SocketAddress serverAddress;
1902   serverSocket->getAddress(&serverAddress);
1903
1904   // Add several accept callbacks
1905   TestAcceptCallback cb1;
1906   auto thread_id = pthread_self();
1907   cb1.setAcceptStartedFn([&](){
1908     CHECK_NE(thread_id, pthread_self());
1909     thread_id = pthread_self();
1910   });
1911   cb1.setConnectionAcceptedFn(
1912       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1913         CHECK_EQ(thread_id, pthread_self());
1914         serverSocket->removeAcceptCallback(&cb1, nullptr);
1915       });
1916   cb1.setAcceptStoppedFn([&](){
1917     CHECK_EQ(thread_id, pthread_self());
1918   });
1919
1920   // Test having callbacks remove other callbacks before them on the list,
1921   serverSocket->addAcceptCallback(&cb1, nullptr);
1922   serverSocket->startAccepting();
1923
1924   // Make several connections to the socket
1925   std::shared_ptr<AsyncSocket> sock1(
1926       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1927
1928   // Loop in another thread
1929   auto other = std::thread([&](){
1930     eventBase.loop();
1931   });
1932   other.join();
1933
1934   // Check to make sure that the expected callbacks were invoked.
1935   //
1936   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1937   // the AcceptCallbacks in round-robin fashion, in the order that they were
1938   // added.  The code is implemented this way right now, but the API doesn't
1939   // explicitly require it be done this way.  If we change the code not to be
1940   // exactly round robin in the future, we can simplify the test checks here.
1941   // (We'll also need to update the termination code, since we expect cb6 to
1942   // get called twice to terminate the loop.)
1943   CHECK_EQ(cb1.getEvents()->size(), 3);
1944   CHECK_EQ(cb1.getEvents()->at(0).type,
1945                     TestAcceptCallback::TYPE_START);
1946   CHECK_EQ(cb1.getEvents()->at(1).type,
1947                     TestAcceptCallback::TYPE_ACCEPT);
1948   CHECK_EQ(cb1.getEvents()->at(2).type,
1949                     TestAcceptCallback::TYPE_STOP);
1950
1951 }
1952
1953 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1954   // Add a callback to accept one connection then stop accepting
1955   TestAcceptCallback acceptCallback;
1956   acceptCallback.setConnectionAcceptedFn(
1957       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1958         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1959       });
1960   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1961     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1962   });
1963   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1964   serverSocket->startAccepting();
1965
1966   // Connect to the server socket
1967   EventBase* eventBase = serverSocket->getEventBase();
1968   folly::SocketAddress serverAddress;
1969   serverSocket->getAddress(&serverAddress);
1970   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1971
1972   // Loop to process all events
1973   eventBase->loop();
1974
1975   // Verify that the server accepted a connection
1976   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1977   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1978                     TestAcceptCallback::TYPE_START);
1979   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1980                     TestAcceptCallback::TYPE_ACCEPT);
1981   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1982                     TestAcceptCallback::TYPE_STOP);
1983 }
1984
1985 /* Verify that we don't leak sockets if we are destroyed()
1986  * and there are still writes pending
1987  *
1988  * If destroy() only calls close() instead of closeNow(),
1989  * it would shutdown(writes) on the socket, but it would
1990  * never be close()'d, and the socket would leak
1991  */
1992 TEST(AsyncSocketTest, DestroyCloseTest) {
1993   TestServer server;
1994
1995   // connect()
1996   EventBase clientEB;
1997   EventBase serverEB;
1998   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1999   ConnCallback ccb;
2000   socket->connect(&ccb, server.getAddress(), 30);
2001
2002   // Accept the connection
2003   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2004   ReadCallback rcb;
2005   acceptedSocket->setReadCB(&rcb);
2006
2007   // Write a large buffer to the socket that is larger than kernel buffer
2008   size_t simpleBufLength = 5000000;
2009   char* simpleBuf = new char[simpleBufLength];
2010   memset(simpleBuf, 'a', simpleBufLength);
2011   WriteCallback wcb;
2012
2013   // Let the reads and writes run to completion
2014   int fd = acceptedSocket->getFd();
2015
2016   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2017   socket.reset();
2018   acceptedSocket.reset();
2019
2020   // Test that server socket was closed
2021   ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2022   CHECK_EQ(sz, -1);
2023   CHECK_EQ(errno, 9);
2024   delete[] simpleBuf;
2025 }
2026
2027 /**
2028  * Test AsyncServerSocket::useExistingSocket()
2029  */
2030 TEST(AsyncSocketTest, ServerExistingSocket) {
2031   EventBase eventBase;
2032
2033   // Test creating a socket, and letting AsyncServerSocket bind and listen
2034   {
2035     // Manually create a socket
2036     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2037     ASSERT_GE(fd, 0);
2038
2039     // Create a server socket
2040     AsyncServerSocket::UniquePtr serverSocket(
2041         new AsyncServerSocket(&eventBase));
2042     serverSocket->useExistingSocket(fd);
2043     folly::SocketAddress address;
2044     serverSocket->getAddress(&address);
2045     address.setPort(0);
2046     serverSocket->bind(address);
2047     serverSocket->listen(16);
2048
2049     // Make sure the socket works
2050     serverSocketSanityTest(serverSocket.get());
2051   }
2052
2053   // Test creating a socket and binding manually,
2054   // then letting AsyncServerSocket listen
2055   {
2056     // Manually create a socket
2057     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2058     ASSERT_GE(fd, 0);
2059     // bind
2060     struct sockaddr_in addr;
2061     addr.sin_family = AF_INET;
2062     addr.sin_port = 0;
2063     addr.sin_addr.s_addr = INADDR_ANY;
2064     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2065                              sizeof(addr)), 0);
2066     // Look up the address that we bound to
2067     folly::SocketAddress boundAddress;
2068     boundAddress.setFromLocalAddress(fd);
2069
2070     // Create a server socket
2071     AsyncServerSocket::UniquePtr serverSocket(
2072         new AsyncServerSocket(&eventBase));
2073     serverSocket->useExistingSocket(fd);
2074     serverSocket->listen(16);
2075
2076     // Make sure AsyncServerSocket reports the same address that we bound to
2077     folly::SocketAddress serverSocketAddress;
2078     serverSocket->getAddress(&serverSocketAddress);
2079     CHECK_EQ(boundAddress, serverSocketAddress);
2080
2081     // Make sure the socket works
2082     serverSocketSanityTest(serverSocket.get());
2083   }
2084
2085   // Test creating a socket, binding and listening manually,
2086   // then giving it to AsyncServerSocket
2087   {
2088     // Manually create a socket
2089     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2090     ASSERT_GE(fd, 0);
2091     // bind
2092     struct sockaddr_in addr;
2093     addr.sin_family = AF_INET;
2094     addr.sin_port = 0;
2095     addr.sin_addr.s_addr = INADDR_ANY;
2096     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2097                              sizeof(addr)), 0);
2098     // Look up the address that we bound to
2099     folly::SocketAddress boundAddress;
2100     boundAddress.setFromLocalAddress(fd);
2101     // listen
2102     CHECK_EQ(listen(fd, 16), 0);
2103
2104     // Create a server socket
2105     AsyncServerSocket::UniquePtr serverSocket(
2106         new AsyncServerSocket(&eventBase));
2107     serverSocket->useExistingSocket(fd);
2108
2109     // Make sure AsyncServerSocket reports the same address that we bound to
2110     folly::SocketAddress serverSocketAddress;
2111     serverSocket->getAddress(&serverSocketAddress);
2112     CHECK_EQ(boundAddress, serverSocketAddress);
2113
2114     // Make sure the socket works
2115     serverSocketSanityTest(serverSocket.get());
2116   }
2117 }
2118
2119 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2120   EventBase eventBase;
2121
2122   // Create a server socket
2123   std::shared_ptr<AsyncServerSocket> serverSocket(
2124       AsyncServerSocket::newSocket(&eventBase));
2125   string path(1, 0);
2126   path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2127   folly::SocketAddress serverAddress;
2128   serverAddress.setFromPath(path);
2129   serverSocket->bind(serverAddress);
2130   serverSocket->listen(16);
2131
2132   // Add a callback to accept one connection then stop the loop
2133   TestAcceptCallback acceptCallback;
2134   acceptCallback.setConnectionAcceptedFn(
2135       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2136         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2137       });
2138   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2139     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2140   });
2141   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2142   serverSocket->startAccepting();
2143
2144   // Connect to the server socket
2145   std::shared_ptr<AsyncSocket> socket(
2146       AsyncSocket::newSocket(&eventBase, serverAddress));
2147
2148   eventBase.loop();
2149
2150   // Verify that the server accepted a connection
2151   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2152   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2153                     TestAcceptCallback::TYPE_START);
2154   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2155                     TestAcceptCallback::TYPE_ACCEPT);
2156   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2157                     TestAcceptCallback::TYPE_STOP);
2158   int fd = acceptCallback.getEvents()->at(1).fd;
2159
2160   // The accepted connection should already be in non-blocking mode
2161   int flags = fcntl(fd, F_GETFL, 0);
2162   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2163 }
2164
2165 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2166   EventBase eventBase;
2167   TestConnectionEventCallback connectionEventCallback;
2168
2169   // Create a server socket
2170   std::shared_ptr<AsyncServerSocket> serverSocket(
2171       AsyncServerSocket::newSocket(&eventBase));
2172   serverSocket->setConnectionEventCallback(&connectionEventCallback);
2173   serverSocket->bind(0);
2174   serverSocket->listen(16);
2175   folly::SocketAddress serverAddress;
2176   serverSocket->getAddress(&serverAddress);
2177
2178   // Add a callback to accept one connection then stop the loop
2179   TestAcceptCallback acceptCallback;
2180   acceptCallback.setConnectionAcceptedFn(
2181       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2182         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2183       });
2184   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2185     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2186   });
2187   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2188   serverSocket->startAccepting();
2189
2190   // Connect to the server socket
2191   std::shared_ptr<AsyncSocket> socket(
2192       AsyncSocket::newSocket(&eventBase, serverAddress));
2193
2194   eventBase.loop();
2195
2196   // Validate the connection event counters
2197   ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2198   ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2199   ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2200   ASSERT_EQ(
2201       connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2202   ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2203   ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2204   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2205   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2206 }
2207
2208 /**
2209  * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2210  */
2211 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2212   EventBase eventBase;
2213
2214   // Counter of how many connections have been accepted
2215   int count = 0;
2216
2217   // Create a server socket
2218   auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2219   serverSocket->bind(0);
2220   serverSocket->listen(16);
2221   folly::SocketAddress serverAddress;
2222   serverSocket->getAddress(&serverAddress);
2223
2224   // Add a callback to accept connections
2225   TestAcceptCallback acceptCallback;
2226   acceptCallback.setConnectionAcceptedFn(
2227       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2228         count++;
2229         CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2230
2231         if (count == 4) {
2232           // all messages are processed, remove accept callback
2233           serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2234         }
2235       });
2236   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2237     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2238   });
2239   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2240   serverSocket->startAccepting();
2241
2242   // Connect to the server socket, 4 clients, there are 4 connections
2243   auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2244   auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2245   auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2246   auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2247
2248   eventBase.loop();
2249 }
2250
2251 /**
2252  * Test AsyncTransport::BufferCallback
2253  */
2254 TEST(AsyncSocketTest, BufferTest) {
2255   TestServer server;
2256
2257   EventBase evb;
2258   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2259   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2260   ConnCallback ccb;
2261   socket->connect(&ccb, server.getAddress(), 30, option);
2262
2263   char buf[100 * 1024];
2264   memset(buf, 'c', sizeof(buf));
2265   WriteCallback wcb;
2266   BufferCallback bcb;
2267   socket->setBufferCallback(&bcb);
2268   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2269
2270   evb.loop();
2271   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2272   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
2273
2274   ASSERT_TRUE(bcb.hasBuffered());
2275   ASSERT_TRUE(bcb.hasBufferCleared());
2276
2277   socket->close();
2278   server.verifyConnection(buf, sizeof(buf));
2279
2280   ASSERT_TRUE(socket->isClosedBySelf());
2281   ASSERT_FALSE(socket->isClosedByPeer());
2282 }
2283
2284 TEST(AsyncSocketTest, BufferCallbackKill) {
2285   TestServer server;
2286   EventBase evb;
2287   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2288   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2289   ConnCallback ccb;
2290   socket->connect(&ccb, server.getAddress(), 30, option);
2291   evb.loopOnce();
2292
2293   char buf[100 * 1024];
2294   memset(buf, 'c', sizeof(buf));
2295   BufferCallback* bcb = new BufferCallback;
2296   socket->setBufferCallback(bcb);
2297   WriteCallback wcb;
2298   wcb.successCallback = [&] {
2299     ASSERT_TRUE(socket.unique());
2300     socket.reset();
2301   };
2302
2303   // This will trigger AsyncSocket::handleWrite,
2304   // which calls WriteCallback::writeSuccess,
2305   // which calls wcb.successCallback above,
2306   // which tries to delete socket
2307   // Then, the socket will also try to use this BufferCallback
2308   // And that should crash us, if there is no DestructorGuard on the stack
2309   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2310
2311   evb.loop();
2312   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2313 }