Add handshake and connect times
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
21
22 #include <folly/io/IOBuf.h>
23 #include <folly/io/async/test/AsyncSocketTest.h>
24 #include <folly/io/async/test/Util.h>
25 #include <folly/test/SocketAddressTestHelper.h>
26
27 #include <gtest/gtest.h>
28 #include <boost/scoped_array.hpp>
29 #include <iostream>
30 #include <unistd.h>
31 #include <fcntl.h>
32 #include <poll.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/tcp.h>
36 #include <thread>
37
38 using namespace boost;
39
40 using std::string;
41 using std::vector;
42 using std::min;
43 using std::cerr;
44 using std::endl;
45 using std::unique_ptr;
46 using std::chrono::milliseconds;
47 using boost::scoped_array;
48
49 using namespace folly;
50
51 class DelayedWrite: public AsyncTimeout {
52  public:
53   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
54       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
55       bool cork, bool lastWrite = false):
56     AsyncTimeout(socket->getEventBase()),
57     socket_(socket),
58     bufs_(std::move(bufs)),
59     wcb_(wcb),
60     cork_(cork),
61     lastWrite_(lastWrite) {}
62
63  private:
64   void timeoutExpired() noexcept override {
65     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
66     socket_->writeChain(wcb_, std::move(bufs_), flags);
67     if (lastWrite_) {
68       socket_->shutdownWrite();
69     }
70   }
71
72   std::shared_ptr<AsyncSocket> socket_;
73   unique_ptr<IOBuf> bufs_;
74   AsyncTransportWrapper::WriteCallback* wcb_;
75   bool cork_;
76   bool lastWrite_;
77 };
78
79 ///////////////////////////////////////////////////////////////////////////
80 // connect() tests
81 ///////////////////////////////////////////////////////////////////////////
82
83 /**
84  * Test connecting to a server
85  */
86 TEST(AsyncSocketTest, Connect) {
87   // Start listening on a local port
88   TestServer server;
89
90   // Connect using a AsyncSocket
91   EventBase evb;
92   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
93   ConnCallback cb;
94   socket->connect(&cb, server.getAddress(), 30);
95
96   evb.loop();
97
98   CHECK_EQ(cb.state, STATE_SUCCEEDED);
99   EXPECT_LE(0, socket->getConnectTime().count());
100 }
101
102 /**
103  * Test connecting to a server that isn't listening
104  */
105 TEST(AsyncSocketTest, ConnectRefused) {
106   EventBase evb;
107
108   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
109
110   // Hopefully nothing is actually listening on this address
111   folly::SocketAddress addr("127.0.0.1", 65535);
112   ConnCallback cb;
113   socket->connect(&cb, addr, 30);
114
115   evb.loop();
116
117   CHECK_EQ(cb.state, STATE_FAILED);
118   CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
119   EXPECT_LE(0, socket->getConnectTime().count());
120 }
121
122 /**
123  * Test connection timeout
124  */
125 TEST(AsyncSocketTest, ConnectTimeout) {
126   EventBase evb;
127
128   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
129
130   // Try connecting to server that won't respond.
131   //
132   // This depends somewhat on the network where this test is run.
133   // Hopefully this IP will be routable but unresponsive.
134   // (Alternatively, we could try listening on a local raw socket, but that
135   // normally requires root privileges.)
136   auto host =
137       SocketAddressTestHelper::isIPv6Enabled() ?
138       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
139       SocketAddressTestHelper::isIPv4Enabled() ?
140       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
141       nullptr;
142   SocketAddress addr(host, 65535);
143   ConnCallback cb;
144   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
145
146   evb.loop();
147
148   CHECK_EQ(cb.state, STATE_FAILED);
149   CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
150
151   // Verify that we can still get the peer address after a timeout.
152   // Use case is if the client was created from a client pool, and we want
153   // to log which peer failed.
154   folly::SocketAddress peer;
155   socket->getPeerAddress(&peer);
156   CHECK_EQ(peer, addr);
157   EXPECT_LE(0, socket->getConnectTime().count());
158 }
159
160 /**
161  * Test writing immediately after connecting, without waiting for connect
162  * to finish.
163  */
164 TEST(AsyncSocketTest, ConnectAndWrite) {
165   TestServer server;
166
167   // connect()
168   EventBase evb;
169   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
170   ConnCallback ccb;
171   socket->connect(&ccb, server.getAddress(), 30);
172
173   // write()
174   char buf[128];
175   memset(buf, 'a', sizeof(buf));
176   WriteCallback wcb;
177   socket->write(&wcb, buf, sizeof(buf));
178
179   // Loop.  We don't bother accepting on the server socket yet.
180   // The kernel should be able to buffer the write request so it can succeed.
181   evb.loop();
182
183   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
184   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
185
186   // Make sure the server got a connection and received the data
187   socket->close();
188   server.verifyConnection(buf, sizeof(buf));
189
190   ASSERT_TRUE(socket->isClosedBySelf());
191   ASSERT_FALSE(socket->isClosedByPeer());
192 }
193
194 /**
195  * Test connecting using a nullptr connect callback.
196  */
197 TEST(AsyncSocketTest, ConnectNullCallback) {
198   TestServer server;
199
200   // connect()
201   EventBase evb;
202   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
203   socket->connect(nullptr, server.getAddress(), 30);
204
205   // write some data, just so we have some way of verifing
206   // that the socket works correctly after connecting
207   char buf[128];
208   memset(buf, 'a', sizeof(buf));
209   WriteCallback wcb;
210   socket->write(&wcb, buf, sizeof(buf));
211
212   evb.loop();
213
214   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
215
216   // Make sure the server got a connection and received the data
217   socket->close();
218   server.verifyConnection(buf, sizeof(buf));
219
220   ASSERT_TRUE(socket->isClosedBySelf());
221   ASSERT_FALSE(socket->isClosedByPeer());
222 }
223
224 /**
225  * Test calling both write() and close() immediately after connecting, without
226  * waiting for connect to finish.
227  *
228  * This exercises the STATE_CONNECTING_CLOSING code.
229  */
230 TEST(AsyncSocketTest, ConnectWriteAndClose) {
231   TestServer server;
232
233   // connect()
234   EventBase evb;
235   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
236   ConnCallback ccb;
237   socket->connect(&ccb, server.getAddress(), 30);
238
239   // write()
240   char buf[128];
241   memset(buf, 'a', sizeof(buf));
242   WriteCallback wcb;
243   socket->write(&wcb, buf, sizeof(buf));
244
245   // close()
246   socket->close();
247
248   // Loop.  We don't bother accepting on the server socket yet.
249   // The kernel should be able to buffer the write request so it can succeed.
250   evb.loop();
251
252   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
253   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
254
255   // Make sure the server got a connection and received the data
256   server.verifyConnection(buf, sizeof(buf));
257
258   ASSERT_TRUE(socket->isClosedBySelf());
259   ASSERT_FALSE(socket->isClosedByPeer());
260 }
261
262 /**
263  * Test calling close() immediately after connect()
264  */
265 TEST(AsyncSocketTest, ConnectAndClose) {
266   TestServer server;
267
268   // Connect using a AsyncSocket
269   EventBase evb;
270   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
271   ConnCallback ccb;
272   socket->connect(&ccb, server.getAddress(), 30);
273
274   // Hopefully the connect didn't succeed immediately.
275   // If it did, we can't exercise the close-while-connecting code path.
276   if (ccb.state == STATE_SUCCEEDED) {
277     LOG(INFO) << "connect() succeeded immediately; aborting test "
278                        "of close-during-connect behavior";
279     return;
280   }
281
282   socket->close();
283
284   // Loop, although there shouldn't be anything to do.
285   evb.loop();
286
287   // Make sure the connection was aborted
288   CHECK_EQ(ccb.state, STATE_FAILED);
289
290   ASSERT_TRUE(socket->isClosedBySelf());
291   ASSERT_FALSE(socket->isClosedByPeer());
292 }
293
294 /**
295  * Test calling closeNow() immediately after connect()
296  *
297  * This should be identical to the normal close behavior.
298  */
299 TEST(AsyncSocketTest, ConnectAndCloseNow) {
300   TestServer server;
301
302   // Connect using a AsyncSocket
303   EventBase evb;
304   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
305   ConnCallback ccb;
306   socket->connect(&ccb, server.getAddress(), 30);
307
308   // Hopefully the connect didn't succeed immediately.
309   // If it did, we can't exercise the close-while-connecting code path.
310   if (ccb.state == STATE_SUCCEEDED) {
311     LOG(INFO) << "connect() succeeded immediately; aborting test "
312                        "of closeNow()-during-connect behavior";
313     return;
314   }
315
316   socket->closeNow();
317
318   // Loop, although there shouldn't be anything to do.
319   evb.loop();
320
321   // Make sure the connection was aborted
322   CHECK_EQ(ccb.state, STATE_FAILED);
323
324   ASSERT_TRUE(socket->isClosedBySelf());
325   ASSERT_FALSE(socket->isClosedByPeer());
326 }
327
328 /**
329  * Test calling both write() and closeNow() immediately after connecting,
330  * without waiting for connect to finish.
331  *
332  * This should abort the pending write.
333  */
334 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
335   TestServer server;
336
337   // connect()
338   EventBase evb;
339   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
340   ConnCallback ccb;
341   socket->connect(&ccb, server.getAddress(), 30);
342
343   // Hopefully the connect didn't succeed immediately.
344   // If it did, we can't exercise the close-while-connecting code path.
345   if (ccb.state == STATE_SUCCEEDED) {
346     LOG(INFO) << "connect() succeeded immediately; aborting test "
347                        "of write-during-connect behavior";
348     return;
349   }
350
351   // write()
352   char buf[128];
353   memset(buf, 'a', sizeof(buf));
354   WriteCallback wcb;
355   socket->write(&wcb, buf, sizeof(buf));
356
357   // close()
358   socket->closeNow();
359
360   // Loop, although there shouldn't be anything to do.
361   evb.loop();
362
363   CHECK_EQ(ccb.state, STATE_FAILED);
364   CHECK_EQ(wcb.state, STATE_FAILED);
365
366   ASSERT_TRUE(socket->isClosedBySelf());
367   ASSERT_FALSE(socket->isClosedByPeer());
368 }
369
370 /**
371  * Test installing a read callback immediately, before connect() finishes.
372  */
373 TEST(AsyncSocketTest, ConnectAndRead) {
374   TestServer server;
375
376   // connect()
377   EventBase evb;
378   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
379   ConnCallback ccb;
380   socket->connect(&ccb, server.getAddress(), 30);
381
382   ReadCallback rcb;
383   socket->setReadCB(&rcb);
384
385   // Even though we haven't looped yet, we should be able to accept
386   // the connection and send data to it.
387   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
388   uint8_t buf[128];
389   memset(buf, 'a', sizeof(buf));
390   acceptedSocket->write(buf, sizeof(buf));
391   acceptedSocket->flush();
392   acceptedSocket->close();
393
394   // Loop, although there shouldn't be anything to do.
395   evb.loop();
396
397   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
398   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
399   CHECK_EQ(rcb.buffers.size(), 1);
400   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
401   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
402
403   ASSERT_FALSE(socket->isClosedBySelf());
404   ASSERT_FALSE(socket->isClosedByPeer());
405 }
406
407 /**
408  * Test installing a read callback and then closing immediately before the
409  * connect attempt finishes.
410  */
411 TEST(AsyncSocketTest, ConnectReadAndClose) {
412   TestServer server;
413
414   // connect()
415   EventBase evb;
416   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
417   ConnCallback ccb;
418   socket->connect(&ccb, server.getAddress(), 30);
419
420   // Hopefully the connect didn't succeed immediately.
421   // If it did, we can't exercise the close-while-connecting code path.
422   if (ccb.state == STATE_SUCCEEDED) {
423     LOG(INFO) << "connect() succeeded immediately; aborting test "
424                        "of read-during-connect behavior";
425     return;
426   }
427
428   ReadCallback rcb;
429   socket->setReadCB(&rcb);
430
431   // close()
432   socket->close();
433
434   // Loop, although there shouldn't be anything to do.
435   evb.loop();
436
437   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
438   CHECK_EQ(rcb.buffers.size(), 0);
439   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
440
441   ASSERT_TRUE(socket->isClosedBySelf());
442   ASSERT_FALSE(socket->isClosedByPeer());
443 }
444
445 /**
446  * Test both writing and installing a read callback immediately,
447  * before connect() finishes.
448  */
449 TEST(AsyncSocketTest, ConnectWriteAndRead) {
450   TestServer server;
451
452   // connect()
453   EventBase evb;
454   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
455   ConnCallback ccb;
456   socket->connect(&ccb, server.getAddress(), 30);
457
458   // write()
459   char buf1[128];
460   memset(buf1, 'a', sizeof(buf1));
461   WriteCallback wcb;
462   socket->write(&wcb, buf1, sizeof(buf1));
463
464   // set a read callback
465   ReadCallback rcb;
466   socket->setReadCB(&rcb);
467
468   // Even though we haven't looped yet, we should be able to accept
469   // the connection and send data to it.
470   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
471   uint8_t buf2[128];
472   memset(buf2, 'b', sizeof(buf2));
473   acceptedSocket->write(buf2, sizeof(buf2));
474   acceptedSocket->flush();
475
476   // shut down the write half of acceptedSocket, so that the AsyncSocket
477   // will stop reading and we can break out of the event loop.
478   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
479
480   // Loop
481   evb.loop();
482
483   // Make sure the connect succeeded
484   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
485
486   // Make sure the AsyncSocket read the data written by the accepted socket
487   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
488   CHECK_EQ(rcb.buffers.size(), 1);
489   CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
490   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
491
492   // Close the AsyncSocket so we'll see EOF on acceptedSocket
493   socket->close();
494
495   // Make sure the accepted socket saw the data written by the AsyncSocket
496   uint8_t readbuf[sizeof(buf1)];
497   acceptedSocket->readAll(readbuf, sizeof(readbuf));
498   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
499   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
500   CHECK_EQ(bytesRead, 0);
501
502   ASSERT_FALSE(socket->isClosedBySelf());
503   ASSERT_TRUE(socket->isClosedByPeer());
504 }
505
506 /**
507  * Test writing to the socket then shutting down writes before the connect
508  * attempt finishes.
509  */
510 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
511   TestServer server;
512
513   // connect()
514   EventBase evb;
515   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
516   ConnCallback ccb;
517   socket->connect(&ccb, server.getAddress(), 30);
518
519   // Hopefully the connect didn't succeed immediately.
520   // If it did, we can't exercise the write-while-connecting code path.
521   if (ccb.state == STATE_SUCCEEDED) {
522     LOG(INFO) << "connect() succeeded immediately; skipping test";
523     return;
524   }
525
526   // Ask to write some data
527   char wbuf[128];
528   memset(wbuf, 'a', sizeof(wbuf));
529   WriteCallback wcb;
530   socket->write(&wcb, wbuf, sizeof(wbuf));
531   socket->shutdownWrite();
532
533   // Shutdown writes
534   socket->shutdownWrite();
535
536   // Even though we haven't looped yet, we should be able to accept
537   // the connection.
538   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
539
540   // Since the connection is still in progress, there should be no data to
541   // read yet.  Verify that the accepted socket is not readable.
542   struct pollfd fds[1];
543   fds[0].fd = acceptedSocket->getSocketFD();
544   fds[0].events = POLLIN;
545   fds[0].revents = 0;
546   int rc = poll(fds, 1, 0);
547   CHECK_EQ(rc, 0);
548
549   // Write data to the accepted socket
550   uint8_t acceptedWbuf[192];
551   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
552   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
553   acceptedSocket->flush();
554
555   // Loop
556   evb.loop();
557
558   // The loop should have completed the connection, written the queued data,
559   // and shutdown writes on the socket.
560   //
561   // Check that the connection was completed successfully and that the write
562   // callback succeeded.
563   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
564   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
565
566   // Check that we can read the data that was written to the socket, and that
567   // we see an EOF, since its socket was half-shutdown.
568   uint8_t readbuf[sizeof(wbuf)];
569   acceptedSocket->readAll(readbuf, sizeof(readbuf));
570   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
571   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
572   CHECK_EQ(bytesRead, 0);
573
574   // Close the accepted socket.  This will cause it to see EOF
575   // and uninstall the read callback when we loop next.
576   acceptedSocket->close();
577
578   // Install a read callback, then loop again.
579   ReadCallback rcb;
580   socket->setReadCB(&rcb);
581   evb.loop();
582
583   // This loop should have read the data and seen the EOF
584   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
585   CHECK_EQ(rcb.buffers.size(), 1);
586   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
587   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
588                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
589
590   ASSERT_FALSE(socket->isClosedBySelf());
591   ASSERT_FALSE(socket->isClosedByPeer());
592 }
593
594 /**
595  * Test reading, writing, and shutting down writes before the connect attempt
596  * finishes.
597  */
598 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
599   TestServer server;
600
601   // connect()
602   EventBase evb;
603   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
604   ConnCallback ccb;
605   socket->connect(&ccb, server.getAddress(), 30);
606
607   // Hopefully the connect didn't succeed immediately.
608   // If it did, we can't exercise the write-while-connecting code path.
609   if (ccb.state == STATE_SUCCEEDED) {
610     LOG(INFO) << "connect() succeeded immediately; skipping test";
611     return;
612   }
613
614   // Install a read callback
615   ReadCallback rcb;
616   socket->setReadCB(&rcb);
617
618   // Ask to write some data
619   char wbuf[128];
620   memset(wbuf, 'a', sizeof(wbuf));
621   WriteCallback wcb;
622   socket->write(&wcb, wbuf, sizeof(wbuf));
623
624   // Shutdown writes
625   socket->shutdownWrite();
626
627   // Even though we haven't looped yet, we should be able to accept
628   // the connection.
629   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
630
631   // Since the connection is still in progress, there should be no data to
632   // read yet.  Verify that the accepted socket is not readable.
633   struct pollfd fds[1];
634   fds[0].fd = acceptedSocket->getSocketFD();
635   fds[0].events = POLLIN;
636   fds[0].revents = 0;
637   int rc = poll(fds, 1, 0);
638   CHECK_EQ(rc, 0);
639
640   // Write data to the accepted socket
641   uint8_t acceptedWbuf[192];
642   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
643   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
644   acceptedSocket->flush();
645   // Shutdown writes to the accepted socket.  This will cause it to see EOF
646   // and uninstall the read callback.
647   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
648
649   // Loop
650   evb.loop();
651
652   // The loop should have completed the connection, written the queued data,
653   // shutdown writes on the socket, read the data we wrote to it, and see the
654   // EOF.
655   //
656   // Check that the connection was completed successfully and that the read
657   // and write callbacks were invoked as expected.
658   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
659   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
660   CHECK_EQ(rcb.buffers.size(), 1);
661   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
662   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
663                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
664   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
665
666   // Check that we can read the data that was written to the socket, and that
667   // we see an EOF, since its socket was half-shutdown.
668   uint8_t readbuf[sizeof(wbuf)];
669   acceptedSocket->readAll(readbuf, sizeof(readbuf));
670   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
671   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
672   CHECK_EQ(bytesRead, 0);
673
674   // Fully close both sockets
675   acceptedSocket->close();
676   socket->close();
677
678   ASSERT_FALSE(socket->isClosedBySelf());
679   ASSERT_TRUE(socket->isClosedByPeer());
680 }
681
682 /**
683  * Test reading, writing, and calling shutdownWriteNow() before the
684  * connect attempt finishes.
685  */
686 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
687   TestServer server;
688
689   // connect()
690   EventBase evb;
691   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
692   ConnCallback ccb;
693   socket->connect(&ccb, server.getAddress(), 30);
694
695   // Hopefully the connect didn't succeed immediately.
696   // If it did, we can't exercise the write-while-connecting code path.
697   if (ccb.state == STATE_SUCCEEDED) {
698     LOG(INFO) << "connect() succeeded immediately; skipping test";
699     return;
700   }
701
702   // Install a read callback
703   ReadCallback rcb;
704   socket->setReadCB(&rcb);
705
706   // Ask to write some data
707   char wbuf[128];
708   memset(wbuf, 'a', sizeof(wbuf));
709   WriteCallback wcb;
710   socket->write(&wcb, wbuf, sizeof(wbuf));
711
712   // Shutdown writes immediately.
713   // This should immediately discard the data that we just tried to write.
714   socket->shutdownWriteNow();
715
716   // Verify that writeError() was invoked on the write callback.
717   CHECK_EQ(wcb.state, STATE_FAILED);
718   CHECK_EQ(wcb.bytesWritten, 0);
719
720   // Even though we haven't looped yet, we should be able to accept
721   // the connection.
722   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
723
724   // Since the connection is still in progress, there should be no data to
725   // read yet.  Verify that the accepted socket is not readable.
726   struct pollfd fds[1];
727   fds[0].fd = acceptedSocket->getSocketFD();
728   fds[0].events = POLLIN;
729   fds[0].revents = 0;
730   int rc = poll(fds, 1, 0);
731   CHECK_EQ(rc, 0);
732
733   // Write data to the accepted socket
734   uint8_t acceptedWbuf[192];
735   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
736   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
737   acceptedSocket->flush();
738   // Shutdown writes to the accepted socket.  This will cause it to see EOF
739   // and uninstall the read callback.
740   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
741
742   // Loop
743   evb.loop();
744
745   // The loop should have completed the connection, written the queued data,
746   // shutdown writes on the socket, read the data we wrote to it, and see the
747   // EOF.
748   //
749   // Check that the connection was completed successfully and that the read
750   // callback was invoked as expected.
751   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
752   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
753   CHECK_EQ(rcb.buffers.size(), 1);
754   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
755   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
756                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
757
758   // Since we used shutdownWriteNow(), it should have discarded all pending
759   // write data.  Verify we see an immediate EOF when reading from the accepted
760   // socket.
761   uint8_t readbuf[sizeof(wbuf)];
762   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
763   CHECK_EQ(bytesRead, 0);
764
765   // Fully close both sockets
766   acceptedSocket->close();
767   socket->close();
768
769   ASSERT_FALSE(socket->isClosedBySelf());
770   ASSERT_TRUE(socket->isClosedByPeer());
771 }
772
773 // Helper function for use in testConnectOptWrite()
774 // Temporarily disable the read callback
775 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
776   // Uninstall the read callback
777   socket->setReadCB(nullptr);
778   // Schedule the read callback to be reinstalled after 1ms
779   socket->getEventBase()->runInLoop(
780       std::bind(&AsyncSocket::setReadCB, socket, rcb));
781 }
782
783 /**
784  * Test connect+write, then have the connect callback perform another write.
785  *
786  * This tests interaction of the optimistic writing after connect with
787  * additional write attempts that occur in the connect callback.
788  */
789 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
790   TestServer server;
791   EventBase evb;
792   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
793
794   // connect()
795   ConnCallback ccb;
796   socket->connect(&ccb, server.getAddress(), 30);
797
798   // Hopefully the connect didn't succeed immediately.
799   // If it did, we can't exercise the optimistic write code path.
800   if (ccb.state == STATE_SUCCEEDED) {
801     LOG(INFO) << "connect() succeeded immediately; aborting test "
802                        "of optimistic write behavior";
803     return;
804   }
805
806   // Tell the connect callback to perform a write when the connect succeeds
807   WriteCallback wcb2;
808   scoped_array<char> buf2(new char[size2]);
809   memset(buf2.get(), 'b', size2);
810   if (size2 > 0) {
811     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
812     // Tell the second write callback to close the connection when it is done
813     wcb2.successCallback = [&] { socket->closeNow(); };
814   }
815
816   // Schedule one write() immediately, before the connect finishes
817   scoped_array<char> buf1(new char[size1]);
818   memset(buf1.get(), 'a', size1);
819   WriteCallback wcb1;
820   if (size1 > 0) {
821     socket->write(&wcb1, buf1.get(), size1);
822   }
823
824   if (close) {
825     // immediately perform a close, before connect() completes
826     socket->close();
827   }
828
829   // Start reading from the other endpoint after 10ms.
830   // If we're using large buffers, we have to read so that the writes don't
831   // block forever.
832   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
833   ReadCallback rcb;
834   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
835                                         acceptedSocket.get(), &rcb);
836   socket->getEventBase()->tryRunAfterDelay(
837       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
838       10);
839
840   // Loop.  We don't bother accepting on the server socket yet.
841   // The kernel should be able to buffer the write request so it can succeed.
842   evb.loop();
843
844   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
845   if (size1 > 0) {
846     CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
847   }
848   if (size2 > 0) {
849     CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
850   }
851
852   socket->close();
853
854   // Make sure the read callback received all of the data
855   size_t bytesRead = 0;
856   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
857        it != rcb.buffers.end();
858        ++it) {
859     size_t start = bytesRead;
860     bytesRead += it->length;
861     size_t end = bytesRead;
862     if (start < size1) {
863       size_t cmpLen = min(size1, end) - start;
864       CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
865     }
866     if (end > size1 && end <= size1 + size2) {
867       size_t itOffset;
868       size_t buf2Offset;
869       size_t cmpLen;
870       if (start >= size1) {
871         itOffset = 0;
872         buf2Offset = start - size1;
873         cmpLen = end - start;
874       } else {
875         itOffset = size1 - start;
876         buf2Offset = 0;
877         cmpLen = end - size1;
878       }
879       CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
880                                cmpLen),
881                         0);
882     }
883   }
884   CHECK_EQ(bytesRead, size1 + size2);
885 }
886
887 TEST(AsyncSocketTest, ConnectCallbackWrite) {
888   // Test using small writes that should both succeed immediately
889   testConnectOptWrite(100, 200);
890
891   // Test using a large buffer in the connect callback, that should block
892   const size_t largeSize = 8*1024*1024;
893   testConnectOptWrite(100, largeSize);
894
895   // Test using a large initial write
896   testConnectOptWrite(largeSize, 100);
897
898   // Test using two large buffers
899   testConnectOptWrite(largeSize, largeSize);
900
901   // Test a small write in the connect callback,
902   // but no immediate write before connect completes
903   testConnectOptWrite(0, 64);
904
905   // Test a large write in the connect callback,
906   // but no immediate write before connect completes
907   testConnectOptWrite(0, largeSize);
908
909   // Test connect, a small write, then immediately call close() before connect
910   // completes
911   testConnectOptWrite(211, 0, true);
912
913   // Test connect, a large immediate write (that will block), then immediately
914   // call close() before connect completes
915   testConnectOptWrite(largeSize, 0, true);
916 }
917
918 ///////////////////////////////////////////////////////////////////////////
919 // write() related tests
920 ///////////////////////////////////////////////////////////////////////////
921
922 /**
923  * Test writing using a nullptr callback
924  */
925 TEST(AsyncSocketTest, WriteNullCallback) {
926   TestServer server;
927
928   // connect()
929   EventBase evb;
930   std::shared_ptr<AsyncSocket> socket =
931     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
932   evb.loop(); // loop until the socket is connected
933
934   // write() with a nullptr callback
935   char buf[128];
936   memset(buf, 'a', sizeof(buf));
937   socket->write(nullptr, buf, sizeof(buf));
938
939   evb.loop(); // loop until the data is sent
940
941   // Make sure the server got a connection and received the data
942   socket->close();
943   server.verifyConnection(buf, sizeof(buf));
944
945   ASSERT_TRUE(socket->isClosedBySelf());
946   ASSERT_FALSE(socket->isClosedByPeer());
947 }
948
949 /**
950  * Test writing with a send timeout
951  */
952 TEST(AsyncSocketTest, WriteTimeout) {
953   TestServer server;
954
955   // connect()
956   EventBase evb;
957   std::shared_ptr<AsyncSocket> socket =
958     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
959   evb.loop(); // loop until the socket is connected
960
961   // write() a large chunk of data, with no-one on the other end reading
962   size_t writeLength = 8*1024*1024;
963   uint32_t timeout = 200;
964   socket->setSendTimeout(timeout);
965   scoped_array<char> buf(new char[writeLength]);
966   memset(buf.get(), 'a', writeLength);
967   WriteCallback wcb;
968   socket->write(&wcb, buf.get(), writeLength);
969
970   TimePoint start;
971   evb.loop();
972   TimePoint end;
973
974   // Make sure the write attempt timed out as requested
975   CHECK_EQ(wcb.state, STATE_FAILED);
976   CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
977
978   // Check that the write timed out within a reasonable period of time.
979   // We don't check for exactly the specified timeout, since AsyncSocket only
980   // times out when it hasn't made progress for that period of time.
981   //
982   // On linux, the first write sends a few hundred kb of data, then blocks for
983   // writability, and then unblocks again after 40ms and is able to write
984   // another smaller of data before blocking permanently.  Therefore it doesn't
985   // time out until 40ms + timeout.
986   //
987   // I haven't fully verified the cause of this, but I believe it probably
988   // occurs because the receiving end delays sending an ack for up to 40ms.
989   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
990   // the ack, it can send some more data.  However, after that point the
991   // receiver's kernel buffer is full.  This 40ms delay happens even with
992   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
993   // kernel may be automatically disabling TCP_QUICKACK after receiving some
994   // data.
995   //
996   // For now, we simply check that the timeout occurred within 160ms of
997   // the requested value.
998   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
999 }
1000
1001 /**
1002  * Test writing to a socket that the remote endpoint has closed
1003  */
1004 TEST(AsyncSocketTest, WritePipeError) {
1005   TestServer server;
1006
1007   // connect()
1008   EventBase evb;
1009   std::shared_ptr<AsyncSocket> socket =
1010     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1011   socket->setSendTimeout(1000);
1012   evb.loop(); // loop until the socket is connected
1013
1014   // accept and immediately close the socket
1015   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1016   acceptedSocket.reset();
1017
1018   // write() a large chunk of data
1019   size_t writeLength = 8*1024*1024;
1020   scoped_array<char> buf(new char[writeLength]);
1021   memset(buf.get(), 'a', writeLength);
1022   WriteCallback wcb;
1023   socket->write(&wcb, buf.get(), writeLength);
1024
1025   evb.loop();
1026
1027   // Make sure the write failed.
1028   // It would be nice if AsyncSocketException could convey the errno value,
1029   // so that we could check for EPIPE
1030   CHECK_EQ(wcb.state, STATE_FAILED);
1031   CHECK_EQ(wcb.exception.getType(),
1032                     AsyncSocketException::INTERNAL_ERROR);
1033
1034   ASSERT_FALSE(socket->isClosedBySelf());
1035   ASSERT_FALSE(socket->isClosedByPeer());
1036 }
1037
1038 /**
1039  * Test writing a mix of simple buffers and IOBufs
1040  */
1041 TEST(AsyncSocketTest, WriteIOBuf) {
1042   TestServer server;
1043
1044   // connect()
1045   EventBase evb;
1046   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1047   ConnCallback ccb;
1048   socket->connect(&ccb, server.getAddress(), 30);
1049
1050   // Accept the connection
1051   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1052   ReadCallback rcb;
1053   acceptedSocket->setReadCB(&rcb);
1054
1055   // Write a simple buffer to the socket
1056   size_t simpleBufLength = 5;
1057   char simpleBuf[simpleBufLength];
1058   memset(simpleBuf, 'a', simpleBufLength);
1059   WriteCallback wcb;
1060   socket->write(&wcb, simpleBuf, simpleBufLength);
1061
1062   // Write a single-element IOBuf chain
1063   size_t buf1Length = 7;
1064   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1065   memset(buf1->writableData(), 'b', buf1Length);
1066   buf1->append(buf1Length);
1067   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1068   WriteCallback wcb2;
1069   socket->writeChain(&wcb2, std::move(buf1));
1070
1071   // Write a multiple-element IOBuf chain
1072   size_t buf2Length = 11;
1073   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1074   memset(buf2->writableData(), 'c', buf2Length);
1075   buf2->append(buf2Length);
1076   size_t buf3Length = 13;
1077   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1078   memset(buf3->writableData(), 'd', buf3Length);
1079   buf3->append(buf3Length);
1080   buf2->appendChain(std::move(buf3));
1081   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1082   buf2Copy->coalesce();
1083   WriteCallback wcb3;
1084   socket->writeChain(&wcb3, std::move(buf2));
1085   socket->shutdownWrite();
1086
1087   // Let the reads and writes run to completion
1088   evb.loop();
1089
1090   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1091   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1092   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1093
1094   // Make sure the reader got the right data in the right order
1095   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1096   CHECK_EQ(rcb.buffers.size(), 1);
1097   CHECK_EQ(rcb.buffers[0].length,
1098       simpleBufLength + buf1Length + buf2Length + buf3Length);
1099   CHECK_EQ(
1100       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1101   CHECK_EQ(
1102       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1103           buf1Copy->data(), buf1Copy->length()), 0);
1104   CHECK_EQ(
1105       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1106           buf2Copy->data(), buf2Copy->length()), 0);
1107
1108   acceptedSocket->close();
1109   socket->close();
1110
1111   ASSERT_TRUE(socket->isClosedBySelf());
1112   ASSERT_FALSE(socket->isClosedByPeer());
1113 }
1114
1115 TEST(AsyncSocketTest, WriteIOBufCorked) {
1116   TestServer server;
1117
1118   // connect()
1119   EventBase evb;
1120   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1121   ConnCallback ccb;
1122   socket->connect(&ccb, server.getAddress(), 30);
1123
1124   // Accept the connection
1125   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1126   ReadCallback rcb;
1127   acceptedSocket->setReadCB(&rcb);
1128
1129   // Do three writes, 100ms apart, with the "cork" flag set
1130   // on the second write.  The reader should see the first write
1131   // arrive by itself, followed by the second and third writes
1132   // arriving together.
1133   size_t buf1Length = 5;
1134   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1135   memset(buf1->writableData(), 'a', buf1Length);
1136   buf1->append(buf1Length);
1137   size_t buf2Length = 7;
1138   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1139   memset(buf2->writableData(), 'b', buf2Length);
1140   buf2->append(buf2Length);
1141   size_t buf3Length = 11;
1142   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1143   memset(buf3->writableData(), 'c', buf3Length);
1144   buf3->append(buf3Length);
1145   WriteCallback wcb1;
1146   socket->writeChain(&wcb1, std::move(buf1));
1147   WriteCallback wcb2;
1148   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1149   write2.scheduleTimeout(100);
1150   WriteCallback wcb3;
1151   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1152   write3.scheduleTimeout(200);
1153
1154   evb.loop();
1155   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1156   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1157   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1158   if (wcb3.state != STATE_SUCCEEDED) {
1159     throw(wcb3.exception);
1160   }
1161   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1162
1163   // Make sure the reader got the data with the right grouping
1164   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1165   CHECK_EQ(rcb.buffers.size(), 2);
1166   CHECK_EQ(rcb.buffers[0].length, buf1Length);
1167   CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1168
1169   acceptedSocket->close();
1170   socket->close();
1171
1172   ASSERT_TRUE(socket->isClosedBySelf());
1173   ASSERT_FALSE(socket->isClosedByPeer());
1174 }
1175
1176 /**
1177  * Test performing a zero-length write
1178  */
1179 TEST(AsyncSocketTest, ZeroLengthWrite) {
1180   TestServer server;
1181
1182   // connect()
1183   EventBase evb;
1184   std::shared_ptr<AsyncSocket> socket =
1185     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1186   evb.loop(); // loop until the socket is connected
1187
1188   auto acceptedSocket = server.acceptAsync(&evb);
1189   ReadCallback rcb;
1190   acceptedSocket->setReadCB(&rcb);
1191
1192   size_t len1 = 1024*1024;
1193   size_t len2 = 1024*1024;
1194   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1195   memset(buf.get(), 'a', len1);
1196   memset(buf.get(), 'b', len2);
1197
1198   WriteCallback wcb1;
1199   WriteCallback wcb2;
1200   WriteCallback wcb3;
1201   WriteCallback wcb4;
1202   socket->write(&wcb1, buf.get(), 0);
1203   socket->write(&wcb2, buf.get(), len1);
1204   socket->write(&wcb3, buf.get() + len1, 0);
1205   socket->write(&wcb4, buf.get() + len1, len2);
1206   socket->close();
1207
1208   evb.loop(); // loop until the data is sent
1209
1210   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1211   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1212   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1213   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1214   rcb.verifyData(buf.get(), len1 + len2);
1215
1216   ASSERT_TRUE(socket->isClosedBySelf());
1217   ASSERT_FALSE(socket->isClosedByPeer());
1218 }
1219
1220 TEST(AsyncSocketTest, ZeroLengthWritev) {
1221   TestServer server;
1222
1223   // connect()
1224   EventBase evb;
1225   std::shared_ptr<AsyncSocket> socket =
1226     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1227   evb.loop(); // loop until the socket is connected
1228
1229   auto acceptedSocket = server.acceptAsync(&evb);
1230   ReadCallback rcb;
1231   acceptedSocket->setReadCB(&rcb);
1232
1233   size_t len1 = 1024*1024;
1234   size_t len2 = 1024*1024;
1235   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1236   memset(buf.get(), 'a', len1);
1237   memset(buf.get(), 'b', len2);
1238
1239   WriteCallback wcb;
1240   size_t iovCount = 4;
1241   struct iovec iov[iovCount];
1242   iov[0].iov_base = buf.get();
1243   iov[0].iov_len = len1;
1244   iov[1].iov_base = buf.get() + len1;
1245   iov[1].iov_len = 0;
1246   iov[2].iov_base = buf.get() + len1;
1247   iov[2].iov_len = len2;
1248   iov[3].iov_base = buf.get() + len1 + len2;
1249   iov[3].iov_len = 0;
1250
1251   socket->writev(&wcb, iov, iovCount);
1252   socket->close();
1253   evb.loop(); // loop until the data is sent
1254
1255   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1256   rcb.verifyData(buf.get(), len1 + len2);
1257
1258   ASSERT_TRUE(socket->isClosedBySelf());
1259   ASSERT_FALSE(socket->isClosedByPeer());
1260 }
1261
1262 ///////////////////////////////////////////////////////////////////////////
1263 // close() related tests
1264 ///////////////////////////////////////////////////////////////////////////
1265
1266 /**
1267  * Test calling close() with pending writes when the socket is already closing.
1268  */
1269 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1270   TestServer server;
1271
1272   // connect()
1273   EventBase evb;
1274   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1275   ConnCallback ccb;
1276   socket->connect(&ccb, server.getAddress(), 30);
1277
1278   // accept the socket on the server side
1279   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1280
1281   // Loop to ensure the connect has completed
1282   evb.loop();
1283
1284   // Make sure we are connected
1285   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1286
1287   // Schedule pending writes, until several write attempts have blocked
1288   char buf[128];
1289   memset(buf, 'a', sizeof(buf));
1290   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1291   WriteCallbackVector writeCallbacks;
1292
1293   writeCallbacks.reserve(5);
1294   while (writeCallbacks.size() < 5) {
1295     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1296
1297     socket->write(wcb.get(), buf, sizeof(buf));
1298     if (wcb->state == STATE_SUCCEEDED) {
1299       // Succeeded immediately.  Keep performing more writes
1300       continue;
1301     }
1302
1303     // This write is blocked.
1304     // Have the write callback call close() when writeError() is invoked
1305     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1306     writeCallbacks.push_back(wcb);
1307   }
1308
1309   // Call closeNow() to immediately fail the pending writes
1310   socket->closeNow();
1311
1312   // Make sure writeError() was invoked on all of the pending write callbacks
1313   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1314        it != writeCallbacks.end();
1315        ++it) {
1316     CHECK_EQ((*it)->state, STATE_FAILED);
1317   }
1318
1319   ASSERT_TRUE(socket->isClosedBySelf());
1320   ASSERT_FALSE(socket->isClosedByPeer());
1321 }
1322
1323 ///////////////////////////////////////////////////////////////////////////
1324 // ImmediateRead related tests
1325 ///////////////////////////////////////////////////////////////////////////
1326
1327 /* AsyncSocket use to verify immediate read works */
1328 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1329  public:
1330   bool immediateReadCalled = false;
1331   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1332  protected:
1333   void checkForImmediateRead() noexcept override {
1334     immediateReadCalled = true;
1335     AsyncSocket::handleRead();
1336   }
1337 };
1338
1339 TEST(AsyncSocket, ConnectReadImmediateRead) {
1340   TestServer server;
1341
1342   const size_t maxBufferSz = 100;
1343   const size_t maxReadsPerEvent = 1;
1344   const size_t expectedDataSz = maxBufferSz * 3;
1345   char expectedData[expectedDataSz];
1346   memset(expectedData, 'j', expectedDataSz);
1347
1348   EventBase evb;
1349   ReadCallback rcb(maxBufferSz);
1350   AsyncSocketImmediateRead socket(&evb);
1351   socket.connect(nullptr, server.getAddress(), 30);
1352
1353   evb.loop(); // loop until the socket is connected
1354
1355   socket.setReadCB(&rcb);
1356   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1357   socket.immediateReadCalled = false;
1358
1359   auto acceptedSocket = server.acceptAsync(&evb);
1360
1361   ReadCallback rcbServer;
1362   WriteCallback wcbServer;
1363   rcbServer.dataAvailableCallback = [&]() {
1364     if (rcbServer.dataRead() == expectedDataSz) {
1365       // write back all data read
1366       rcbServer.verifyData(expectedData, expectedDataSz);
1367       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1368       acceptedSocket->close();
1369     }
1370   };
1371   acceptedSocket->setReadCB(&rcbServer);
1372
1373   // write data
1374   WriteCallback wcb1;
1375   socket.write(&wcb1, expectedData, expectedDataSz);
1376   evb.loop();
1377   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1378   rcb.verifyData(expectedData, expectedDataSz);
1379   CHECK_EQ(socket.immediateReadCalled, true);
1380
1381   ASSERT_FALSE(socket.isClosedBySelf());
1382   ASSERT_FALSE(socket.isClosedByPeer());
1383 }
1384
1385 TEST(AsyncSocket, ConnectReadUninstallRead) {
1386   TestServer server;
1387
1388   const size_t maxBufferSz = 100;
1389   const size_t maxReadsPerEvent = 1;
1390   const size_t expectedDataSz = maxBufferSz * 3;
1391   char expectedData[expectedDataSz];
1392   memset(expectedData, 'k', expectedDataSz);
1393
1394   EventBase evb;
1395   ReadCallback rcb(maxBufferSz);
1396   AsyncSocketImmediateRead socket(&evb);
1397   socket.connect(nullptr, server.getAddress(), 30);
1398
1399   evb.loop(); // loop until the socket is connected
1400
1401   socket.setReadCB(&rcb);
1402   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1403   socket.immediateReadCalled = false;
1404
1405   auto acceptedSocket = server.acceptAsync(&evb);
1406
1407   ReadCallback rcbServer;
1408   WriteCallback wcbServer;
1409   rcbServer.dataAvailableCallback = [&]() {
1410     if (rcbServer.dataRead() == expectedDataSz) {
1411       // write back all data read
1412       rcbServer.verifyData(expectedData, expectedDataSz);
1413       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1414       acceptedSocket->close();
1415     }
1416   };
1417   acceptedSocket->setReadCB(&rcbServer);
1418
1419   rcb.dataAvailableCallback = [&]() {
1420     // we read data and reset readCB
1421     socket.setReadCB(nullptr);
1422   };
1423
1424   // write data
1425   WriteCallback wcb;
1426   socket.write(&wcb, expectedData, expectedDataSz);
1427   evb.loop();
1428   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1429
1430   /* we shoud've only read maxBufferSz data since readCallback_
1431    * was reset in dataAvailableCallback */
1432   CHECK_EQ(rcb.dataRead(), maxBufferSz);
1433   CHECK_EQ(socket.immediateReadCalled, false);
1434
1435   ASSERT_FALSE(socket.isClosedBySelf());
1436   ASSERT_FALSE(socket.isClosedByPeer());
1437 }
1438
1439 // TODO:
1440 // - Test connect() and have the connect callback set the read callback
1441 // - Test connect() and have the connect callback unset the read callback
1442 // - Test reading/writing/closing/destroying the socket in the connect callback
1443 // - Test reading/writing/closing/destroying the socket in the read callback
1444 // - Test reading/writing/closing/destroying the socket in the write callback
1445 // - Test one-way shutdown behavior
1446 // - Test changing the EventBase
1447 //
1448 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1449 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1450
1451
1452 ///////////////////////////////////////////////////////////////////////////
1453 // AsyncServerSocket tests
1454 ///////////////////////////////////////////////////////////////////////////
1455
1456 /**
1457  * Helper AcceptCallback class for the test code
1458  * It records the callbacks that were invoked, and also supports calling
1459  * generic std::function objects in each callback.
1460  */
1461 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1462  public:
1463   enum EventType {
1464     TYPE_START,
1465     TYPE_ACCEPT,
1466     TYPE_ERROR,
1467     TYPE_STOP
1468   };
1469   struct EventInfo {
1470     EventInfo(int fd, const folly::SocketAddress& addr)
1471       : type(TYPE_ACCEPT),
1472         fd(fd),
1473         address(addr),
1474         errorMsg() {}
1475     explicit EventInfo(const std::string& msg)
1476       : type(TYPE_ERROR),
1477         fd(-1),
1478         address(),
1479         errorMsg(msg) {}
1480     explicit EventInfo(EventType et)
1481       : type(et),
1482         fd(-1),
1483         address(),
1484         errorMsg() {}
1485
1486     EventType type;
1487     int fd;  // valid for TYPE_ACCEPT
1488     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1489     string errorMsg;  // valid for TYPE_ERROR
1490   };
1491   typedef std::deque<EventInfo> EventList;
1492
1493   TestAcceptCallback()
1494     : connectionAcceptedFn_(),
1495       acceptErrorFn_(),
1496       acceptStoppedFn_(),
1497       events_() {}
1498
1499   std::deque<EventInfo>* getEvents() {
1500     return &events_;
1501   }
1502
1503   void setConnectionAcceptedFn(
1504       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1505     connectionAcceptedFn_ = fn;
1506   }
1507   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1508     acceptErrorFn_ = fn;
1509   }
1510   void setAcceptStartedFn(const std::function<void()>& fn) {
1511     acceptStartedFn_ = fn;
1512   }
1513   void setAcceptStoppedFn(const std::function<void()>& fn) {
1514     acceptStoppedFn_ = fn;
1515   }
1516
1517   void connectionAccepted(
1518       int fd, const folly::SocketAddress& clientAddr) noexcept override {
1519     events_.emplace_back(fd, clientAddr);
1520
1521     if (connectionAcceptedFn_) {
1522       connectionAcceptedFn_(fd, clientAddr);
1523     }
1524   }
1525   void acceptError(const std::exception& ex) noexcept override {
1526     events_.emplace_back(ex.what());
1527
1528     if (acceptErrorFn_) {
1529       acceptErrorFn_(ex);
1530     }
1531   }
1532   void acceptStarted() noexcept override {
1533     events_.emplace_back(TYPE_START);
1534
1535     if (acceptStartedFn_) {
1536       acceptStartedFn_();
1537     }
1538   }
1539   void acceptStopped() noexcept override {
1540     events_.emplace_back(TYPE_STOP);
1541
1542     if (acceptStoppedFn_) {
1543       acceptStoppedFn_();
1544     }
1545   }
1546
1547  private:
1548   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1549   std::function<void(const std::exception&)> acceptErrorFn_;
1550   std::function<void()> acceptStartedFn_;
1551   std::function<void()> acceptStoppedFn_;
1552
1553   std::deque<EventInfo> events_;
1554 };
1555
1556 /**
1557  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1558  */
1559 TEST(AsyncSocketTest, ServerAcceptOptions) {
1560   EventBase eventBase;
1561
1562   // Create a server socket
1563   std::shared_ptr<AsyncServerSocket> serverSocket(
1564       AsyncServerSocket::newSocket(&eventBase));
1565   serverSocket->bind(0);
1566   serverSocket->listen(16);
1567   folly::SocketAddress serverAddress;
1568   serverSocket->getAddress(&serverAddress);
1569
1570   // Add a callback to accept one connection then stop the loop
1571   TestAcceptCallback acceptCallback;
1572   acceptCallback.setConnectionAcceptedFn(
1573     [&](int fd, const folly::SocketAddress& addr) {
1574       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1575     });
1576   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1577     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1578   });
1579   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1580   serverSocket->startAccepting();
1581
1582   // Connect to the server socket
1583   std::shared_ptr<AsyncSocket> socket(
1584       AsyncSocket::newSocket(&eventBase, serverAddress));
1585
1586   eventBase.loop();
1587
1588   // Verify that the server accepted a connection
1589   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1590   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1591                     TestAcceptCallback::TYPE_START);
1592   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1593                     TestAcceptCallback::TYPE_ACCEPT);
1594   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1595                     TestAcceptCallback::TYPE_STOP);
1596   int fd = acceptCallback.getEvents()->at(1).fd;
1597
1598   // The accepted connection should already be in non-blocking mode
1599   int flags = fcntl(fd, F_GETFL, 0);
1600   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1601
1602 #ifndef TCP_NOPUSH
1603   // The accepted connection should already have TCP_NODELAY set
1604   int value;
1605   socklen_t valueLength = sizeof(value);
1606   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1607   CHECK_EQ(rc, 0);
1608   CHECK_EQ(value, 1);
1609 #endif
1610 }
1611
1612 /**
1613  * Test AsyncServerSocket::removeAcceptCallback()
1614  */
1615 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1616   // Create a new AsyncServerSocket
1617   EventBase eventBase;
1618   std::shared_ptr<AsyncServerSocket> serverSocket(
1619       AsyncServerSocket::newSocket(&eventBase));
1620   serverSocket->bind(0);
1621   serverSocket->listen(16);
1622   folly::SocketAddress serverAddress;
1623   serverSocket->getAddress(&serverAddress);
1624
1625   // Add several accept callbacks
1626   TestAcceptCallback cb1;
1627   TestAcceptCallback cb2;
1628   TestAcceptCallback cb3;
1629   TestAcceptCallback cb4;
1630   TestAcceptCallback cb5;
1631   TestAcceptCallback cb6;
1632   TestAcceptCallback cb7;
1633
1634   // Test having callbacks remove other callbacks before them on the list,
1635   // after them on the list, or removing themselves.
1636   //
1637   // Have callback 2 remove callback 3 and callback 5 the first time it is
1638   // called.
1639   int cb2Count = 0;
1640   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1641       std::shared_ptr<AsyncSocket> sock2(
1642         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1643       });
1644   cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1645     });
1646   cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1647       std::shared_ptr<AsyncSocket> sock3(
1648         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1649     });
1650   cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1651   std::shared_ptr<AsyncSocket> sock5(
1652       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1653
1654     });
1655   cb2.setConnectionAcceptedFn(
1656     [&](int fd, const folly::SocketAddress& addr) {
1657       if (cb2Count == 0) {
1658         serverSocket->removeAcceptCallback(&cb3, nullptr);
1659         serverSocket->removeAcceptCallback(&cb5, nullptr);
1660       }
1661       ++cb2Count;
1662     });
1663   // Have callback 6 remove callback 4 the first time it is called,
1664   // and destroy the server socket the second time it is called
1665   int cb6Count = 0;
1666   cb6.setConnectionAcceptedFn(
1667     [&](int fd, const folly::SocketAddress& addr) {
1668       if (cb6Count == 0) {
1669         serverSocket->removeAcceptCallback(&cb4, nullptr);
1670         std::shared_ptr<AsyncSocket> sock6(
1671           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1672         std::shared_ptr<AsyncSocket> sock7(
1673           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1674         std::shared_ptr<AsyncSocket> sock8(
1675           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1676
1677       } else {
1678         serverSocket.reset();
1679       }
1680       ++cb6Count;
1681     });
1682   // Have callback 7 remove itself
1683   cb7.setConnectionAcceptedFn(
1684     [&](int fd, const folly::SocketAddress& addr) {
1685       serverSocket->removeAcceptCallback(&cb7, nullptr);
1686     });
1687
1688   serverSocket->addAcceptCallback(&cb1, nullptr);
1689   serverSocket->addAcceptCallback(&cb2, nullptr);
1690   serverSocket->addAcceptCallback(&cb3, nullptr);
1691   serverSocket->addAcceptCallback(&cb4, nullptr);
1692   serverSocket->addAcceptCallback(&cb5, nullptr);
1693   serverSocket->addAcceptCallback(&cb6, nullptr);
1694   serverSocket->addAcceptCallback(&cb7, nullptr);
1695   serverSocket->startAccepting();
1696
1697   // Make several connections to the socket
1698   std::shared_ptr<AsyncSocket> sock1(
1699       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1700   std::shared_ptr<AsyncSocket> sock4(
1701       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1702
1703   // Loop until we are stopped
1704   eventBase.loop();
1705
1706   // Check to make sure that the expected callbacks were invoked.
1707   //
1708   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1709   // the AcceptCallbacks in round-robin fashion, in the order that they were
1710   // added.  The code is implemented this way right now, but the API doesn't
1711   // explicitly require it be done this way.  If we change the code not to be
1712   // exactly round robin in the future, we can simplify the test checks here.
1713   // (We'll also need to update the termination code, since we expect cb6 to
1714   // get called twice to terminate the loop.)
1715   CHECK_EQ(cb1.getEvents()->size(), 4);
1716   CHECK_EQ(cb1.getEvents()->at(0).type,
1717                     TestAcceptCallback::TYPE_START);
1718   CHECK_EQ(cb1.getEvents()->at(1).type,
1719                     TestAcceptCallback::TYPE_ACCEPT);
1720   CHECK_EQ(cb1.getEvents()->at(2).type,
1721                     TestAcceptCallback::TYPE_ACCEPT);
1722   CHECK_EQ(cb1.getEvents()->at(3).type,
1723                     TestAcceptCallback::TYPE_STOP);
1724
1725   CHECK_EQ(cb2.getEvents()->size(), 4);
1726   CHECK_EQ(cb2.getEvents()->at(0).type,
1727                     TestAcceptCallback::TYPE_START);
1728   CHECK_EQ(cb2.getEvents()->at(1).type,
1729                     TestAcceptCallback::TYPE_ACCEPT);
1730   CHECK_EQ(cb2.getEvents()->at(2).type,
1731                     TestAcceptCallback::TYPE_ACCEPT);
1732   CHECK_EQ(cb2.getEvents()->at(3).type,
1733                     TestAcceptCallback::TYPE_STOP);
1734
1735   CHECK_EQ(cb3.getEvents()->size(), 2);
1736   CHECK_EQ(cb3.getEvents()->at(0).type,
1737                     TestAcceptCallback::TYPE_START);
1738   CHECK_EQ(cb3.getEvents()->at(1).type,
1739                     TestAcceptCallback::TYPE_STOP);
1740
1741   CHECK_EQ(cb4.getEvents()->size(), 3);
1742   CHECK_EQ(cb4.getEvents()->at(0).type,
1743                     TestAcceptCallback::TYPE_START);
1744   CHECK_EQ(cb4.getEvents()->at(1).type,
1745                     TestAcceptCallback::TYPE_ACCEPT);
1746   CHECK_EQ(cb4.getEvents()->at(2).type,
1747                     TestAcceptCallback::TYPE_STOP);
1748
1749   CHECK_EQ(cb5.getEvents()->size(), 2);
1750   CHECK_EQ(cb5.getEvents()->at(0).type,
1751                     TestAcceptCallback::TYPE_START);
1752   CHECK_EQ(cb5.getEvents()->at(1).type,
1753                     TestAcceptCallback::TYPE_STOP);
1754
1755   CHECK_EQ(cb6.getEvents()->size(), 4);
1756   CHECK_EQ(cb6.getEvents()->at(0).type,
1757                     TestAcceptCallback::TYPE_START);
1758   CHECK_EQ(cb6.getEvents()->at(1).type,
1759                     TestAcceptCallback::TYPE_ACCEPT);
1760   CHECK_EQ(cb6.getEvents()->at(2).type,
1761                     TestAcceptCallback::TYPE_ACCEPT);
1762   CHECK_EQ(cb6.getEvents()->at(3).type,
1763                     TestAcceptCallback::TYPE_STOP);
1764
1765   CHECK_EQ(cb7.getEvents()->size(), 3);
1766   CHECK_EQ(cb7.getEvents()->at(0).type,
1767                     TestAcceptCallback::TYPE_START);
1768   CHECK_EQ(cb7.getEvents()->at(1).type,
1769                     TestAcceptCallback::TYPE_ACCEPT);
1770   CHECK_EQ(cb7.getEvents()->at(2).type,
1771                     TestAcceptCallback::TYPE_STOP);
1772 }
1773
1774 /**
1775  * Test AsyncServerSocket::removeAcceptCallback()
1776  */
1777 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1778   // Create a new AsyncServerSocket
1779   EventBase eventBase;
1780   std::shared_ptr<AsyncServerSocket> serverSocket(
1781       AsyncServerSocket::newSocket(&eventBase));
1782   serverSocket->bind(0);
1783   serverSocket->listen(16);
1784   folly::SocketAddress serverAddress;
1785   serverSocket->getAddress(&serverAddress);
1786
1787   // Add several accept callbacks
1788   TestAcceptCallback cb1;
1789   auto thread_id = pthread_self();
1790   cb1.setAcceptStartedFn([&](){
1791     CHECK_NE(thread_id, pthread_self());
1792     thread_id = pthread_self();
1793   });
1794   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1795     CHECK_EQ(thread_id, pthread_self());
1796     serverSocket->removeAcceptCallback(&cb1, nullptr);
1797   });
1798   cb1.setAcceptStoppedFn([&](){
1799     CHECK_EQ(thread_id, pthread_self());
1800   });
1801
1802   // Test having callbacks remove other callbacks before them on the list,
1803   serverSocket->addAcceptCallback(&cb1, nullptr);
1804   serverSocket->startAccepting();
1805
1806   // Make several connections to the socket
1807   std::shared_ptr<AsyncSocket> sock1(
1808       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1809
1810   // Loop in another thread
1811   auto other = std::thread([&](){
1812     eventBase.loop();
1813   });
1814   other.join();
1815
1816   // Check to make sure that the expected callbacks were invoked.
1817   //
1818   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1819   // the AcceptCallbacks in round-robin fashion, in the order that they were
1820   // added.  The code is implemented this way right now, but the API doesn't
1821   // explicitly require it be done this way.  If we change the code not to be
1822   // exactly round robin in the future, we can simplify the test checks here.
1823   // (We'll also need to update the termination code, since we expect cb6 to
1824   // get called twice to terminate the loop.)
1825   CHECK_EQ(cb1.getEvents()->size(), 3);
1826   CHECK_EQ(cb1.getEvents()->at(0).type,
1827                     TestAcceptCallback::TYPE_START);
1828   CHECK_EQ(cb1.getEvents()->at(1).type,
1829                     TestAcceptCallback::TYPE_ACCEPT);
1830   CHECK_EQ(cb1.getEvents()->at(2).type,
1831                     TestAcceptCallback::TYPE_STOP);
1832
1833 }
1834
1835 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1836   // Add a callback to accept one connection then stop accepting
1837   TestAcceptCallback acceptCallback;
1838   acceptCallback.setConnectionAcceptedFn(
1839     [&](int fd, const folly::SocketAddress& addr) {
1840       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1841     });
1842   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1843     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1844   });
1845   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1846   serverSocket->startAccepting();
1847
1848   // Connect to the server socket
1849   EventBase* eventBase = serverSocket->getEventBase();
1850   folly::SocketAddress serverAddress;
1851   serverSocket->getAddress(&serverAddress);
1852   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1853
1854   // Loop to process all events
1855   eventBase->loop();
1856
1857   // Verify that the server accepted a connection
1858   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1859   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1860                     TestAcceptCallback::TYPE_START);
1861   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1862                     TestAcceptCallback::TYPE_ACCEPT);
1863   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1864                     TestAcceptCallback::TYPE_STOP);
1865 }
1866
1867 /* Verify that we don't leak sockets if we are destroyed()
1868  * and there are still writes pending
1869  *
1870  * If destroy() only calls close() instead of closeNow(),
1871  * it would shutdown(writes) on the socket, but it would
1872  * never be close()'d, and the socket would leak
1873  */
1874 TEST(AsyncSocketTest, DestroyCloseTest) {
1875   TestServer server;
1876
1877   // connect()
1878   EventBase clientEB;
1879   EventBase serverEB;
1880   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1881   ConnCallback ccb;
1882   socket->connect(&ccb, server.getAddress(), 30);
1883
1884   // Accept the connection
1885   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1886   ReadCallback rcb;
1887   acceptedSocket->setReadCB(&rcb);
1888
1889   // Write a large buffer to the socket that is larger than kernel buffer
1890   size_t simpleBufLength = 5000000;
1891   char* simpleBuf = new char[simpleBufLength];
1892   memset(simpleBuf, 'a', simpleBufLength);
1893   WriteCallback wcb;
1894
1895   // Let the reads and writes run to completion
1896   int fd = acceptedSocket->getFd();
1897
1898   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1899   socket.reset();
1900   acceptedSocket.reset();
1901
1902   // Test that server socket was closed
1903   ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1904   CHECK_EQ(sz, -1);
1905   CHECK_EQ(errno, 9);
1906   delete[] simpleBuf;
1907 }
1908
1909 /**
1910  * Test AsyncServerSocket::useExistingSocket()
1911  */
1912 TEST(AsyncSocketTest, ServerExistingSocket) {
1913   EventBase eventBase;
1914
1915   // Test creating a socket, and letting AsyncServerSocket bind and listen
1916   {
1917     // Manually create a socket
1918     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1919     ASSERT_GE(fd, 0);
1920
1921     // Create a server socket
1922     AsyncServerSocket::UniquePtr serverSocket(
1923         new AsyncServerSocket(&eventBase));
1924     serverSocket->useExistingSocket(fd);
1925     folly::SocketAddress address;
1926     serverSocket->getAddress(&address);
1927     address.setPort(0);
1928     serverSocket->bind(address);
1929     serverSocket->listen(16);
1930
1931     // Make sure the socket works
1932     serverSocketSanityTest(serverSocket.get());
1933   }
1934
1935   // Test creating a socket and binding manually,
1936   // then letting AsyncServerSocket listen
1937   {
1938     // Manually create a socket
1939     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1940     ASSERT_GE(fd, 0);
1941     // bind
1942     struct sockaddr_in addr;
1943     addr.sin_family = AF_INET;
1944     addr.sin_port = 0;
1945     addr.sin_addr.s_addr = INADDR_ANY;
1946     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1947                              sizeof(addr)), 0);
1948     // Look up the address that we bound to
1949     folly::SocketAddress boundAddress;
1950     boundAddress.setFromLocalAddress(fd);
1951
1952     // Create a server socket
1953     AsyncServerSocket::UniquePtr serverSocket(
1954         new AsyncServerSocket(&eventBase));
1955     serverSocket->useExistingSocket(fd);
1956     serverSocket->listen(16);
1957
1958     // Make sure AsyncServerSocket reports the same address that we bound to
1959     folly::SocketAddress serverSocketAddress;
1960     serverSocket->getAddress(&serverSocketAddress);
1961     CHECK_EQ(boundAddress, serverSocketAddress);
1962
1963     // Make sure the socket works
1964     serverSocketSanityTest(serverSocket.get());
1965   }
1966
1967   // Test creating a socket, binding and listening manually,
1968   // then giving it to AsyncServerSocket
1969   {
1970     // Manually create a socket
1971     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1972     ASSERT_GE(fd, 0);
1973     // bind
1974     struct sockaddr_in addr;
1975     addr.sin_family = AF_INET;
1976     addr.sin_port = 0;
1977     addr.sin_addr.s_addr = INADDR_ANY;
1978     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1979                              sizeof(addr)), 0);
1980     // Look up the address that we bound to
1981     folly::SocketAddress boundAddress;
1982     boundAddress.setFromLocalAddress(fd);
1983     // listen
1984     CHECK_EQ(listen(fd, 16), 0);
1985
1986     // Create a server socket
1987     AsyncServerSocket::UniquePtr serverSocket(
1988         new AsyncServerSocket(&eventBase));
1989     serverSocket->useExistingSocket(fd);
1990
1991     // Make sure AsyncServerSocket reports the same address that we bound to
1992     folly::SocketAddress serverSocketAddress;
1993     serverSocket->getAddress(&serverSocketAddress);
1994     CHECK_EQ(boundAddress, serverSocketAddress);
1995
1996     // Make sure the socket works
1997     serverSocketSanityTest(serverSocket.get());
1998   }
1999 }
2000
2001 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2002   EventBase eventBase;
2003
2004   // Create a server socket
2005   std::shared_ptr<AsyncServerSocket> serverSocket(
2006       AsyncServerSocket::newSocket(&eventBase));
2007   string path(1, 0);
2008   path.append("/anonymous");
2009   folly::SocketAddress serverAddress;
2010   serverAddress.setFromPath(path);
2011   serverSocket->bind(serverAddress);
2012   serverSocket->listen(16);
2013
2014   // Add a callback to accept one connection then stop the loop
2015   TestAcceptCallback acceptCallback;
2016   acceptCallback.setConnectionAcceptedFn(
2017     [&](int fd, const folly::SocketAddress& addr) {
2018       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2019     });
2020   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
2021     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2022   });
2023   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2024   serverSocket->startAccepting();
2025
2026   // Connect to the server socket
2027   std::shared_ptr<AsyncSocket> socket(
2028       AsyncSocket::newSocket(&eventBase, serverAddress));
2029
2030   eventBase.loop();
2031
2032   // Verify that the server accepted a connection
2033   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2034   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2035                     TestAcceptCallback::TYPE_START);
2036   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2037                     TestAcceptCallback::TYPE_ACCEPT);
2038   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2039                     TestAcceptCallback::TYPE_STOP);
2040   int fd = acceptCallback.getEvents()->at(1).fd;
2041
2042   // The accepted connection should already be in non-blocking mode
2043   int flags = fcntl(fd, F_GETFL, 0);
2044   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2045 }