2eae4a155259aee8a6e2d6adc94810f81d8c8e2e
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
21
22 #include <folly/io/IOBuf.h>
23 #include <folly/io/async/test/AsyncSocketTest.h>
24 #include <folly/io/async/test/Util.h>
25 #include <folly/test/SocketAddressTestHelper.h>
26
27 #include <gtest/gtest.h>
28 #include <boost/scoped_array.hpp>
29 #include <iostream>
30 #include <unistd.h>
31 #include <fcntl.h>
32 #include <poll.h>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/tcp.h>
36 #include <thread>
37
38 using namespace boost;
39
40 using std::string;
41 using std::vector;
42 using std::min;
43 using std::cerr;
44 using std::endl;
45 using std::unique_ptr;
46 using std::chrono::milliseconds;
47 using boost::scoped_array;
48
49 using namespace folly;
50
51 class DelayedWrite: public AsyncTimeout {
52  public:
53   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
54       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
55       bool cork, bool lastWrite = false):
56     AsyncTimeout(socket->getEventBase()),
57     socket_(socket),
58     bufs_(std::move(bufs)),
59     wcb_(wcb),
60     cork_(cork),
61     lastWrite_(lastWrite) {}
62
63  private:
64   void timeoutExpired() noexcept override {
65     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
66     socket_->writeChain(wcb_, std::move(bufs_), flags);
67     if (lastWrite_) {
68       socket_->shutdownWrite();
69     }
70   }
71
72   std::shared_ptr<AsyncSocket> socket_;
73   unique_ptr<IOBuf> bufs_;
74   AsyncTransportWrapper::WriteCallback* wcb_;
75   bool cork_;
76   bool lastWrite_;
77 };
78
79 ///////////////////////////////////////////////////////////////////////////
80 // connect() tests
81 ///////////////////////////////////////////////////////////////////////////
82
83 /**
84  * Test connecting to a server
85  */
86 TEST(AsyncSocketTest, Connect) {
87   // Start listening on a local port
88   TestServer server;
89
90   // Connect using a AsyncSocket
91   EventBase evb;
92   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
93   ConnCallback cb;
94   socket->connect(&cb, server.getAddress(), 30);
95
96   evb.loop();
97
98   CHECK_EQ(cb.state, STATE_SUCCEEDED);
99 }
100
101 /**
102  * Test connecting to a server that isn't listening
103  */
104 TEST(AsyncSocketTest, ConnectRefused) {
105   EventBase evb;
106
107   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
108
109   // Hopefully nothing is actually listening on this address
110   folly::SocketAddress addr("127.0.0.1", 65535);
111   ConnCallback cb;
112   socket->connect(&cb, addr, 30);
113
114   evb.loop();
115
116   CHECK_EQ(cb.state, STATE_FAILED);
117   CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
118 }
119
120 /**
121  * Test connection timeout
122  */
123 TEST(AsyncSocketTest, ConnectTimeout) {
124   EventBase evb;
125
126   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
127
128   // Try connecting to server that won't respond.
129   //
130   // This depends somewhat on the network where this test is run.
131   // Hopefully this IP will be routable but unresponsive.
132   // (Alternatively, we could try listening on a local raw socket, but that
133   // normally requires root privileges.)
134   auto host =
135       SocketAddressTestHelper::isIPv6Enabled() ?
136       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
137       SocketAddressTestHelper::isIPv4Enabled() ?
138       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
139       nullptr;
140   SocketAddress addr(host, 65535);
141   ConnCallback cb;
142   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
143
144   evb.loop();
145
146   CHECK_EQ(cb.state, STATE_FAILED);
147   CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
148
149   // Verify that we can still get the peer address after a timeout.
150   // Use case is if the client was created from a client pool, and we want
151   // to log which peer failed.
152   folly::SocketAddress peer;
153   socket->getPeerAddress(&peer);
154   CHECK_EQ(peer, addr);
155 }
156
157 /**
158  * Test writing immediately after connecting, without waiting for connect
159  * to finish.
160  */
161 TEST(AsyncSocketTest, ConnectAndWrite) {
162   TestServer server;
163
164   // connect()
165   EventBase evb;
166   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
167   ConnCallback ccb;
168   socket->connect(&ccb, server.getAddress(), 30);
169
170   // write()
171   char buf[128];
172   memset(buf, 'a', sizeof(buf));
173   WriteCallback wcb;
174   socket->write(&wcb, buf, sizeof(buf));
175
176   // Loop.  We don't bother accepting on the server socket yet.
177   // The kernel should be able to buffer the write request so it can succeed.
178   evb.loop();
179
180   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
181   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
182
183   // Make sure the server got a connection and received the data
184   socket->close();
185   server.verifyConnection(buf, sizeof(buf));
186
187   ASSERT_TRUE(socket->isClosedBySelf());
188   ASSERT_FALSE(socket->isClosedByPeer());
189 }
190
191 /**
192  * Test connecting using a nullptr connect callback.
193  */
194 TEST(AsyncSocketTest, ConnectNullCallback) {
195   TestServer server;
196
197   // connect()
198   EventBase evb;
199   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
200   socket->connect(nullptr, server.getAddress(), 30);
201
202   // write some data, just so we have some way of verifing
203   // that the socket works correctly after connecting
204   char buf[128];
205   memset(buf, 'a', sizeof(buf));
206   WriteCallback wcb;
207   socket->write(&wcb, buf, sizeof(buf));
208
209   evb.loop();
210
211   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
212
213   // Make sure the server got a connection and received the data
214   socket->close();
215   server.verifyConnection(buf, sizeof(buf));
216
217   ASSERT_TRUE(socket->isClosedBySelf());
218   ASSERT_FALSE(socket->isClosedByPeer());
219 }
220
221 /**
222  * Test calling both write() and close() immediately after connecting, without
223  * waiting for connect to finish.
224  *
225  * This exercises the STATE_CONNECTING_CLOSING code.
226  */
227 TEST(AsyncSocketTest, ConnectWriteAndClose) {
228   TestServer server;
229
230   // connect()
231   EventBase evb;
232   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
233   ConnCallback ccb;
234   socket->connect(&ccb, server.getAddress(), 30);
235
236   // write()
237   char buf[128];
238   memset(buf, 'a', sizeof(buf));
239   WriteCallback wcb;
240   socket->write(&wcb, buf, sizeof(buf));
241
242   // close()
243   socket->close();
244
245   // Loop.  We don't bother accepting on the server socket yet.
246   // The kernel should be able to buffer the write request so it can succeed.
247   evb.loop();
248
249   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
250   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
251
252   // Make sure the server got a connection and received the data
253   server.verifyConnection(buf, sizeof(buf));
254
255   ASSERT_TRUE(socket->isClosedBySelf());
256   ASSERT_FALSE(socket->isClosedByPeer());
257 }
258
259 /**
260  * Test calling close() immediately after connect()
261  */
262 TEST(AsyncSocketTest, ConnectAndClose) {
263   TestServer server;
264
265   // Connect using a AsyncSocket
266   EventBase evb;
267   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
268   ConnCallback ccb;
269   socket->connect(&ccb, server.getAddress(), 30);
270
271   // Hopefully the connect didn't succeed immediately.
272   // If it did, we can't exercise the close-while-connecting code path.
273   if (ccb.state == STATE_SUCCEEDED) {
274     LOG(INFO) << "connect() succeeded immediately; aborting test "
275                        "of close-during-connect behavior";
276     return;
277   }
278
279   socket->close();
280
281   // Loop, although there shouldn't be anything to do.
282   evb.loop();
283
284   // Make sure the connection was aborted
285   CHECK_EQ(ccb.state, STATE_FAILED);
286
287   ASSERT_TRUE(socket->isClosedBySelf());
288   ASSERT_FALSE(socket->isClosedByPeer());
289 }
290
291 /**
292  * Test calling closeNow() immediately after connect()
293  *
294  * This should be identical to the normal close behavior.
295  */
296 TEST(AsyncSocketTest, ConnectAndCloseNow) {
297   TestServer server;
298
299   // Connect using a AsyncSocket
300   EventBase evb;
301   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
302   ConnCallback ccb;
303   socket->connect(&ccb, server.getAddress(), 30);
304
305   // Hopefully the connect didn't succeed immediately.
306   // If it did, we can't exercise the close-while-connecting code path.
307   if (ccb.state == STATE_SUCCEEDED) {
308     LOG(INFO) << "connect() succeeded immediately; aborting test "
309                        "of closeNow()-during-connect behavior";
310     return;
311   }
312
313   socket->closeNow();
314
315   // Loop, although there shouldn't be anything to do.
316   evb.loop();
317
318   // Make sure the connection was aborted
319   CHECK_EQ(ccb.state, STATE_FAILED);
320
321   ASSERT_TRUE(socket->isClosedBySelf());
322   ASSERT_FALSE(socket->isClosedByPeer());
323 }
324
325 /**
326  * Test calling both write() and closeNow() immediately after connecting,
327  * without waiting for connect to finish.
328  *
329  * This should abort the pending write.
330  */
331 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
332   TestServer server;
333
334   // connect()
335   EventBase evb;
336   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
337   ConnCallback ccb;
338   socket->connect(&ccb, server.getAddress(), 30);
339
340   // Hopefully the connect didn't succeed immediately.
341   // If it did, we can't exercise the close-while-connecting code path.
342   if (ccb.state == STATE_SUCCEEDED) {
343     LOG(INFO) << "connect() succeeded immediately; aborting test "
344                        "of write-during-connect behavior";
345     return;
346   }
347
348   // write()
349   char buf[128];
350   memset(buf, 'a', sizeof(buf));
351   WriteCallback wcb;
352   socket->write(&wcb, buf, sizeof(buf));
353
354   // close()
355   socket->closeNow();
356
357   // Loop, although there shouldn't be anything to do.
358   evb.loop();
359
360   CHECK_EQ(ccb.state, STATE_FAILED);
361   CHECK_EQ(wcb.state, STATE_FAILED);
362
363   ASSERT_TRUE(socket->isClosedBySelf());
364   ASSERT_FALSE(socket->isClosedByPeer());
365 }
366
367 /**
368  * Test installing a read callback immediately, before connect() finishes.
369  */
370 TEST(AsyncSocketTest, ConnectAndRead) {
371   TestServer server;
372
373   // connect()
374   EventBase evb;
375   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
376   ConnCallback ccb;
377   socket->connect(&ccb, server.getAddress(), 30);
378
379   ReadCallback rcb;
380   socket->setReadCB(&rcb);
381
382   // Even though we haven't looped yet, we should be able to accept
383   // the connection and send data to it.
384   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
385   uint8_t buf[128];
386   memset(buf, 'a', sizeof(buf));
387   acceptedSocket->write(buf, sizeof(buf));
388   acceptedSocket->flush();
389   acceptedSocket->close();
390
391   // Loop, although there shouldn't be anything to do.
392   evb.loop();
393
394   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
395   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
396   CHECK_EQ(rcb.buffers.size(), 1);
397   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
398   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
399
400   ASSERT_FALSE(socket->isClosedBySelf());
401   ASSERT_FALSE(socket->isClosedByPeer());
402 }
403
404 /**
405  * Test installing a read callback and then closing immediately before the
406  * connect attempt finishes.
407  */
408 TEST(AsyncSocketTest, ConnectReadAndClose) {
409   TestServer server;
410
411   // connect()
412   EventBase evb;
413   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
414   ConnCallback ccb;
415   socket->connect(&ccb, server.getAddress(), 30);
416
417   // Hopefully the connect didn't succeed immediately.
418   // If it did, we can't exercise the close-while-connecting code path.
419   if (ccb.state == STATE_SUCCEEDED) {
420     LOG(INFO) << "connect() succeeded immediately; aborting test "
421                        "of read-during-connect behavior";
422     return;
423   }
424
425   ReadCallback rcb;
426   socket->setReadCB(&rcb);
427
428   // close()
429   socket->close();
430
431   // Loop, although there shouldn't be anything to do.
432   evb.loop();
433
434   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
435   CHECK_EQ(rcb.buffers.size(), 0);
436   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
437
438   ASSERT_TRUE(socket->isClosedBySelf());
439   ASSERT_FALSE(socket->isClosedByPeer());
440 }
441
442 /**
443  * Test both writing and installing a read callback immediately,
444  * before connect() finishes.
445  */
446 TEST(AsyncSocketTest, ConnectWriteAndRead) {
447   TestServer server;
448
449   // connect()
450   EventBase evb;
451   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
452   ConnCallback ccb;
453   socket->connect(&ccb, server.getAddress(), 30);
454
455   // write()
456   char buf1[128];
457   memset(buf1, 'a', sizeof(buf1));
458   WriteCallback wcb;
459   socket->write(&wcb, buf1, sizeof(buf1));
460
461   // set a read callback
462   ReadCallback rcb;
463   socket->setReadCB(&rcb);
464
465   // Even though we haven't looped yet, we should be able to accept
466   // the connection and send data to it.
467   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
468   uint8_t buf2[128];
469   memset(buf2, 'b', sizeof(buf2));
470   acceptedSocket->write(buf2, sizeof(buf2));
471   acceptedSocket->flush();
472
473   // shut down the write half of acceptedSocket, so that the AsyncSocket
474   // will stop reading and we can break out of the event loop.
475   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
476
477   // Loop
478   evb.loop();
479
480   // Make sure the connect succeeded
481   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
482
483   // Make sure the AsyncSocket read the data written by the accepted socket
484   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
485   CHECK_EQ(rcb.buffers.size(), 1);
486   CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
487   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
488
489   // Close the AsyncSocket so we'll see EOF on acceptedSocket
490   socket->close();
491
492   // Make sure the accepted socket saw the data written by the AsyncSocket
493   uint8_t readbuf[sizeof(buf1)];
494   acceptedSocket->readAll(readbuf, sizeof(readbuf));
495   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
496   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
497   CHECK_EQ(bytesRead, 0);
498
499   ASSERT_FALSE(socket->isClosedBySelf());
500   ASSERT_TRUE(socket->isClosedByPeer());
501 }
502
503 /**
504  * Test writing to the socket then shutting down writes before the connect
505  * attempt finishes.
506  */
507 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
508   TestServer server;
509
510   // connect()
511   EventBase evb;
512   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
513   ConnCallback ccb;
514   socket->connect(&ccb, server.getAddress(), 30);
515
516   // Hopefully the connect didn't succeed immediately.
517   // If it did, we can't exercise the write-while-connecting code path.
518   if (ccb.state == STATE_SUCCEEDED) {
519     LOG(INFO) << "connect() succeeded immediately; skipping test";
520     return;
521   }
522
523   // Ask to write some data
524   char wbuf[128];
525   memset(wbuf, 'a', sizeof(wbuf));
526   WriteCallback wcb;
527   socket->write(&wcb, wbuf, sizeof(wbuf));
528   socket->shutdownWrite();
529
530   // Shutdown writes
531   socket->shutdownWrite();
532
533   // Even though we haven't looped yet, we should be able to accept
534   // the connection.
535   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
536
537   // Since the connection is still in progress, there should be no data to
538   // read yet.  Verify that the accepted socket is not readable.
539   struct pollfd fds[1];
540   fds[0].fd = acceptedSocket->getSocketFD();
541   fds[0].events = POLLIN;
542   fds[0].revents = 0;
543   int rc = poll(fds, 1, 0);
544   CHECK_EQ(rc, 0);
545
546   // Write data to the accepted socket
547   uint8_t acceptedWbuf[192];
548   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
549   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
550   acceptedSocket->flush();
551
552   // Loop
553   evb.loop();
554
555   // The loop should have completed the connection, written the queued data,
556   // and shutdown writes on the socket.
557   //
558   // Check that the connection was completed successfully and that the write
559   // callback succeeded.
560   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
561   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
562
563   // Check that we can read the data that was written to the socket, and that
564   // we see an EOF, since its socket was half-shutdown.
565   uint8_t readbuf[sizeof(wbuf)];
566   acceptedSocket->readAll(readbuf, sizeof(readbuf));
567   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
568   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
569   CHECK_EQ(bytesRead, 0);
570
571   // Close the accepted socket.  This will cause it to see EOF
572   // and uninstall the read callback when we loop next.
573   acceptedSocket->close();
574
575   // Install a read callback, then loop again.
576   ReadCallback rcb;
577   socket->setReadCB(&rcb);
578   evb.loop();
579
580   // This loop should have read the data and seen the EOF
581   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
582   CHECK_EQ(rcb.buffers.size(), 1);
583   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
584   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
585                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
586
587   ASSERT_FALSE(socket->isClosedBySelf());
588   ASSERT_FALSE(socket->isClosedByPeer());
589 }
590
591 /**
592  * Test reading, writing, and shutting down writes before the connect attempt
593  * finishes.
594  */
595 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
596   TestServer server;
597
598   // connect()
599   EventBase evb;
600   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
601   ConnCallback ccb;
602   socket->connect(&ccb, server.getAddress(), 30);
603
604   // Hopefully the connect didn't succeed immediately.
605   // If it did, we can't exercise the write-while-connecting code path.
606   if (ccb.state == STATE_SUCCEEDED) {
607     LOG(INFO) << "connect() succeeded immediately; skipping test";
608     return;
609   }
610
611   // Install a read callback
612   ReadCallback rcb;
613   socket->setReadCB(&rcb);
614
615   // Ask to write some data
616   char wbuf[128];
617   memset(wbuf, 'a', sizeof(wbuf));
618   WriteCallback wcb;
619   socket->write(&wcb, wbuf, sizeof(wbuf));
620
621   // Shutdown writes
622   socket->shutdownWrite();
623
624   // Even though we haven't looped yet, we should be able to accept
625   // the connection.
626   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
627
628   // Since the connection is still in progress, there should be no data to
629   // read yet.  Verify that the accepted socket is not readable.
630   struct pollfd fds[1];
631   fds[0].fd = acceptedSocket->getSocketFD();
632   fds[0].events = POLLIN;
633   fds[0].revents = 0;
634   int rc = poll(fds, 1, 0);
635   CHECK_EQ(rc, 0);
636
637   // Write data to the accepted socket
638   uint8_t acceptedWbuf[192];
639   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
640   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
641   acceptedSocket->flush();
642   // Shutdown writes to the accepted socket.  This will cause it to see EOF
643   // and uninstall the read callback.
644   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
645
646   // Loop
647   evb.loop();
648
649   // The loop should have completed the connection, written the queued data,
650   // shutdown writes on the socket, read the data we wrote to it, and see the
651   // EOF.
652   //
653   // Check that the connection was completed successfully and that the read
654   // and write callbacks were invoked as expected.
655   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
656   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
657   CHECK_EQ(rcb.buffers.size(), 1);
658   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
659   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
660                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
661   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
662
663   // Check that we can read the data that was written to the socket, and that
664   // we see an EOF, since its socket was half-shutdown.
665   uint8_t readbuf[sizeof(wbuf)];
666   acceptedSocket->readAll(readbuf, sizeof(readbuf));
667   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
668   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
669   CHECK_EQ(bytesRead, 0);
670
671   // Fully close both sockets
672   acceptedSocket->close();
673   socket->close();
674
675   ASSERT_FALSE(socket->isClosedBySelf());
676   ASSERT_TRUE(socket->isClosedByPeer());
677 }
678
679 /**
680  * Test reading, writing, and calling shutdownWriteNow() before the
681  * connect attempt finishes.
682  */
683 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
684   TestServer server;
685
686   // connect()
687   EventBase evb;
688   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
689   ConnCallback ccb;
690   socket->connect(&ccb, server.getAddress(), 30);
691
692   // Hopefully the connect didn't succeed immediately.
693   // If it did, we can't exercise the write-while-connecting code path.
694   if (ccb.state == STATE_SUCCEEDED) {
695     LOG(INFO) << "connect() succeeded immediately; skipping test";
696     return;
697   }
698
699   // Install a read callback
700   ReadCallback rcb;
701   socket->setReadCB(&rcb);
702
703   // Ask to write some data
704   char wbuf[128];
705   memset(wbuf, 'a', sizeof(wbuf));
706   WriteCallback wcb;
707   socket->write(&wcb, wbuf, sizeof(wbuf));
708
709   // Shutdown writes immediately.
710   // This should immediately discard the data that we just tried to write.
711   socket->shutdownWriteNow();
712
713   // Verify that writeError() was invoked on the write callback.
714   CHECK_EQ(wcb.state, STATE_FAILED);
715   CHECK_EQ(wcb.bytesWritten, 0);
716
717   // Even though we haven't looped yet, we should be able to accept
718   // the connection.
719   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
720
721   // Since the connection is still in progress, there should be no data to
722   // read yet.  Verify that the accepted socket is not readable.
723   struct pollfd fds[1];
724   fds[0].fd = acceptedSocket->getSocketFD();
725   fds[0].events = POLLIN;
726   fds[0].revents = 0;
727   int rc = poll(fds, 1, 0);
728   CHECK_EQ(rc, 0);
729
730   // Write data to the accepted socket
731   uint8_t acceptedWbuf[192];
732   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
733   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
734   acceptedSocket->flush();
735   // Shutdown writes to the accepted socket.  This will cause it to see EOF
736   // and uninstall the read callback.
737   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
738
739   // Loop
740   evb.loop();
741
742   // The loop should have completed the connection, written the queued data,
743   // shutdown writes on the socket, read the data we wrote to it, and see the
744   // EOF.
745   //
746   // Check that the connection was completed successfully and that the read
747   // callback was invoked as expected.
748   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
749   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
750   CHECK_EQ(rcb.buffers.size(), 1);
751   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
752   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
753                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
754
755   // Since we used shutdownWriteNow(), it should have discarded all pending
756   // write data.  Verify we see an immediate EOF when reading from the accepted
757   // socket.
758   uint8_t readbuf[sizeof(wbuf)];
759   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
760   CHECK_EQ(bytesRead, 0);
761
762   // Fully close both sockets
763   acceptedSocket->close();
764   socket->close();
765
766   ASSERT_FALSE(socket->isClosedBySelf());
767   ASSERT_TRUE(socket->isClosedByPeer());
768 }
769
770 // Helper function for use in testConnectOptWrite()
771 // Temporarily disable the read callback
772 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
773   // Uninstall the read callback
774   socket->setReadCB(nullptr);
775   // Schedule the read callback to be reinstalled after 1ms
776   socket->getEventBase()->runInLoop(
777       std::bind(&AsyncSocket::setReadCB, socket, rcb));
778 }
779
780 /**
781  * Test connect+write, then have the connect callback perform another write.
782  *
783  * This tests interaction of the optimistic writing after connect with
784  * additional write attempts that occur in the connect callback.
785  */
786 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
787   TestServer server;
788   EventBase evb;
789   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
790
791   // connect()
792   ConnCallback ccb;
793   socket->connect(&ccb, server.getAddress(), 30);
794
795   // Hopefully the connect didn't succeed immediately.
796   // If it did, we can't exercise the optimistic write code path.
797   if (ccb.state == STATE_SUCCEEDED) {
798     LOG(INFO) << "connect() succeeded immediately; aborting test "
799                        "of optimistic write behavior";
800     return;
801   }
802
803   // Tell the connect callback to perform a write when the connect succeeds
804   WriteCallback wcb2;
805   scoped_array<char> buf2(new char[size2]);
806   memset(buf2.get(), 'b', size2);
807   if (size2 > 0) {
808     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
809     // Tell the second write callback to close the connection when it is done
810     wcb2.successCallback = [&] { socket->closeNow(); };
811   }
812
813   // Schedule one write() immediately, before the connect finishes
814   scoped_array<char> buf1(new char[size1]);
815   memset(buf1.get(), 'a', size1);
816   WriteCallback wcb1;
817   if (size1 > 0) {
818     socket->write(&wcb1, buf1.get(), size1);
819   }
820
821   if (close) {
822     // immediately perform a close, before connect() completes
823     socket->close();
824   }
825
826   // Start reading from the other endpoint after 10ms.
827   // If we're using large buffers, we have to read so that the writes don't
828   // block forever.
829   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
830   ReadCallback rcb;
831   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
832                                         acceptedSocket.get(), &rcb);
833   socket->getEventBase()->tryRunAfterDelay(
834       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
835       10);
836
837   // Loop.  We don't bother accepting on the server socket yet.
838   // The kernel should be able to buffer the write request so it can succeed.
839   evb.loop();
840
841   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
842   if (size1 > 0) {
843     CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
844   }
845   if (size2 > 0) {
846     CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
847   }
848
849   socket->close();
850
851   // Make sure the read callback received all of the data
852   size_t bytesRead = 0;
853   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
854        it != rcb.buffers.end();
855        ++it) {
856     size_t start = bytesRead;
857     bytesRead += it->length;
858     size_t end = bytesRead;
859     if (start < size1) {
860       size_t cmpLen = min(size1, end) - start;
861       CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
862     }
863     if (end > size1 && end <= size1 + size2) {
864       size_t itOffset;
865       size_t buf2Offset;
866       size_t cmpLen;
867       if (start >= size1) {
868         itOffset = 0;
869         buf2Offset = start - size1;
870         cmpLen = end - start;
871       } else {
872         itOffset = size1 - start;
873         buf2Offset = 0;
874         cmpLen = end - size1;
875       }
876       CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
877                                cmpLen),
878                         0);
879     }
880   }
881   CHECK_EQ(bytesRead, size1 + size2);
882 }
883
884 TEST(AsyncSocketTest, ConnectCallbackWrite) {
885   // Test using small writes that should both succeed immediately
886   testConnectOptWrite(100, 200);
887
888   // Test using a large buffer in the connect callback, that should block
889   const size_t largeSize = 8*1024*1024;
890   testConnectOptWrite(100, largeSize);
891
892   // Test using a large initial write
893   testConnectOptWrite(largeSize, 100);
894
895   // Test using two large buffers
896   testConnectOptWrite(largeSize, largeSize);
897
898   // Test a small write in the connect callback,
899   // but no immediate write before connect completes
900   testConnectOptWrite(0, 64);
901
902   // Test a large write in the connect callback,
903   // but no immediate write before connect completes
904   testConnectOptWrite(0, largeSize);
905
906   // Test connect, a small write, then immediately call close() before connect
907   // completes
908   testConnectOptWrite(211, 0, true);
909
910   // Test connect, a large immediate write (that will block), then immediately
911   // call close() before connect completes
912   testConnectOptWrite(largeSize, 0, true);
913 }
914
915 ///////////////////////////////////////////////////////////////////////////
916 // write() related tests
917 ///////////////////////////////////////////////////////////////////////////
918
919 /**
920  * Test writing using a nullptr callback
921  */
922 TEST(AsyncSocketTest, WriteNullCallback) {
923   TestServer server;
924
925   // connect()
926   EventBase evb;
927   std::shared_ptr<AsyncSocket> socket =
928     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
929   evb.loop(); // loop until the socket is connected
930
931   // write() with a nullptr callback
932   char buf[128];
933   memset(buf, 'a', sizeof(buf));
934   socket->write(nullptr, buf, sizeof(buf));
935
936   evb.loop(); // loop until the data is sent
937
938   // Make sure the server got a connection and received the data
939   socket->close();
940   server.verifyConnection(buf, sizeof(buf));
941
942   ASSERT_TRUE(socket->isClosedBySelf());
943   ASSERT_FALSE(socket->isClosedByPeer());
944 }
945
946 /**
947  * Test writing with a send timeout
948  */
949 TEST(AsyncSocketTest, WriteTimeout) {
950   TestServer server;
951
952   // connect()
953   EventBase evb;
954   std::shared_ptr<AsyncSocket> socket =
955     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
956   evb.loop(); // loop until the socket is connected
957
958   // write() a large chunk of data, with no-one on the other end reading
959   size_t writeLength = 8*1024*1024;
960   uint32_t timeout = 200;
961   socket->setSendTimeout(timeout);
962   scoped_array<char> buf(new char[writeLength]);
963   memset(buf.get(), 'a', writeLength);
964   WriteCallback wcb;
965   socket->write(&wcb, buf.get(), writeLength);
966
967   TimePoint start;
968   evb.loop();
969   TimePoint end;
970
971   // Make sure the write attempt timed out as requested
972   CHECK_EQ(wcb.state, STATE_FAILED);
973   CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
974
975   // Check that the write timed out within a reasonable period of time.
976   // We don't check for exactly the specified timeout, since AsyncSocket only
977   // times out when it hasn't made progress for that period of time.
978   //
979   // On linux, the first write sends a few hundred kb of data, then blocks for
980   // writability, and then unblocks again after 40ms and is able to write
981   // another smaller of data before blocking permanently.  Therefore it doesn't
982   // time out until 40ms + timeout.
983   //
984   // I haven't fully verified the cause of this, but I believe it probably
985   // occurs because the receiving end delays sending an ack for up to 40ms.
986   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
987   // the ack, it can send some more data.  However, after that point the
988   // receiver's kernel buffer is full.  This 40ms delay happens even with
989   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
990   // kernel may be automatically disabling TCP_QUICKACK after receiving some
991   // data.
992   //
993   // For now, we simply check that the timeout occurred within 160ms of
994   // the requested value.
995   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
996 }
997
998 /**
999  * Test writing to a socket that the remote endpoint has closed
1000  */
1001 TEST(AsyncSocketTest, WritePipeError) {
1002   TestServer server;
1003
1004   // connect()
1005   EventBase evb;
1006   std::shared_ptr<AsyncSocket> socket =
1007     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1008   socket->setSendTimeout(1000);
1009   evb.loop(); // loop until the socket is connected
1010
1011   // accept and immediately close the socket
1012   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1013   acceptedSocket.reset();
1014
1015   // write() a large chunk of data
1016   size_t writeLength = 8*1024*1024;
1017   scoped_array<char> buf(new char[writeLength]);
1018   memset(buf.get(), 'a', writeLength);
1019   WriteCallback wcb;
1020   socket->write(&wcb, buf.get(), writeLength);
1021
1022   evb.loop();
1023
1024   // Make sure the write failed.
1025   // It would be nice if AsyncSocketException could convey the errno value,
1026   // so that we could check for EPIPE
1027   CHECK_EQ(wcb.state, STATE_FAILED);
1028   CHECK_EQ(wcb.exception.getType(),
1029                     AsyncSocketException::INTERNAL_ERROR);
1030
1031   ASSERT_FALSE(socket->isClosedBySelf());
1032   ASSERT_FALSE(socket->isClosedByPeer());
1033 }
1034
1035 /**
1036  * Test writing a mix of simple buffers and IOBufs
1037  */
1038 TEST(AsyncSocketTest, WriteIOBuf) {
1039   TestServer server;
1040
1041   // connect()
1042   EventBase evb;
1043   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1044   ConnCallback ccb;
1045   socket->connect(&ccb, server.getAddress(), 30);
1046
1047   // Accept the connection
1048   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1049   ReadCallback rcb;
1050   acceptedSocket->setReadCB(&rcb);
1051
1052   // Write a simple buffer to the socket
1053   size_t simpleBufLength = 5;
1054   char simpleBuf[simpleBufLength];
1055   memset(simpleBuf, 'a', simpleBufLength);
1056   WriteCallback wcb;
1057   socket->write(&wcb, simpleBuf, simpleBufLength);
1058
1059   // Write a single-element IOBuf chain
1060   size_t buf1Length = 7;
1061   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1062   memset(buf1->writableData(), 'b', buf1Length);
1063   buf1->append(buf1Length);
1064   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1065   WriteCallback wcb2;
1066   socket->writeChain(&wcb2, std::move(buf1));
1067
1068   // Write a multiple-element IOBuf chain
1069   size_t buf2Length = 11;
1070   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1071   memset(buf2->writableData(), 'c', buf2Length);
1072   buf2->append(buf2Length);
1073   size_t buf3Length = 13;
1074   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1075   memset(buf3->writableData(), 'd', buf3Length);
1076   buf3->append(buf3Length);
1077   buf2->appendChain(std::move(buf3));
1078   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1079   buf2Copy->coalesce();
1080   WriteCallback wcb3;
1081   socket->writeChain(&wcb3, std::move(buf2));
1082   socket->shutdownWrite();
1083
1084   // Let the reads and writes run to completion
1085   evb.loop();
1086
1087   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1088   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1089   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1090
1091   // Make sure the reader got the right data in the right order
1092   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1093   CHECK_EQ(rcb.buffers.size(), 1);
1094   CHECK_EQ(rcb.buffers[0].length,
1095       simpleBufLength + buf1Length + buf2Length + buf3Length);
1096   CHECK_EQ(
1097       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1098   CHECK_EQ(
1099       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1100           buf1Copy->data(), buf1Copy->length()), 0);
1101   CHECK_EQ(
1102       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1103           buf2Copy->data(), buf2Copy->length()), 0);
1104
1105   acceptedSocket->close();
1106   socket->close();
1107
1108   ASSERT_TRUE(socket->isClosedBySelf());
1109   ASSERT_FALSE(socket->isClosedByPeer());
1110 }
1111
1112 TEST(AsyncSocketTest, WriteIOBufCorked) {
1113   TestServer server;
1114
1115   // connect()
1116   EventBase evb;
1117   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1118   ConnCallback ccb;
1119   socket->connect(&ccb, server.getAddress(), 30);
1120
1121   // Accept the connection
1122   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1123   ReadCallback rcb;
1124   acceptedSocket->setReadCB(&rcb);
1125
1126   // Do three writes, 100ms apart, with the "cork" flag set
1127   // on the second write.  The reader should see the first write
1128   // arrive by itself, followed by the second and third writes
1129   // arriving together.
1130   size_t buf1Length = 5;
1131   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1132   memset(buf1->writableData(), 'a', buf1Length);
1133   buf1->append(buf1Length);
1134   size_t buf2Length = 7;
1135   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1136   memset(buf2->writableData(), 'b', buf2Length);
1137   buf2->append(buf2Length);
1138   size_t buf3Length = 11;
1139   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1140   memset(buf3->writableData(), 'c', buf3Length);
1141   buf3->append(buf3Length);
1142   WriteCallback wcb1;
1143   socket->writeChain(&wcb1, std::move(buf1));
1144   WriteCallback wcb2;
1145   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1146   write2.scheduleTimeout(100);
1147   WriteCallback wcb3;
1148   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1149   write3.scheduleTimeout(200);
1150
1151   evb.loop();
1152   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1153   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1154   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1155   if (wcb3.state != STATE_SUCCEEDED) {
1156     throw(wcb3.exception);
1157   }
1158   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1159
1160   // Make sure the reader got the data with the right grouping
1161   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1162   CHECK_EQ(rcb.buffers.size(), 2);
1163   CHECK_EQ(rcb.buffers[0].length, buf1Length);
1164   CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1165
1166   acceptedSocket->close();
1167   socket->close();
1168
1169   ASSERT_TRUE(socket->isClosedBySelf());
1170   ASSERT_FALSE(socket->isClosedByPeer());
1171 }
1172
1173 /**
1174  * Test performing a zero-length write
1175  */
1176 TEST(AsyncSocketTest, ZeroLengthWrite) {
1177   TestServer server;
1178
1179   // connect()
1180   EventBase evb;
1181   std::shared_ptr<AsyncSocket> socket =
1182     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1183   evb.loop(); // loop until the socket is connected
1184
1185   auto acceptedSocket = server.acceptAsync(&evb);
1186   ReadCallback rcb;
1187   acceptedSocket->setReadCB(&rcb);
1188
1189   size_t len1 = 1024*1024;
1190   size_t len2 = 1024*1024;
1191   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1192   memset(buf.get(), 'a', len1);
1193   memset(buf.get(), 'b', len2);
1194
1195   WriteCallback wcb1;
1196   WriteCallback wcb2;
1197   WriteCallback wcb3;
1198   WriteCallback wcb4;
1199   socket->write(&wcb1, buf.get(), 0);
1200   socket->write(&wcb2, buf.get(), len1);
1201   socket->write(&wcb3, buf.get() + len1, 0);
1202   socket->write(&wcb4, buf.get() + len1, len2);
1203   socket->close();
1204
1205   evb.loop(); // loop until the data is sent
1206
1207   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1208   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1209   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1210   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1211   rcb.verifyData(buf.get(), len1 + len2);
1212
1213   ASSERT_TRUE(socket->isClosedBySelf());
1214   ASSERT_FALSE(socket->isClosedByPeer());
1215 }
1216
1217 TEST(AsyncSocketTest, ZeroLengthWritev) {
1218   TestServer server;
1219
1220   // connect()
1221   EventBase evb;
1222   std::shared_ptr<AsyncSocket> socket =
1223     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1224   evb.loop(); // loop until the socket is connected
1225
1226   auto acceptedSocket = server.acceptAsync(&evb);
1227   ReadCallback rcb;
1228   acceptedSocket->setReadCB(&rcb);
1229
1230   size_t len1 = 1024*1024;
1231   size_t len2 = 1024*1024;
1232   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1233   memset(buf.get(), 'a', len1);
1234   memset(buf.get(), 'b', len2);
1235
1236   WriteCallback wcb;
1237   size_t iovCount = 4;
1238   struct iovec iov[iovCount];
1239   iov[0].iov_base = buf.get();
1240   iov[0].iov_len = len1;
1241   iov[1].iov_base = buf.get() + len1;
1242   iov[1].iov_len = 0;
1243   iov[2].iov_base = buf.get() + len1;
1244   iov[2].iov_len = len2;
1245   iov[3].iov_base = buf.get() + len1 + len2;
1246   iov[3].iov_len = 0;
1247
1248   socket->writev(&wcb, iov, iovCount);
1249   socket->close();
1250   evb.loop(); // loop until the data is sent
1251
1252   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1253   rcb.verifyData(buf.get(), len1 + len2);
1254
1255   ASSERT_TRUE(socket->isClosedBySelf());
1256   ASSERT_FALSE(socket->isClosedByPeer());
1257 }
1258
1259 ///////////////////////////////////////////////////////////////////////////
1260 // close() related tests
1261 ///////////////////////////////////////////////////////////////////////////
1262
1263 /**
1264  * Test calling close() with pending writes when the socket is already closing.
1265  */
1266 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1267   TestServer server;
1268
1269   // connect()
1270   EventBase evb;
1271   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1272   ConnCallback ccb;
1273   socket->connect(&ccb, server.getAddress(), 30);
1274
1275   // accept the socket on the server side
1276   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1277
1278   // Loop to ensure the connect has completed
1279   evb.loop();
1280
1281   // Make sure we are connected
1282   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1283
1284   // Schedule pending writes, until several write attempts have blocked
1285   char buf[128];
1286   memset(buf, 'a', sizeof(buf));
1287   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1288   WriteCallbackVector writeCallbacks;
1289
1290   writeCallbacks.reserve(5);
1291   while (writeCallbacks.size() < 5) {
1292     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1293
1294     socket->write(wcb.get(), buf, sizeof(buf));
1295     if (wcb->state == STATE_SUCCEEDED) {
1296       // Succeeded immediately.  Keep performing more writes
1297       continue;
1298     }
1299
1300     // This write is blocked.
1301     // Have the write callback call close() when writeError() is invoked
1302     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1303     writeCallbacks.push_back(wcb);
1304   }
1305
1306   // Call closeNow() to immediately fail the pending writes
1307   socket->closeNow();
1308
1309   // Make sure writeError() was invoked on all of the pending write callbacks
1310   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1311        it != writeCallbacks.end();
1312        ++it) {
1313     CHECK_EQ((*it)->state, STATE_FAILED);
1314   }
1315
1316   ASSERT_TRUE(socket->isClosedBySelf());
1317   ASSERT_FALSE(socket->isClosedByPeer());
1318 }
1319
1320 ///////////////////////////////////////////////////////////////////////////
1321 // ImmediateRead related tests
1322 ///////////////////////////////////////////////////////////////////////////
1323
1324 /* AsyncSocket use to verify immediate read works */
1325 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1326  public:
1327   bool immediateReadCalled = false;
1328   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1329  protected:
1330   void checkForImmediateRead() noexcept override {
1331     immediateReadCalled = true;
1332     AsyncSocket::handleRead();
1333   }
1334 };
1335
1336 TEST(AsyncSocket, ConnectReadImmediateRead) {
1337   TestServer server;
1338
1339   const size_t maxBufferSz = 100;
1340   const size_t maxReadsPerEvent = 1;
1341   const size_t expectedDataSz = maxBufferSz * 3;
1342   char expectedData[expectedDataSz];
1343   memset(expectedData, 'j', expectedDataSz);
1344
1345   EventBase evb;
1346   ReadCallback rcb(maxBufferSz);
1347   AsyncSocketImmediateRead socket(&evb);
1348   socket.connect(nullptr, server.getAddress(), 30);
1349
1350   evb.loop(); // loop until the socket is connected
1351
1352   socket.setReadCB(&rcb);
1353   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1354   socket.immediateReadCalled = false;
1355
1356   auto acceptedSocket = server.acceptAsync(&evb);
1357
1358   ReadCallback rcbServer;
1359   WriteCallback wcbServer;
1360   rcbServer.dataAvailableCallback = [&]() {
1361     if (rcbServer.dataRead() == expectedDataSz) {
1362       // write back all data read
1363       rcbServer.verifyData(expectedData, expectedDataSz);
1364       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1365       acceptedSocket->close();
1366     }
1367   };
1368   acceptedSocket->setReadCB(&rcbServer);
1369
1370   // write data
1371   WriteCallback wcb1;
1372   socket.write(&wcb1, expectedData, expectedDataSz);
1373   evb.loop();
1374   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1375   rcb.verifyData(expectedData, expectedDataSz);
1376   CHECK_EQ(socket.immediateReadCalled, true);
1377
1378   ASSERT_FALSE(socket.isClosedBySelf());
1379   ASSERT_FALSE(socket.isClosedByPeer());
1380 }
1381
1382 TEST(AsyncSocket, ConnectReadUninstallRead) {
1383   TestServer server;
1384
1385   const size_t maxBufferSz = 100;
1386   const size_t maxReadsPerEvent = 1;
1387   const size_t expectedDataSz = maxBufferSz * 3;
1388   char expectedData[expectedDataSz];
1389   memset(expectedData, 'k', expectedDataSz);
1390
1391   EventBase evb;
1392   ReadCallback rcb(maxBufferSz);
1393   AsyncSocketImmediateRead socket(&evb);
1394   socket.connect(nullptr, server.getAddress(), 30);
1395
1396   evb.loop(); // loop until the socket is connected
1397
1398   socket.setReadCB(&rcb);
1399   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1400   socket.immediateReadCalled = false;
1401
1402   auto acceptedSocket = server.acceptAsync(&evb);
1403
1404   ReadCallback rcbServer;
1405   WriteCallback wcbServer;
1406   rcbServer.dataAvailableCallback = [&]() {
1407     if (rcbServer.dataRead() == expectedDataSz) {
1408       // write back all data read
1409       rcbServer.verifyData(expectedData, expectedDataSz);
1410       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1411       acceptedSocket->close();
1412     }
1413   };
1414   acceptedSocket->setReadCB(&rcbServer);
1415
1416   rcb.dataAvailableCallback = [&]() {
1417     // we read data and reset readCB
1418     socket.setReadCB(nullptr);
1419   };
1420
1421   // write data
1422   WriteCallback wcb;
1423   socket.write(&wcb, expectedData, expectedDataSz);
1424   evb.loop();
1425   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1426
1427   /* we shoud've only read maxBufferSz data since readCallback_
1428    * was reset in dataAvailableCallback */
1429   CHECK_EQ(rcb.dataRead(), maxBufferSz);
1430   CHECK_EQ(socket.immediateReadCalled, false);
1431
1432   ASSERT_FALSE(socket.isClosedBySelf());
1433   ASSERT_FALSE(socket.isClosedByPeer());
1434 }
1435
1436 // TODO:
1437 // - Test connect() and have the connect callback set the read callback
1438 // - Test connect() and have the connect callback unset the read callback
1439 // - Test reading/writing/closing/destroying the socket in the connect callback
1440 // - Test reading/writing/closing/destroying the socket in the read callback
1441 // - Test reading/writing/closing/destroying the socket in the write callback
1442 // - Test one-way shutdown behavior
1443 // - Test changing the EventBase
1444 //
1445 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1446 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1447
1448
1449 ///////////////////////////////////////////////////////////////////////////
1450 // AsyncServerSocket tests
1451 ///////////////////////////////////////////////////////////////////////////
1452
1453 /**
1454  * Helper AcceptCallback class for the test code
1455  * It records the callbacks that were invoked, and also supports calling
1456  * generic std::function objects in each callback.
1457  */
1458 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1459  public:
1460   enum EventType {
1461     TYPE_START,
1462     TYPE_ACCEPT,
1463     TYPE_ERROR,
1464     TYPE_STOP
1465   };
1466   struct EventInfo {
1467     EventInfo(int fd, const folly::SocketAddress& addr)
1468       : type(TYPE_ACCEPT),
1469         fd(fd),
1470         address(addr),
1471         errorMsg() {}
1472     explicit EventInfo(const std::string& msg)
1473       : type(TYPE_ERROR),
1474         fd(-1),
1475         address(),
1476         errorMsg(msg) {}
1477     explicit EventInfo(EventType et)
1478       : type(et),
1479         fd(-1),
1480         address(),
1481         errorMsg() {}
1482
1483     EventType type;
1484     int fd;  // valid for TYPE_ACCEPT
1485     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1486     string errorMsg;  // valid for TYPE_ERROR
1487   };
1488   typedef std::deque<EventInfo> EventList;
1489
1490   TestAcceptCallback()
1491     : connectionAcceptedFn_(),
1492       acceptErrorFn_(),
1493       acceptStoppedFn_(),
1494       events_() {}
1495
1496   std::deque<EventInfo>* getEvents() {
1497     return &events_;
1498   }
1499
1500   void setConnectionAcceptedFn(
1501       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1502     connectionAcceptedFn_ = fn;
1503   }
1504   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1505     acceptErrorFn_ = fn;
1506   }
1507   void setAcceptStartedFn(const std::function<void()>& fn) {
1508     acceptStartedFn_ = fn;
1509   }
1510   void setAcceptStoppedFn(const std::function<void()>& fn) {
1511     acceptStoppedFn_ = fn;
1512   }
1513
1514   void connectionAccepted(
1515       int fd, const folly::SocketAddress& clientAddr) noexcept override {
1516     events_.emplace_back(fd, clientAddr);
1517
1518     if (connectionAcceptedFn_) {
1519       connectionAcceptedFn_(fd, clientAddr);
1520     }
1521   }
1522   void acceptError(const std::exception& ex) noexcept override {
1523     events_.emplace_back(ex.what());
1524
1525     if (acceptErrorFn_) {
1526       acceptErrorFn_(ex);
1527     }
1528   }
1529   void acceptStarted() noexcept override {
1530     events_.emplace_back(TYPE_START);
1531
1532     if (acceptStartedFn_) {
1533       acceptStartedFn_();
1534     }
1535   }
1536   void acceptStopped() noexcept override {
1537     events_.emplace_back(TYPE_STOP);
1538
1539     if (acceptStoppedFn_) {
1540       acceptStoppedFn_();
1541     }
1542   }
1543
1544  private:
1545   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1546   std::function<void(const std::exception&)> acceptErrorFn_;
1547   std::function<void()> acceptStartedFn_;
1548   std::function<void()> acceptStoppedFn_;
1549
1550   std::deque<EventInfo> events_;
1551 };
1552
1553 /**
1554  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1555  */
1556 TEST(AsyncSocketTest, ServerAcceptOptions) {
1557   EventBase eventBase;
1558
1559   // Create a server socket
1560   std::shared_ptr<AsyncServerSocket> serverSocket(
1561       AsyncServerSocket::newSocket(&eventBase));
1562   serverSocket->bind(0);
1563   serverSocket->listen(16);
1564   folly::SocketAddress serverAddress;
1565   serverSocket->getAddress(&serverAddress);
1566
1567   // Add a callback to accept one connection then stop the loop
1568   TestAcceptCallback acceptCallback;
1569   acceptCallback.setConnectionAcceptedFn(
1570     [&](int fd, const folly::SocketAddress& addr) {
1571       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1572     });
1573   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1574     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1575   });
1576   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1577   serverSocket->startAccepting();
1578
1579   // Connect to the server socket
1580   std::shared_ptr<AsyncSocket> socket(
1581       AsyncSocket::newSocket(&eventBase, serverAddress));
1582
1583   eventBase.loop();
1584
1585   // Verify that the server accepted a connection
1586   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1587   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1588                     TestAcceptCallback::TYPE_START);
1589   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1590                     TestAcceptCallback::TYPE_ACCEPT);
1591   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1592                     TestAcceptCallback::TYPE_STOP);
1593   int fd = acceptCallback.getEvents()->at(1).fd;
1594
1595   // The accepted connection should already be in non-blocking mode
1596   int flags = fcntl(fd, F_GETFL, 0);
1597   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1598
1599 #ifndef TCP_NOPUSH
1600   // The accepted connection should already have TCP_NODELAY set
1601   int value;
1602   socklen_t valueLength = sizeof(value);
1603   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1604   CHECK_EQ(rc, 0);
1605   CHECK_EQ(value, 1);
1606 #endif
1607 }
1608
1609 /**
1610  * Test AsyncServerSocket::removeAcceptCallback()
1611  */
1612 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1613   // Create a new AsyncServerSocket
1614   EventBase eventBase;
1615   std::shared_ptr<AsyncServerSocket> serverSocket(
1616       AsyncServerSocket::newSocket(&eventBase));
1617   serverSocket->bind(0);
1618   serverSocket->listen(16);
1619   folly::SocketAddress serverAddress;
1620   serverSocket->getAddress(&serverAddress);
1621
1622   // Add several accept callbacks
1623   TestAcceptCallback cb1;
1624   TestAcceptCallback cb2;
1625   TestAcceptCallback cb3;
1626   TestAcceptCallback cb4;
1627   TestAcceptCallback cb5;
1628   TestAcceptCallback cb6;
1629   TestAcceptCallback cb7;
1630
1631   // Test having callbacks remove other callbacks before them on the list,
1632   // after them on the list, or removing themselves.
1633   //
1634   // Have callback 2 remove callback 3 and callback 5 the first time it is
1635   // called.
1636   int cb2Count = 0;
1637   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1638       std::shared_ptr<AsyncSocket> sock2(
1639         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1640       });
1641   cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1642     });
1643   cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1644       std::shared_ptr<AsyncSocket> sock3(
1645         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1646     });
1647   cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1648   std::shared_ptr<AsyncSocket> sock5(
1649       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1650
1651     });
1652   cb2.setConnectionAcceptedFn(
1653     [&](int fd, const folly::SocketAddress& addr) {
1654       if (cb2Count == 0) {
1655         serverSocket->removeAcceptCallback(&cb3, nullptr);
1656         serverSocket->removeAcceptCallback(&cb5, nullptr);
1657       }
1658       ++cb2Count;
1659     });
1660   // Have callback 6 remove callback 4 the first time it is called,
1661   // and destroy the server socket the second time it is called
1662   int cb6Count = 0;
1663   cb6.setConnectionAcceptedFn(
1664     [&](int fd, const folly::SocketAddress& addr) {
1665       if (cb6Count == 0) {
1666         serverSocket->removeAcceptCallback(&cb4, nullptr);
1667         std::shared_ptr<AsyncSocket> sock6(
1668           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1669         std::shared_ptr<AsyncSocket> sock7(
1670           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1671         std::shared_ptr<AsyncSocket> sock8(
1672           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1673
1674       } else {
1675         serverSocket.reset();
1676       }
1677       ++cb6Count;
1678     });
1679   // Have callback 7 remove itself
1680   cb7.setConnectionAcceptedFn(
1681     [&](int fd, const folly::SocketAddress& addr) {
1682       serverSocket->removeAcceptCallback(&cb7, nullptr);
1683     });
1684
1685   serverSocket->addAcceptCallback(&cb1, nullptr);
1686   serverSocket->addAcceptCallback(&cb2, nullptr);
1687   serverSocket->addAcceptCallback(&cb3, nullptr);
1688   serverSocket->addAcceptCallback(&cb4, nullptr);
1689   serverSocket->addAcceptCallback(&cb5, nullptr);
1690   serverSocket->addAcceptCallback(&cb6, nullptr);
1691   serverSocket->addAcceptCallback(&cb7, nullptr);
1692   serverSocket->startAccepting();
1693
1694   // Make several connections to the socket
1695   std::shared_ptr<AsyncSocket> sock1(
1696       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1697   std::shared_ptr<AsyncSocket> sock4(
1698       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1699
1700   // Loop until we are stopped
1701   eventBase.loop();
1702
1703   // Check to make sure that the expected callbacks were invoked.
1704   //
1705   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1706   // the AcceptCallbacks in round-robin fashion, in the order that they were
1707   // added.  The code is implemented this way right now, but the API doesn't
1708   // explicitly require it be done this way.  If we change the code not to be
1709   // exactly round robin in the future, we can simplify the test checks here.
1710   // (We'll also need to update the termination code, since we expect cb6 to
1711   // get called twice to terminate the loop.)
1712   CHECK_EQ(cb1.getEvents()->size(), 4);
1713   CHECK_EQ(cb1.getEvents()->at(0).type,
1714                     TestAcceptCallback::TYPE_START);
1715   CHECK_EQ(cb1.getEvents()->at(1).type,
1716                     TestAcceptCallback::TYPE_ACCEPT);
1717   CHECK_EQ(cb1.getEvents()->at(2).type,
1718                     TestAcceptCallback::TYPE_ACCEPT);
1719   CHECK_EQ(cb1.getEvents()->at(3).type,
1720                     TestAcceptCallback::TYPE_STOP);
1721
1722   CHECK_EQ(cb2.getEvents()->size(), 4);
1723   CHECK_EQ(cb2.getEvents()->at(0).type,
1724                     TestAcceptCallback::TYPE_START);
1725   CHECK_EQ(cb2.getEvents()->at(1).type,
1726                     TestAcceptCallback::TYPE_ACCEPT);
1727   CHECK_EQ(cb2.getEvents()->at(2).type,
1728                     TestAcceptCallback::TYPE_ACCEPT);
1729   CHECK_EQ(cb2.getEvents()->at(3).type,
1730                     TestAcceptCallback::TYPE_STOP);
1731
1732   CHECK_EQ(cb3.getEvents()->size(), 2);
1733   CHECK_EQ(cb3.getEvents()->at(0).type,
1734                     TestAcceptCallback::TYPE_START);
1735   CHECK_EQ(cb3.getEvents()->at(1).type,
1736                     TestAcceptCallback::TYPE_STOP);
1737
1738   CHECK_EQ(cb4.getEvents()->size(), 3);
1739   CHECK_EQ(cb4.getEvents()->at(0).type,
1740                     TestAcceptCallback::TYPE_START);
1741   CHECK_EQ(cb4.getEvents()->at(1).type,
1742                     TestAcceptCallback::TYPE_ACCEPT);
1743   CHECK_EQ(cb4.getEvents()->at(2).type,
1744                     TestAcceptCallback::TYPE_STOP);
1745
1746   CHECK_EQ(cb5.getEvents()->size(), 2);
1747   CHECK_EQ(cb5.getEvents()->at(0).type,
1748                     TestAcceptCallback::TYPE_START);
1749   CHECK_EQ(cb5.getEvents()->at(1).type,
1750                     TestAcceptCallback::TYPE_STOP);
1751
1752   CHECK_EQ(cb6.getEvents()->size(), 4);
1753   CHECK_EQ(cb6.getEvents()->at(0).type,
1754                     TestAcceptCallback::TYPE_START);
1755   CHECK_EQ(cb6.getEvents()->at(1).type,
1756                     TestAcceptCallback::TYPE_ACCEPT);
1757   CHECK_EQ(cb6.getEvents()->at(2).type,
1758                     TestAcceptCallback::TYPE_ACCEPT);
1759   CHECK_EQ(cb6.getEvents()->at(3).type,
1760                     TestAcceptCallback::TYPE_STOP);
1761
1762   CHECK_EQ(cb7.getEvents()->size(), 3);
1763   CHECK_EQ(cb7.getEvents()->at(0).type,
1764                     TestAcceptCallback::TYPE_START);
1765   CHECK_EQ(cb7.getEvents()->at(1).type,
1766                     TestAcceptCallback::TYPE_ACCEPT);
1767   CHECK_EQ(cb7.getEvents()->at(2).type,
1768                     TestAcceptCallback::TYPE_STOP);
1769 }
1770
1771 /**
1772  * Test AsyncServerSocket::removeAcceptCallback()
1773  */
1774 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1775   // Create a new AsyncServerSocket
1776   EventBase eventBase;
1777   std::shared_ptr<AsyncServerSocket> serverSocket(
1778       AsyncServerSocket::newSocket(&eventBase));
1779   serverSocket->bind(0);
1780   serverSocket->listen(16);
1781   folly::SocketAddress serverAddress;
1782   serverSocket->getAddress(&serverAddress);
1783
1784   // Add several accept callbacks
1785   TestAcceptCallback cb1;
1786   auto thread_id = pthread_self();
1787   cb1.setAcceptStartedFn([&](){
1788     CHECK_NE(thread_id, pthread_self());
1789     thread_id = pthread_self();
1790   });
1791   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1792     CHECK_EQ(thread_id, pthread_self());
1793     serverSocket->removeAcceptCallback(&cb1, nullptr);
1794   });
1795   cb1.setAcceptStoppedFn([&](){
1796     CHECK_EQ(thread_id, pthread_self());
1797   });
1798
1799   // Test having callbacks remove other callbacks before them on the list,
1800   serverSocket->addAcceptCallback(&cb1, nullptr);
1801   serverSocket->startAccepting();
1802
1803   // Make several connections to the socket
1804   std::shared_ptr<AsyncSocket> sock1(
1805       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1806
1807   // Loop in another thread
1808   auto other = std::thread([&](){
1809     eventBase.loop();
1810   });
1811   other.join();
1812
1813   // Check to make sure that the expected callbacks were invoked.
1814   //
1815   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1816   // the AcceptCallbacks in round-robin fashion, in the order that they were
1817   // added.  The code is implemented this way right now, but the API doesn't
1818   // explicitly require it be done this way.  If we change the code not to be
1819   // exactly round robin in the future, we can simplify the test checks here.
1820   // (We'll also need to update the termination code, since we expect cb6 to
1821   // get called twice to terminate the loop.)
1822   CHECK_EQ(cb1.getEvents()->size(), 3);
1823   CHECK_EQ(cb1.getEvents()->at(0).type,
1824                     TestAcceptCallback::TYPE_START);
1825   CHECK_EQ(cb1.getEvents()->at(1).type,
1826                     TestAcceptCallback::TYPE_ACCEPT);
1827   CHECK_EQ(cb1.getEvents()->at(2).type,
1828                     TestAcceptCallback::TYPE_STOP);
1829
1830 }
1831
1832 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1833   // Add a callback to accept one connection then stop accepting
1834   TestAcceptCallback acceptCallback;
1835   acceptCallback.setConnectionAcceptedFn(
1836     [&](int fd, const folly::SocketAddress& addr) {
1837       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1838     });
1839   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1840     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1841   });
1842   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1843   serverSocket->startAccepting();
1844
1845   // Connect to the server socket
1846   EventBase* eventBase = serverSocket->getEventBase();
1847   folly::SocketAddress serverAddress;
1848   serverSocket->getAddress(&serverAddress);
1849   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1850
1851   // Loop to process all events
1852   eventBase->loop();
1853
1854   // Verify that the server accepted a connection
1855   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1856   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1857                     TestAcceptCallback::TYPE_START);
1858   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1859                     TestAcceptCallback::TYPE_ACCEPT);
1860   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1861                     TestAcceptCallback::TYPE_STOP);
1862 }
1863
1864 /* Verify that we don't leak sockets if we are destroyed()
1865  * and there are still writes pending
1866  *
1867  * If destroy() only calls close() instead of closeNow(),
1868  * it would shutdown(writes) on the socket, but it would
1869  * never be close()'d, and the socket would leak
1870  */
1871 TEST(AsyncSocketTest, DestroyCloseTest) {
1872   TestServer server;
1873
1874   // connect()
1875   EventBase clientEB;
1876   EventBase serverEB;
1877   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1878   ConnCallback ccb;
1879   socket->connect(&ccb, server.getAddress(), 30);
1880
1881   // Accept the connection
1882   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1883   ReadCallback rcb;
1884   acceptedSocket->setReadCB(&rcb);
1885
1886   // Write a large buffer to the socket that is larger than kernel buffer
1887   size_t simpleBufLength = 5000000;
1888   char* simpleBuf = new char[simpleBufLength];
1889   memset(simpleBuf, 'a', simpleBufLength);
1890   WriteCallback wcb;
1891
1892   // Let the reads and writes run to completion
1893   int fd = acceptedSocket->getFd();
1894
1895   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1896   socket.reset();
1897   acceptedSocket.reset();
1898
1899   // Test that server socket was closed
1900   ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1901   CHECK_EQ(sz, -1);
1902   CHECK_EQ(errno, 9);
1903   delete[] simpleBuf;
1904 }
1905
1906 /**
1907  * Test AsyncServerSocket::useExistingSocket()
1908  */
1909 TEST(AsyncSocketTest, ServerExistingSocket) {
1910   EventBase eventBase;
1911
1912   // Test creating a socket, and letting AsyncServerSocket bind and listen
1913   {
1914     // Manually create a socket
1915     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1916     ASSERT_GE(fd, 0);
1917
1918     // Create a server socket
1919     AsyncServerSocket::UniquePtr serverSocket(
1920         new AsyncServerSocket(&eventBase));
1921     serverSocket->useExistingSocket(fd);
1922     folly::SocketAddress address;
1923     serverSocket->getAddress(&address);
1924     address.setPort(0);
1925     serverSocket->bind(address);
1926     serverSocket->listen(16);
1927
1928     // Make sure the socket works
1929     serverSocketSanityTest(serverSocket.get());
1930   }
1931
1932   // Test creating a socket and binding manually,
1933   // then letting AsyncServerSocket listen
1934   {
1935     // Manually create a socket
1936     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1937     ASSERT_GE(fd, 0);
1938     // bind
1939     struct sockaddr_in addr;
1940     addr.sin_family = AF_INET;
1941     addr.sin_port = 0;
1942     addr.sin_addr.s_addr = INADDR_ANY;
1943     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1944                              sizeof(addr)), 0);
1945     // Look up the address that we bound to
1946     folly::SocketAddress boundAddress;
1947     boundAddress.setFromLocalAddress(fd);
1948
1949     // Create a server socket
1950     AsyncServerSocket::UniquePtr serverSocket(
1951         new AsyncServerSocket(&eventBase));
1952     serverSocket->useExistingSocket(fd);
1953     serverSocket->listen(16);
1954
1955     // Make sure AsyncServerSocket reports the same address that we bound to
1956     folly::SocketAddress serverSocketAddress;
1957     serverSocket->getAddress(&serverSocketAddress);
1958     CHECK_EQ(boundAddress, serverSocketAddress);
1959
1960     // Make sure the socket works
1961     serverSocketSanityTest(serverSocket.get());
1962   }
1963
1964   // Test creating a socket, binding and listening manually,
1965   // then giving it to AsyncServerSocket
1966   {
1967     // Manually create a socket
1968     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1969     ASSERT_GE(fd, 0);
1970     // bind
1971     struct sockaddr_in addr;
1972     addr.sin_family = AF_INET;
1973     addr.sin_port = 0;
1974     addr.sin_addr.s_addr = INADDR_ANY;
1975     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1976                              sizeof(addr)), 0);
1977     // Look up the address that we bound to
1978     folly::SocketAddress boundAddress;
1979     boundAddress.setFromLocalAddress(fd);
1980     // listen
1981     CHECK_EQ(listen(fd, 16), 0);
1982
1983     // Create a server socket
1984     AsyncServerSocket::UniquePtr serverSocket(
1985         new AsyncServerSocket(&eventBase));
1986     serverSocket->useExistingSocket(fd);
1987
1988     // Make sure AsyncServerSocket reports the same address that we bound to
1989     folly::SocketAddress serverSocketAddress;
1990     serverSocket->getAddress(&serverSocketAddress);
1991     CHECK_EQ(boundAddress, serverSocketAddress);
1992
1993     // Make sure the socket works
1994     serverSocketSanityTest(serverSocket.get());
1995   }
1996 }
1997
1998 TEST(AsyncSocketTest, UnixDomainSocketTest) {
1999   EventBase eventBase;
2000
2001   // Create a server socket
2002   std::shared_ptr<AsyncServerSocket> serverSocket(
2003       AsyncServerSocket::newSocket(&eventBase));
2004   string path(1, 0);
2005   path.append("/anonymous");
2006   folly::SocketAddress serverAddress;
2007   serverAddress.setFromPath(path);
2008   serverSocket->bind(serverAddress);
2009   serverSocket->listen(16);
2010
2011   // Add a callback to accept one connection then stop the loop
2012   TestAcceptCallback acceptCallback;
2013   acceptCallback.setConnectionAcceptedFn(
2014     [&](int fd, const folly::SocketAddress& addr) {
2015       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2016     });
2017   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
2018     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2019   });
2020   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2021   serverSocket->startAccepting();
2022
2023   // Connect to the server socket
2024   std::shared_ptr<AsyncSocket> socket(
2025       AsyncSocket::newSocket(&eventBase, serverAddress));
2026
2027   eventBase.loop();
2028
2029   // Verify that the server accepted a connection
2030   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2031   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2032                     TestAcceptCallback::TYPE_START);
2033   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2034                     TestAcceptCallback::TYPE_ACCEPT);
2035   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2036                     TestAcceptCallback::TYPE_STOP);
2037   int fd = acceptCallback.getEvents()->at(1).fd;
2038
2039   // The accepted connection should already be in non-blocking mode
2040   int flags = fcntl(fd, F_GETFL, 0);
2041   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2042 }