Add method to get the connect timeout used for an AsyncSocket
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/RWSpinLock.h>
21 #include <folly/SocketAddress.h>
22
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/async/test/AsyncSocketTest.h>
25 #include <folly/io/async/test/Util.h>
26 #include <folly/test/SocketAddressTestHelper.h>
27
28 #include <gtest/gtest.h>
29 #include <boost/scoped_array.hpp>
30 #include <iostream>
31 #include <unistd.h>
32 #include <fcntl.h>
33 #include <poll.h>
34 #include <sys/types.h>
35 #include <sys/socket.h>
36 #include <netinet/tcp.h>
37 #include <thread>
38
39 using namespace boost;
40
41 using std::string;
42 using std::vector;
43 using std::min;
44 using std::cerr;
45 using std::endl;
46 using std::unique_ptr;
47 using std::chrono::milliseconds;
48 using boost::scoped_array;
49
50 using namespace folly;
51
52 class DelayedWrite: public AsyncTimeout {
53  public:
54   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
55       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
56       bool cork, bool lastWrite = false):
57     AsyncTimeout(socket->getEventBase()),
58     socket_(socket),
59     bufs_(std::move(bufs)),
60     wcb_(wcb),
61     cork_(cork),
62     lastWrite_(lastWrite) {}
63
64  private:
65   void timeoutExpired() noexcept override {
66     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
67     socket_->writeChain(wcb_, std::move(bufs_), flags);
68     if (lastWrite_) {
69       socket_->shutdownWrite();
70     }
71   }
72
73   std::shared_ptr<AsyncSocket> socket_;
74   unique_ptr<IOBuf> bufs_;
75   AsyncTransportWrapper::WriteCallback* wcb_;
76   bool cork_;
77   bool lastWrite_;
78 };
79
80 ///////////////////////////////////////////////////////////////////////////
81 // connect() tests
82 ///////////////////////////////////////////////////////////////////////////
83
84 /**
85  * Test connecting to a server
86  */
87 TEST(AsyncSocketTest, Connect) {
88   // Start listening on a local port
89   TestServer server;
90
91   // Connect using a AsyncSocket
92   EventBase evb;
93   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
94   ConnCallback cb;
95   socket->connect(&cb, server.getAddress(), 30);
96
97   evb.loop();
98
99   CHECK_EQ(cb.state, STATE_SUCCEEDED);
100   EXPECT_LE(0, socket->getConnectTime().count());
101   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
102 }
103
104 /**
105  * Test connecting to a server that isn't listening
106  */
107 TEST(AsyncSocketTest, ConnectRefused) {
108   EventBase evb;
109
110   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
111
112   // Hopefully nothing is actually listening on this address
113   folly::SocketAddress addr("127.0.0.1", 65535);
114   ConnCallback cb;
115   socket->connect(&cb, addr, 30);
116
117   evb.loop();
118
119   CHECK_EQ(cb.state, STATE_FAILED);
120   CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
121   EXPECT_LE(0, socket->getConnectTime().count());
122   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
123 }
124
125 /**
126  * Test connection timeout
127  */
128 TEST(AsyncSocketTest, ConnectTimeout) {
129   EventBase evb;
130
131   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
132
133   // Try connecting to server that won't respond.
134   //
135   // This depends somewhat on the network where this test is run.
136   // Hopefully this IP will be routable but unresponsive.
137   // (Alternatively, we could try listening on a local raw socket, but that
138   // normally requires root privileges.)
139   auto host =
140       SocketAddressTestHelper::isIPv6Enabled() ?
141       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
142       SocketAddressTestHelper::isIPv4Enabled() ?
143       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
144       nullptr;
145   SocketAddress addr(host, 65535);
146   ConnCallback cb;
147   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
148
149   evb.loop();
150
151   CHECK_EQ(cb.state, STATE_FAILED);
152   CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
153
154   // Verify that we can still get the peer address after a timeout.
155   // Use case is if the client was created from a client pool, and we want
156   // to log which peer failed.
157   folly::SocketAddress peer;
158   socket->getPeerAddress(&peer);
159   CHECK_EQ(peer, addr);
160   EXPECT_LE(0, socket->getConnectTime().count());
161   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
162 }
163
164 /**
165  * Test writing immediately after connecting, without waiting for connect
166  * to finish.
167  */
168 TEST(AsyncSocketTest, ConnectAndWrite) {
169   TestServer server;
170
171   // connect()
172   EventBase evb;
173   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
174   ConnCallback ccb;
175   socket->connect(&ccb, server.getAddress(), 30);
176
177   // write()
178   char buf[128];
179   memset(buf, 'a', sizeof(buf));
180   WriteCallback wcb;
181   socket->write(&wcb, buf, sizeof(buf));
182
183   // Loop.  We don't bother accepting on the server socket yet.
184   // The kernel should be able to buffer the write request so it can succeed.
185   evb.loop();
186
187   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
188   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
189
190   // Make sure the server got a connection and received the data
191   socket->close();
192   server.verifyConnection(buf, sizeof(buf));
193
194   ASSERT_TRUE(socket->isClosedBySelf());
195   ASSERT_FALSE(socket->isClosedByPeer());
196   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
197 }
198
199 /**
200  * Test connecting using a nullptr connect callback.
201  */
202 TEST(AsyncSocketTest, ConnectNullCallback) {
203   TestServer server;
204
205   // connect()
206   EventBase evb;
207   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
208   socket->connect(nullptr, server.getAddress(), 30);
209
210   // write some data, just so we have some way of verifing
211   // that the socket works correctly after connecting
212   char buf[128];
213   memset(buf, 'a', sizeof(buf));
214   WriteCallback wcb;
215   socket->write(&wcb, buf, sizeof(buf));
216
217   evb.loop();
218
219   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
220
221   // Make sure the server got a connection and received the data
222   socket->close();
223   server.verifyConnection(buf, sizeof(buf));
224
225   ASSERT_TRUE(socket->isClosedBySelf());
226   ASSERT_FALSE(socket->isClosedByPeer());
227 }
228
229 /**
230  * Test calling both write() and close() immediately after connecting, without
231  * waiting for connect to finish.
232  *
233  * This exercises the STATE_CONNECTING_CLOSING code.
234  */
235 TEST(AsyncSocketTest, ConnectWriteAndClose) {
236   TestServer server;
237
238   // connect()
239   EventBase evb;
240   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
241   ConnCallback ccb;
242   socket->connect(&ccb, server.getAddress(), 30);
243
244   // write()
245   char buf[128];
246   memset(buf, 'a', sizeof(buf));
247   WriteCallback wcb;
248   socket->write(&wcb, buf, sizeof(buf));
249
250   // close()
251   socket->close();
252
253   // Loop.  We don't bother accepting on the server socket yet.
254   // The kernel should be able to buffer the write request so it can succeed.
255   evb.loop();
256
257   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
258   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
259
260   // Make sure the server got a connection and received the data
261   server.verifyConnection(buf, sizeof(buf));
262
263   ASSERT_TRUE(socket->isClosedBySelf());
264   ASSERT_FALSE(socket->isClosedByPeer());
265 }
266
267 /**
268  * Test calling close() immediately after connect()
269  */
270 TEST(AsyncSocketTest, ConnectAndClose) {
271   TestServer server;
272
273   // Connect using a AsyncSocket
274   EventBase evb;
275   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
276   ConnCallback ccb;
277   socket->connect(&ccb, server.getAddress(), 30);
278
279   // Hopefully the connect didn't succeed immediately.
280   // If it did, we can't exercise the close-while-connecting code path.
281   if (ccb.state == STATE_SUCCEEDED) {
282     LOG(INFO) << "connect() succeeded immediately; aborting test "
283                        "of close-during-connect behavior";
284     return;
285   }
286
287   socket->close();
288
289   // Loop, although there shouldn't be anything to do.
290   evb.loop();
291
292   // Make sure the connection was aborted
293   CHECK_EQ(ccb.state, STATE_FAILED);
294
295   ASSERT_TRUE(socket->isClosedBySelf());
296   ASSERT_FALSE(socket->isClosedByPeer());
297 }
298
299 /**
300  * Test calling closeNow() immediately after connect()
301  *
302  * This should be identical to the normal close behavior.
303  */
304 TEST(AsyncSocketTest, ConnectAndCloseNow) {
305   TestServer server;
306
307   // Connect using a AsyncSocket
308   EventBase evb;
309   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
310   ConnCallback ccb;
311   socket->connect(&ccb, server.getAddress(), 30);
312
313   // Hopefully the connect didn't succeed immediately.
314   // If it did, we can't exercise the close-while-connecting code path.
315   if (ccb.state == STATE_SUCCEEDED) {
316     LOG(INFO) << "connect() succeeded immediately; aborting test "
317                        "of closeNow()-during-connect behavior";
318     return;
319   }
320
321   socket->closeNow();
322
323   // Loop, although there shouldn't be anything to do.
324   evb.loop();
325
326   // Make sure the connection was aborted
327   CHECK_EQ(ccb.state, STATE_FAILED);
328
329   ASSERT_TRUE(socket->isClosedBySelf());
330   ASSERT_FALSE(socket->isClosedByPeer());
331 }
332
333 /**
334  * Test calling both write() and closeNow() immediately after connecting,
335  * without waiting for connect to finish.
336  *
337  * This should abort the pending write.
338  */
339 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
340   TestServer server;
341
342   // connect()
343   EventBase evb;
344   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
345   ConnCallback ccb;
346   socket->connect(&ccb, server.getAddress(), 30);
347
348   // Hopefully the connect didn't succeed immediately.
349   // If it did, we can't exercise the close-while-connecting code path.
350   if (ccb.state == STATE_SUCCEEDED) {
351     LOG(INFO) << "connect() succeeded immediately; aborting test "
352                        "of write-during-connect behavior";
353     return;
354   }
355
356   // write()
357   char buf[128];
358   memset(buf, 'a', sizeof(buf));
359   WriteCallback wcb;
360   socket->write(&wcb, buf, sizeof(buf));
361
362   // close()
363   socket->closeNow();
364
365   // Loop, although there shouldn't be anything to do.
366   evb.loop();
367
368   CHECK_EQ(ccb.state, STATE_FAILED);
369   CHECK_EQ(wcb.state, STATE_FAILED);
370
371   ASSERT_TRUE(socket->isClosedBySelf());
372   ASSERT_FALSE(socket->isClosedByPeer());
373 }
374
375 /**
376  * Test installing a read callback immediately, before connect() finishes.
377  */
378 TEST(AsyncSocketTest, ConnectAndRead) {
379   TestServer server;
380
381   // connect()
382   EventBase evb;
383   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
384   ConnCallback ccb;
385   socket->connect(&ccb, server.getAddress(), 30);
386
387   ReadCallback rcb;
388   socket->setReadCB(&rcb);
389
390   // Even though we haven't looped yet, we should be able to accept
391   // the connection and send data to it.
392   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
393   uint8_t buf[128];
394   memset(buf, 'a', sizeof(buf));
395   acceptedSocket->write(buf, sizeof(buf));
396   acceptedSocket->flush();
397   acceptedSocket->close();
398
399   // Loop, although there shouldn't be anything to do.
400   evb.loop();
401
402   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
403   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
404   CHECK_EQ(rcb.buffers.size(), 1);
405   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
406   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
407
408   ASSERT_FALSE(socket->isClosedBySelf());
409   ASSERT_FALSE(socket->isClosedByPeer());
410 }
411
412 /**
413  * Test installing a read callback and then closing immediately before the
414  * connect attempt finishes.
415  */
416 TEST(AsyncSocketTest, ConnectReadAndClose) {
417   TestServer server;
418
419   // connect()
420   EventBase evb;
421   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
422   ConnCallback ccb;
423   socket->connect(&ccb, server.getAddress(), 30);
424
425   // Hopefully the connect didn't succeed immediately.
426   // If it did, we can't exercise the close-while-connecting code path.
427   if (ccb.state == STATE_SUCCEEDED) {
428     LOG(INFO) << "connect() succeeded immediately; aborting test "
429                        "of read-during-connect behavior";
430     return;
431   }
432
433   ReadCallback rcb;
434   socket->setReadCB(&rcb);
435
436   // close()
437   socket->close();
438
439   // Loop, although there shouldn't be anything to do.
440   evb.loop();
441
442   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
443   CHECK_EQ(rcb.buffers.size(), 0);
444   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
445
446   ASSERT_TRUE(socket->isClosedBySelf());
447   ASSERT_FALSE(socket->isClosedByPeer());
448 }
449
450 /**
451  * Test both writing and installing a read callback immediately,
452  * before connect() finishes.
453  */
454 TEST(AsyncSocketTest, ConnectWriteAndRead) {
455   TestServer server;
456
457   // connect()
458   EventBase evb;
459   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
460   ConnCallback ccb;
461   socket->connect(&ccb, server.getAddress(), 30);
462
463   // write()
464   char buf1[128];
465   memset(buf1, 'a', sizeof(buf1));
466   WriteCallback wcb;
467   socket->write(&wcb, buf1, sizeof(buf1));
468
469   // set a read callback
470   ReadCallback rcb;
471   socket->setReadCB(&rcb);
472
473   // Even though we haven't looped yet, we should be able to accept
474   // the connection and send data to it.
475   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
476   uint8_t buf2[128];
477   memset(buf2, 'b', sizeof(buf2));
478   acceptedSocket->write(buf2, sizeof(buf2));
479   acceptedSocket->flush();
480
481   // shut down the write half of acceptedSocket, so that the AsyncSocket
482   // will stop reading and we can break out of the event loop.
483   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
484
485   // Loop
486   evb.loop();
487
488   // Make sure the connect succeeded
489   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
490
491   // Make sure the AsyncSocket read the data written by the accepted socket
492   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
493   CHECK_EQ(rcb.buffers.size(), 1);
494   CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
495   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
496
497   // Close the AsyncSocket so we'll see EOF on acceptedSocket
498   socket->close();
499
500   // Make sure the accepted socket saw the data written by the AsyncSocket
501   uint8_t readbuf[sizeof(buf1)];
502   acceptedSocket->readAll(readbuf, sizeof(readbuf));
503   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
504   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
505   CHECK_EQ(bytesRead, 0);
506
507   ASSERT_FALSE(socket->isClosedBySelf());
508   ASSERT_TRUE(socket->isClosedByPeer());
509 }
510
511 /**
512  * Test writing to the socket then shutting down writes before the connect
513  * attempt finishes.
514  */
515 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
516   TestServer server;
517
518   // connect()
519   EventBase evb;
520   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
521   ConnCallback ccb;
522   socket->connect(&ccb, server.getAddress(), 30);
523
524   // Hopefully the connect didn't succeed immediately.
525   // If it did, we can't exercise the write-while-connecting code path.
526   if (ccb.state == STATE_SUCCEEDED) {
527     LOG(INFO) << "connect() succeeded immediately; skipping test";
528     return;
529   }
530
531   // Ask to write some data
532   char wbuf[128];
533   memset(wbuf, 'a', sizeof(wbuf));
534   WriteCallback wcb;
535   socket->write(&wcb, wbuf, sizeof(wbuf));
536   socket->shutdownWrite();
537
538   // Shutdown writes
539   socket->shutdownWrite();
540
541   // Even though we haven't looped yet, we should be able to accept
542   // the connection.
543   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
544
545   // Since the connection is still in progress, there should be no data to
546   // read yet.  Verify that the accepted socket is not readable.
547   struct pollfd fds[1];
548   fds[0].fd = acceptedSocket->getSocketFD();
549   fds[0].events = POLLIN;
550   fds[0].revents = 0;
551   int rc = poll(fds, 1, 0);
552   CHECK_EQ(rc, 0);
553
554   // Write data to the accepted socket
555   uint8_t acceptedWbuf[192];
556   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
557   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
558   acceptedSocket->flush();
559
560   // Loop
561   evb.loop();
562
563   // The loop should have completed the connection, written the queued data,
564   // and shutdown writes on the socket.
565   //
566   // Check that the connection was completed successfully and that the write
567   // callback succeeded.
568   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
569   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
570
571   // Check that we can read the data that was written to the socket, and that
572   // we see an EOF, since its socket was half-shutdown.
573   uint8_t readbuf[sizeof(wbuf)];
574   acceptedSocket->readAll(readbuf, sizeof(readbuf));
575   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
576   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
577   CHECK_EQ(bytesRead, 0);
578
579   // Close the accepted socket.  This will cause it to see EOF
580   // and uninstall the read callback when we loop next.
581   acceptedSocket->close();
582
583   // Install a read callback, then loop again.
584   ReadCallback rcb;
585   socket->setReadCB(&rcb);
586   evb.loop();
587
588   // This loop should have read the data and seen the EOF
589   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
590   CHECK_EQ(rcb.buffers.size(), 1);
591   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
592   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
593                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
594
595   ASSERT_FALSE(socket->isClosedBySelf());
596   ASSERT_FALSE(socket->isClosedByPeer());
597 }
598
599 /**
600  * Test reading, writing, and shutting down writes before the connect attempt
601  * finishes.
602  */
603 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
604   TestServer server;
605
606   // connect()
607   EventBase evb;
608   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
609   ConnCallback ccb;
610   socket->connect(&ccb, server.getAddress(), 30);
611
612   // Hopefully the connect didn't succeed immediately.
613   // If it did, we can't exercise the write-while-connecting code path.
614   if (ccb.state == STATE_SUCCEEDED) {
615     LOG(INFO) << "connect() succeeded immediately; skipping test";
616     return;
617   }
618
619   // Install a read callback
620   ReadCallback rcb;
621   socket->setReadCB(&rcb);
622
623   // Ask to write some data
624   char wbuf[128];
625   memset(wbuf, 'a', sizeof(wbuf));
626   WriteCallback wcb;
627   socket->write(&wcb, wbuf, sizeof(wbuf));
628
629   // Shutdown writes
630   socket->shutdownWrite();
631
632   // Even though we haven't looped yet, we should be able to accept
633   // the connection.
634   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
635
636   // Since the connection is still in progress, there should be no data to
637   // read yet.  Verify that the accepted socket is not readable.
638   struct pollfd fds[1];
639   fds[0].fd = acceptedSocket->getSocketFD();
640   fds[0].events = POLLIN;
641   fds[0].revents = 0;
642   int rc = poll(fds, 1, 0);
643   CHECK_EQ(rc, 0);
644
645   // Write data to the accepted socket
646   uint8_t acceptedWbuf[192];
647   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
648   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
649   acceptedSocket->flush();
650   // Shutdown writes to the accepted socket.  This will cause it to see EOF
651   // and uninstall the read callback.
652   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
653
654   // Loop
655   evb.loop();
656
657   // The loop should have completed the connection, written the queued data,
658   // shutdown writes on the socket, read the data we wrote to it, and see the
659   // EOF.
660   //
661   // Check that the connection was completed successfully and that the read
662   // and write callbacks were invoked as expected.
663   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
664   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
665   CHECK_EQ(rcb.buffers.size(), 1);
666   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
667   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
668                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
669   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
670
671   // Check that we can read the data that was written to the socket, and that
672   // we see an EOF, since its socket was half-shutdown.
673   uint8_t readbuf[sizeof(wbuf)];
674   acceptedSocket->readAll(readbuf, sizeof(readbuf));
675   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
676   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
677   CHECK_EQ(bytesRead, 0);
678
679   // Fully close both sockets
680   acceptedSocket->close();
681   socket->close();
682
683   ASSERT_FALSE(socket->isClosedBySelf());
684   ASSERT_TRUE(socket->isClosedByPeer());
685 }
686
687 /**
688  * Test reading, writing, and calling shutdownWriteNow() before the
689  * connect attempt finishes.
690  */
691 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
692   TestServer server;
693
694   // connect()
695   EventBase evb;
696   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
697   ConnCallback ccb;
698   socket->connect(&ccb, server.getAddress(), 30);
699
700   // Hopefully the connect didn't succeed immediately.
701   // If it did, we can't exercise the write-while-connecting code path.
702   if (ccb.state == STATE_SUCCEEDED) {
703     LOG(INFO) << "connect() succeeded immediately; skipping test";
704     return;
705   }
706
707   // Install a read callback
708   ReadCallback rcb;
709   socket->setReadCB(&rcb);
710
711   // Ask to write some data
712   char wbuf[128];
713   memset(wbuf, 'a', sizeof(wbuf));
714   WriteCallback wcb;
715   socket->write(&wcb, wbuf, sizeof(wbuf));
716
717   // Shutdown writes immediately.
718   // This should immediately discard the data that we just tried to write.
719   socket->shutdownWriteNow();
720
721   // Verify that writeError() was invoked on the write callback.
722   CHECK_EQ(wcb.state, STATE_FAILED);
723   CHECK_EQ(wcb.bytesWritten, 0);
724
725   // Even though we haven't looped yet, we should be able to accept
726   // the connection.
727   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
728
729   // Since the connection is still in progress, there should be no data to
730   // read yet.  Verify that the accepted socket is not readable.
731   struct pollfd fds[1];
732   fds[0].fd = acceptedSocket->getSocketFD();
733   fds[0].events = POLLIN;
734   fds[0].revents = 0;
735   int rc = poll(fds, 1, 0);
736   CHECK_EQ(rc, 0);
737
738   // Write data to the accepted socket
739   uint8_t acceptedWbuf[192];
740   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
741   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
742   acceptedSocket->flush();
743   // Shutdown writes to the accepted socket.  This will cause it to see EOF
744   // and uninstall the read callback.
745   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
746
747   // Loop
748   evb.loop();
749
750   // The loop should have completed the connection, written the queued data,
751   // shutdown writes on the socket, read the data we wrote to it, and see the
752   // EOF.
753   //
754   // Check that the connection was completed successfully and that the read
755   // callback was invoked as expected.
756   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
757   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
758   CHECK_EQ(rcb.buffers.size(), 1);
759   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
760   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
761                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
762
763   // Since we used shutdownWriteNow(), it should have discarded all pending
764   // write data.  Verify we see an immediate EOF when reading from the accepted
765   // socket.
766   uint8_t readbuf[sizeof(wbuf)];
767   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
768   CHECK_EQ(bytesRead, 0);
769
770   // Fully close both sockets
771   acceptedSocket->close();
772   socket->close();
773
774   ASSERT_FALSE(socket->isClosedBySelf());
775   ASSERT_TRUE(socket->isClosedByPeer());
776 }
777
778 // Helper function for use in testConnectOptWrite()
779 // Temporarily disable the read callback
780 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
781   // Uninstall the read callback
782   socket->setReadCB(nullptr);
783   // Schedule the read callback to be reinstalled after 1ms
784   socket->getEventBase()->runInLoop(
785       std::bind(&AsyncSocket::setReadCB, socket, rcb));
786 }
787
788 /**
789  * Test connect+write, then have the connect callback perform another write.
790  *
791  * This tests interaction of the optimistic writing after connect with
792  * additional write attempts that occur in the connect callback.
793  */
794 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
795   TestServer server;
796   EventBase evb;
797   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
798
799   // connect()
800   ConnCallback ccb;
801   socket->connect(&ccb, server.getAddress(), 30);
802
803   // Hopefully the connect didn't succeed immediately.
804   // If it did, we can't exercise the optimistic write code path.
805   if (ccb.state == STATE_SUCCEEDED) {
806     LOG(INFO) << "connect() succeeded immediately; aborting test "
807                        "of optimistic write behavior";
808     return;
809   }
810
811   // Tell the connect callback to perform a write when the connect succeeds
812   WriteCallback wcb2;
813   scoped_array<char> buf2(new char[size2]);
814   memset(buf2.get(), 'b', size2);
815   if (size2 > 0) {
816     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
817     // Tell the second write callback to close the connection when it is done
818     wcb2.successCallback = [&] { socket->closeNow(); };
819   }
820
821   // Schedule one write() immediately, before the connect finishes
822   scoped_array<char> buf1(new char[size1]);
823   memset(buf1.get(), 'a', size1);
824   WriteCallback wcb1;
825   if (size1 > 0) {
826     socket->write(&wcb1, buf1.get(), size1);
827   }
828
829   if (close) {
830     // immediately perform a close, before connect() completes
831     socket->close();
832   }
833
834   // Start reading from the other endpoint after 10ms.
835   // If we're using large buffers, we have to read so that the writes don't
836   // block forever.
837   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
838   ReadCallback rcb;
839   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
840                                         acceptedSocket.get(), &rcb);
841   socket->getEventBase()->tryRunAfterDelay(
842       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
843       10);
844
845   // Loop.  We don't bother accepting on the server socket yet.
846   // The kernel should be able to buffer the write request so it can succeed.
847   evb.loop();
848
849   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
850   if (size1 > 0) {
851     CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
852   }
853   if (size2 > 0) {
854     CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
855   }
856
857   socket->close();
858
859   // Make sure the read callback received all of the data
860   size_t bytesRead = 0;
861   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
862        it != rcb.buffers.end();
863        ++it) {
864     size_t start = bytesRead;
865     bytesRead += it->length;
866     size_t end = bytesRead;
867     if (start < size1) {
868       size_t cmpLen = min(size1, end) - start;
869       CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
870     }
871     if (end > size1 && end <= size1 + size2) {
872       size_t itOffset;
873       size_t buf2Offset;
874       size_t cmpLen;
875       if (start >= size1) {
876         itOffset = 0;
877         buf2Offset = start - size1;
878         cmpLen = end - start;
879       } else {
880         itOffset = size1 - start;
881         buf2Offset = 0;
882         cmpLen = end - size1;
883       }
884       CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
885                                cmpLen),
886                         0);
887     }
888   }
889   CHECK_EQ(bytesRead, size1 + size2);
890 }
891
892 TEST(AsyncSocketTest, ConnectCallbackWrite) {
893   // Test using small writes that should both succeed immediately
894   testConnectOptWrite(100, 200);
895
896   // Test using a large buffer in the connect callback, that should block
897   const size_t largeSize = 8*1024*1024;
898   testConnectOptWrite(100, largeSize);
899
900   // Test using a large initial write
901   testConnectOptWrite(largeSize, 100);
902
903   // Test using two large buffers
904   testConnectOptWrite(largeSize, largeSize);
905
906   // Test a small write in the connect callback,
907   // but no immediate write before connect completes
908   testConnectOptWrite(0, 64);
909
910   // Test a large write in the connect callback,
911   // but no immediate write before connect completes
912   testConnectOptWrite(0, largeSize);
913
914   // Test connect, a small write, then immediately call close() before connect
915   // completes
916   testConnectOptWrite(211, 0, true);
917
918   // Test connect, a large immediate write (that will block), then immediately
919   // call close() before connect completes
920   testConnectOptWrite(largeSize, 0, true);
921 }
922
923 ///////////////////////////////////////////////////////////////////////////
924 // write() related tests
925 ///////////////////////////////////////////////////////////////////////////
926
927 /**
928  * Test writing using a nullptr callback
929  */
930 TEST(AsyncSocketTest, WriteNullCallback) {
931   TestServer server;
932
933   // connect()
934   EventBase evb;
935   std::shared_ptr<AsyncSocket> socket =
936     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
937   evb.loop(); // loop until the socket is connected
938
939   // write() with a nullptr callback
940   char buf[128];
941   memset(buf, 'a', sizeof(buf));
942   socket->write(nullptr, buf, sizeof(buf));
943
944   evb.loop(); // loop until the data is sent
945
946   // Make sure the server got a connection and received the data
947   socket->close();
948   server.verifyConnection(buf, sizeof(buf));
949
950   ASSERT_TRUE(socket->isClosedBySelf());
951   ASSERT_FALSE(socket->isClosedByPeer());
952 }
953
954 /**
955  * Test writing with a send timeout
956  */
957 TEST(AsyncSocketTest, WriteTimeout) {
958   TestServer server;
959
960   // connect()
961   EventBase evb;
962   std::shared_ptr<AsyncSocket> socket =
963     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
964   evb.loop(); // loop until the socket is connected
965
966   // write() a large chunk of data, with no-one on the other end reading
967   size_t writeLength = 8*1024*1024;
968   uint32_t timeout = 200;
969   socket->setSendTimeout(timeout);
970   scoped_array<char> buf(new char[writeLength]);
971   memset(buf.get(), 'a', writeLength);
972   WriteCallback wcb;
973   socket->write(&wcb, buf.get(), writeLength);
974
975   TimePoint start;
976   evb.loop();
977   TimePoint end;
978
979   // Make sure the write attempt timed out as requested
980   CHECK_EQ(wcb.state, STATE_FAILED);
981   CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
982
983   // Check that the write timed out within a reasonable period of time.
984   // We don't check for exactly the specified timeout, since AsyncSocket only
985   // times out when it hasn't made progress for that period of time.
986   //
987   // On linux, the first write sends a few hundred kb of data, then blocks for
988   // writability, and then unblocks again after 40ms and is able to write
989   // another smaller of data before blocking permanently.  Therefore it doesn't
990   // time out until 40ms + timeout.
991   //
992   // I haven't fully verified the cause of this, but I believe it probably
993   // occurs because the receiving end delays sending an ack for up to 40ms.
994   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
995   // the ack, it can send some more data.  However, after that point the
996   // receiver's kernel buffer is full.  This 40ms delay happens even with
997   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
998   // kernel may be automatically disabling TCP_QUICKACK after receiving some
999   // data.
1000   //
1001   // For now, we simply check that the timeout occurred within 160ms of
1002   // the requested value.
1003   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1004 }
1005
1006 /**
1007  * Test writing to a socket that the remote endpoint has closed
1008  */
1009 TEST(AsyncSocketTest, WritePipeError) {
1010   TestServer server;
1011
1012   // connect()
1013   EventBase evb;
1014   std::shared_ptr<AsyncSocket> socket =
1015     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1016   socket->setSendTimeout(1000);
1017   evb.loop(); // loop until the socket is connected
1018
1019   // accept and immediately close the socket
1020   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1021   acceptedSocket.reset();
1022
1023   // write() a large chunk of data
1024   size_t writeLength = 8*1024*1024;
1025   scoped_array<char> buf(new char[writeLength]);
1026   memset(buf.get(), 'a', writeLength);
1027   WriteCallback wcb;
1028   socket->write(&wcb, buf.get(), writeLength);
1029
1030   evb.loop();
1031
1032   // Make sure the write failed.
1033   // It would be nice if AsyncSocketException could convey the errno value,
1034   // so that we could check for EPIPE
1035   CHECK_EQ(wcb.state, STATE_FAILED);
1036   CHECK_EQ(wcb.exception.getType(),
1037                     AsyncSocketException::INTERNAL_ERROR);
1038
1039   ASSERT_FALSE(socket->isClosedBySelf());
1040   ASSERT_FALSE(socket->isClosedByPeer());
1041 }
1042
1043 /**
1044  * Test writing a mix of simple buffers and IOBufs
1045  */
1046 TEST(AsyncSocketTest, WriteIOBuf) {
1047   TestServer server;
1048
1049   // connect()
1050   EventBase evb;
1051   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1052   ConnCallback ccb;
1053   socket->connect(&ccb, server.getAddress(), 30);
1054
1055   // Accept the connection
1056   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1057   ReadCallback rcb;
1058   acceptedSocket->setReadCB(&rcb);
1059
1060   // Write a simple buffer to the socket
1061   size_t simpleBufLength = 5;
1062   char simpleBuf[simpleBufLength];
1063   memset(simpleBuf, 'a', simpleBufLength);
1064   WriteCallback wcb;
1065   socket->write(&wcb, simpleBuf, simpleBufLength);
1066
1067   // Write a single-element IOBuf chain
1068   size_t buf1Length = 7;
1069   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1070   memset(buf1->writableData(), 'b', buf1Length);
1071   buf1->append(buf1Length);
1072   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1073   WriteCallback wcb2;
1074   socket->writeChain(&wcb2, std::move(buf1));
1075
1076   // Write a multiple-element IOBuf chain
1077   size_t buf2Length = 11;
1078   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1079   memset(buf2->writableData(), 'c', buf2Length);
1080   buf2->append(buf2Length);
1081   size_t buf3Length = 13;
1082   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1083   memset(buf3->writableData(), 'd', buf3Length);
1084   buf3->append(buf3Length);
1085   buf2->appendChain(std::move(buf3));
1086   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1087   buf2Copy->coalesce();
1088   WriteCallback wcb3;
1089   socket->writeChain(&wcb3, std::move(buf2));
1090   socket->shutdownWrite();
1091
1092   // Let the reads and writes run to completion
1093   evb.loop();
1094
1095   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1096   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1097   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1098
1099   // Make sure the reader got the right data in the right order
1100   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1101   CHECK_EQ(rcb.buffers.size(), 1);
1102   CHECK_EQ(rcb.buffers[0].length,
1103       simpleBufLength + buf1Length + buf2Length + buf3Length);
1104   CHECK_EQ(
1105       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1106   CHECK_EQ(
1107       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1108           buf1Copy->data(), buf1Copy->length()), 0);
1109   CHECK_EQ(
1110       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1111           buf2Copy->data(), buf2Copy->length()), 0);
1112
1113   acceptedSocket->close();
1114   socket->close();
1115
1116   ASSERT_TRUE(socket->isClosedBySelf());
1117   ASSERT_FALSE(socket->isClosedByPeer());
1118 }
1119
1120 TEST(AsyncSocketTest, WriteIOBufCorked) {
1121   TestServer server;
1122
1123   // connect()
1124   EventBase evb;
1125   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1126   ConnCallback ccb;
1127   socket->connect(&ccb, server.getAddress(), 30);
1128
1129   // Accept the connection
1130   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1131   ReadCallback rcb;
1132   acceptedSocket->setReadCB(&rcb);
1133
1134   // Do three writes, 100ms apart, with the "cork" flag set
1135   // on the second write.  The reader should see the first write
1136   // arrive by itself, followed by the second and third writes
1137   // arriving together.
1138   size_t buf1Length = 5;
1139   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1140   memset(buf1->writableData(), 'a', buf1Length);
1141   buf1->append(buf1Length);
1142   size_t buf2Length = 7;
1143   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1144   memset(buf2->writableData(), 'b', buf2Length);
1145   buf2->append(buf2Length);
1146   size_t buf3Length = 11;
1147   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1148   memset(buf3->writableData(), 'c', buf3Length);
1149   buf3->append(buf3Length);
1150   WriteCallback wcb1;
1151   socket->writeChain(&wcb1, std::move(buf1));
1152   WriteCallback wcb2;
1153   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1154   write2.scheduleTimeout(100);
1155   WriteCallback wcb3;
1156   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1157   write3.scheduleTimeout(200);
1158
1159   evb.loop();
1160   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1161   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1162   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1163   if (wcb3.state != STATE_SUCCEEDED) {
1164     throw(wcb3.exception);
1165   }
1166   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1167
1168   // Make sure the reader got the data with the right grouping
1169   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1170   CHECK_EQ(rcb.buffers.size(), 2);
1171   CHECK_EQ(rcb.buffers[0].length, buf1Length);
1172   CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1173
1174   acceptedSocket->close();
1175   socket->close();
1176
1177   ASSERT_TRUE(socket->isClosedBySelf());
1178   ASSERT_FALSE(socket->isClosedByPeer());
1179 }
1180
1181 /**
1182  * Test performing a zero-length write
1183  */
1184 TEST(AsyncSocketTest, ZeroLengthWrite) {
1185   TestServer server;
1186
1187   // connect()
1188   EventBase evb;
1189   std::shared_ptr<AsyncSocket> socket =
1190     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1191   evb.loop(); // loop until the socket is connected
1192
1193   auto acceptedSocket = server.acceptAsync(&evb);
1194   ReadCallback rcb;
1195   acceptedSocket->setReadCB(&rcb);
1196
1197   size_t len1 = 1024*1024;
1198   size_t len2 = 1024*1024;
1199   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1200   memset(buf.get(), 'a', len1);
1201   memset(buf.get(), 'b', len2);
1202
1203   WriteCallback wcb1;
1204   WriteCallback wcb2;
1205   WriteCallback wcb3;
1206   WriteCallback wcb4;
1207   socket->write(&wcb1, buf.get(), 0);
1208   socket->write(&wcb2, buf.get(), len1);
1209   socket->write(&wcb3, buf.get() + len1, 0);
1210   socket->write(&wcb4, buf.get() + len1, len2);
1211   socket->close();
1212
1213   evb.loop(); // loop until the data is sent
1214
1215   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1216   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1217   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1218   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1219   rcb.verifyData(buf.get(), len1 + len2);
1220
1221   ASSERT_TRUE(socket->isClosedBySelf());
1222   ASSERT_FALSE(socket->isClosedByPeer());
1223 }
1224
1225 TEST(AsyncSocketTest, ZeroLengthWritev) {
1226   TestServer server;
1227
1228   // connect()
1229   EventBase evb;
1230   std::shared_ptr<AsyncSocket> socket =
1231     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1232   evb.loop(); // loop until the socket is connected
1233
1234   auto acceptedSocket = server.acceptAsync(&evb);
1235   ReadCallback rcb;
1236   acceptedSocket->setReadCB(&rcb);
1237
1238   size_t len1 = 1024*1024;
1239   size_t len2 = 1024*1024;
1240   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1241   memset(buf.get(), 'a', len1);
1242   memset(buf.get(), 'b', len2);
1243
1244   WriteCallback wcb;
1245   size_t iovCount = 4;
1246   struct iovec iov[iovCount];
1247   iov[0].iov_base = buf.get();
1248   iov[0].iov_len = len1;
1249   iov[1].iov_base = buf.get() + len1;
1250   iov[1].iov_len = 0;
1251   iov[2].iov_base = buf.get() + len1;
1252   iov[2].iov_len = len2;
1253   iov[3].iov_base = buf.get() + len1 + len2;
1254   iov[3].iov_len = 0;
1255
1256   socket->writev(&wcb, iov, iovCount);
1257   socket->close();
1258   evb.loop(); // loop until the data is sent
1259
1260   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1261   rcb.verifyData(buf.get(), len1 + len2);
1262
1263   ASSERT_TRUE(socket->isClosedBySelf());
1264   ASSERT_FALSE(socket->isClosedByPeer());
1265 }
1266
1267 ///////////////////////////////////////////////////////////////////////////
1268 // close() related tests
1269 ///////////////////////////////////////////////////////////////////////////
1270
1271 /**
1272  * Test calling close() with pending writes when the socket is already closing.
1273  */
1274 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1275   TestServer server;
1276
1277   // connect()
1278   EventBase evb;
1279   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1280   ConnCallback ccb;
1281   socket->connect(&ccb, server.getAddress(), 30);
1282
1283   // accept the socket on the server side
1284   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1285
1286   // Loop to ensure the connect has completed
1287   evb.loop();
1288
1289   // Make sure we are connected
1290   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1291
1292   // Schedule pending writes, until several write attempts have blocked
1293   char buf[128];
1294   memset(buf, 'a', sizeof(buf));
1295   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1296   WriteCallbackVector writeCallbacks;
1297
1298   writeCallbacks.reserve(5);
1299   while (writeCallbacks.size() < 5) {
1300     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1301
1302     socket->write(wcb.get(), buf, sizeof(buf));
1303     if (wcb->state == STATE_SUCCEEDED) {
1304       // Succeeded immediately.  Keep performing more writes
1305       continue;
1306     }
1307
1308     // This write is blocked.
1309     // Have the write callback call close() when writeError() is invoked
1310     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1311     writeCallbacks.push_back(wcb);
1312   }
1313
1314   // Call closeNow() to immediately fail the pending writes
1315   socket->closeNow();
1316
1317   // Make sure writeError() was invoked on all of the pending write callbacks
1318   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1319        it != writeCallbacks.end();
1320        ++it) {
1321     CHECK_EQ((*it)->state, STATE_FAILED);
1322   }
1323
1324   ASSERT_TRUE(socket->isClosedBySelf());
1325   ASSERT_FALSE(socket->isClosedByPeer());
1326 }
1327
1328 ///////////////////////////////////////////////////////////////////////////
1329 // ImmediateRead related tests
1330 ///////////////////////////////////////////////////////////////////////////
1331
1332 /* AsyncSocket use to verify immediate read works */
1333 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1334  public:
1335   bool immediateReadCalled = false;
1336   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1337  protected:
1338   void checkForImmediateRead() noexcept override {
1339     immediateReadCalled = true;
1340     AsyncSocket::handleRead();
1341   }
1342 };
1343
1344 TEST(AsyncSocket, ConnectReadImmediateRead) {
1345   TestServer server;
1346
1347   const size_t maxBufferSz = 100;
1348   const size_t maxReadsPerEvent = 1;
1349   const size_t expectedDataSz = maxBufferSz * 3;
1350   char expectedData[expectedDataSz];
1351   memset(expectedData, 'j', expectedDataSz);
1352
1353   EventBase evb;
1354   ReadCallback rcb(maxBufferSz);
1355   AsyncSocketImmediateRead socket(&evb);
1356   socket.connect(nullptr, server.getAddress(), 30);
1357
1358   evb.loop(); // loop until the socket is connected
1359
1360   socket.setReadCB(&rcb);
1361   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1362   socket.immediateReadCalled = false;
1363
1364   auto acceptedSocket = server.acceptAsync(&evb);
1365
1366   ReadCallback rcbServer;
1367   WriteCallback wcbServer;
1368   rcbServer.dataAvailableCallback = [&]() {
1369     if (rcbServer.dataRead() == expectedDataSz) {
1370       // write back all data read
1371       rcbServer.verifyData(expectedData, expectedDataSz);
1372       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1373       acceptedSocket->close();
1374     }
1375   };
1376   acceptedSocket->setReadCB(&rcbServer);
1377
1378   // write data
1379   WriteCallback wcb1;
1380   socket.write(&wcb1, expectedData, expectedDataSz);
1381   evb.loop();
1382   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1383   rcb.verifyData(expectedData, expectedDataSz);
1384   CHECK_EQ(socket.immediateReadCalled, true);
1385
1386   ASSERT_FALSE(socket.isClosedBySelf());
1387   ASSERT_FALSE(socket.isClosedByPeer());
1388 }
1389
1390 TEST(AsyncSocket, ConnectReadUninstallRead) {
1391   TestServer server;
1392
1393   const size_t maxBufferSz = 100;
1394   const size_t maxReadsPerEvent = 1;
1395   const size_t expectedDataSz = maxBufferSz * 3;
1396   char expectedData[expectedDataSz];
1397   memset(expectedData, 'k', expectedDataSz);
1398
1399   EventBase evb;
1400   ReadCallback rcb(maxBufferSz);
1401   AsyncSocketImmediateRead socket(&evb);
1402   socket.connect(nullptr, server.getAddress(), 30);
1403
1404   evb.loop(); // loop until the socket is connected
1405
1406   socket.setReadCB(&rcb);
1407   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1408   socket.immediateReadCalled = false;
1409
1410   auto acceptedSocket = server.acceptAsync(&evb);
1411
1412   ReadCallback rcbServer;
1413   WriteCallback wcbServer;
1414   rcbServer.dataAvailableCallback = [&]() {
1415     if (rcbServer.dataRead() == expectedDataSz) {
1416       // write back all data read
1417       rcbServer.verifyData(expectedData, expectedDataSz);
1418       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1419       acceptedSocket->close();
1420     }
1421   };
1422   acceptedSocket->setReadCB(&rcbServer);
1423
1424   rcb.dataAvailableCallback = [&]() {
1425     // we read data and reset readCB
1426     socket.setReadCB(nullptr);
1427   };
1428
1429   // write data
1430   WriteCallback wcb;
1431   socket.write(&wcb, expectedData, expectedDataSz);
1432   evb.loop();
1433   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1434
1435   /* we shoud've only read maxBufferSz data since readCallback_
1436    * was reset in dataAvailableCallback */
1437   CHECK_EQ(rcb.dataRead(), maxBufferSz);
1438   CHECK_EQ(socket.immediateReadCalled, false);
1439
1440   ASSERT_FALSE(socket.isClosedBySelf());
1441   ASSERT_FALSE(socket.isClosedByPeer());
1442 }
1443
1444 // TODO:
1445 // - Test connect() and have the connect callback set the read callback
1446 // - Test connect() and have the connect callback unset the read callback
1447 // - Test reading/writing/closing/destroying the socket in the connect callback
1448 // - Test reading/writing/closing/destroying the socket in the read callback
1449 // - Test reading/writing/closing/destroying the socket in the write callback
1450 // - Test one-way shutdown behavior
1451 // - Test changing the EventBase
1452 //
1453 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1454 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1455
1456
1457 ///////////////////////////////////////////////////////////////////////////
1458 // AsyncServerSocket tests
1459 ///////////////////////////////////////////////////////////////////////////
1460 namespace {
1461 /**
1462  * Helper ConnectionEventCallback class for the test code.
1463  * It maintains counters protected by a spin lock.
1464  */
1465 class TestConnectionEventCallback :
1466   public AsyncServerSocket::ConnectionEventCallback {
1467  public:
1468   virtual void onConnectionAccepted(
1469       const int /* socket */,
1470       const SocketAddress& /* addr */) noexcept override {
1471     folly::RWSpinLock::WriteHolder holder(spinLock_);
1472     connectionAccepted_++;
1473   }
1474
1475   virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1476     folly::RWSpinLock::WriteHolder holder(spinLock_);
1477     connectionAcceptedError_++;
1478   }
1479
1480   virtual void onConnectionDropped(
1481       const int /* socket */,
1482       const SocketAddress& /* addr */) noexcept override {
1483     folly::RWSpinLock::WriteHolder holder(spinLock_);
1484     connectionDropped_++;
1485   }
1486
1487   virtual void onConnectionEnqueuedForAcceptorCallback(
1488       const int /* socket */,
1489       const SocketAddress& /* addr */) noexcept override {
1490     folly::RWSpinLock::WriteHolder holder(spinLock_);
1491     connectionEnqueuedForAcceptCallback_++;
1492   }
1493
1494   virtual void onConnectionDequeuedByAcceptorCallback(
1495       const int /* socket */,
1496       const SocketAddress& /* addr */) noexcept override {
1497     folly::RWSpinLock::WriteHolder holder(spinLock_);
1498     connectionDequeuedByAcceptCallback_++;
1499   }
1500
1501   virtual void onBackoffStarted() noexcept override {
1502     folly::RWSpinLock::WriteHolder holder(spinLock_);
1503     backoffStarted_++;
1504   }
1505
1506   virtual void onBackoffEnded() noexcept override {
1507     folly::RWSpinLock::WriteHolder holder(spinLock_);
1508     backoffEnded_++;
1509   }
1510
1511   virtual void onBackoffError() noexcept override {
1512     folly::RWSpinLock::WriteHolder holder(spinLock_);
1513     backoffError_++;
1514   }
1515
1516   unsigned int getConnectionAccepted() const {
1517     folly::RWSpinLock::ReadHolder holder(spinLock_);
1518     return connectionAccepted_;
1519   }
1520
1521   unsigned int getConnectionAcceptedError() const {
1522     folly::RWSpinLock::ReadHolder holder(spinLock_);
1523     return connectionAcceptedError_;
1524   }
1525
1526   unsigned int getConnectionDropped() const {
1527     folly::RWSpinLock::ReadHolder holder(spinLock_);
1528     return connectionDropped_;
1529   }
1530
1531   unsigned int getConnectionEnqueuedForAcceptCallback() const {
1532     folly::RWSpinLock::ReadHolder holder(spinLock_);
1533     return connectionEnqueuedForAcceptCallback_;
1534   }
1535
1536   unsigned int getConnectionDequeuedByAcceptCallback() const {
1537     folly::RWSpinLock::ReadHolder holder(spinLock_);
1538     return connectionDequeuedByAcceptCallback_;
1539   }
1540
1541   unsigned int getBackoffStarted() const {
1542     folly::RWSpinLock::ReadHolder holder(spinLock_);
1543     return backoffStarted_;
1544   }
1545
1546   unsigned int getBackoffEnded() const {
1547     folly::RWSpinLock::ReadHolder holder(spinLock_);
1548     return backoffEnded_;
1549   }
1550
1551   unsigned int getBackoffError() const {
1552     folly::RWSpinLock::ReadHolder holder(spinLock_);
1553     return backoffError_;
1554   }
1555
1556  private:
1557   mutable folly::RWSpinLock spinLock_;
1558   unsigned int connectionAccepted_{0};
1559   unsigned int connectionAcceptedError_{0};
1560   unsigned int connectionDropped_{0};
1561   unsigned int connectionEnqueuedForAcceptCallback_{0};
1562   unsigned int connectionDequeuedByAcceptCallback_{0};
1563   unsigned int backoffStarted_{0};
1564   unsigned int backoffEnded_{0};
1565   unsigned int backoffError_{0};
1566 };
1567
1568 /**
1569  * Helper AcceptCallback class for the test code
1570  * It records the callbacks that were invoked, and also supports calling
1571  * generic std::function objects in each callback.
1572  */
1573 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1574  public:
1575   enum EventType {
1576     TYPE_START,
1577     TYPE_ACCEPT,
1578     TYPE_ERROR,
1579     TYPE_STOP
1580   };
1581   struct EventInfo {
1582     EventInfo(int fd, const folly::SocketAddress& addr)
1583       : type(TYPE_ACCEPT),
1584         fd(fd),
1585         address(addr),
1586         errorMsg() {}
1587     explicit EventInfo(const std::string& msg)
1588       : type(TYPE_ERROR),
1589         fd(-1),
1590         address(),
1591         errorMsg(msg) {}
1592     explicit EventInfo(EventType et)
1593       : type(et),
1594         fd(-1),
1595         address(),
1596         errorMsg() {}
1597
1598     EventType type;
1599     int fd;  // valid for TYPE_ACCEPT
1600     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1601     string errorMsg;  // valid for TYPE_ERROR
1602   };
1603   typedef std::deque<EventInfo> EventList;
1604
1605   TestAcceptCallback()
1606     : connectionAcceptedFn_(),
1607       acceptErrorFn_(),
1608       acceptStoppedFn_(),
1609       events_() {}
1610
1611   std::deque<EventInfo>* getEvents() {
1612     return &events_;
1613   }
1614
1615   void setConnectionAcceptedFn(
1616       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1617     connectionAcceptedFn_ = fn;
1618   }
1619   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1620     acceptErrorFn_ = fn;
1621   }
1622   void setAcceptStartedFn(const std::function<void()>& fn) {
1623     acceptStartedFn_ = fn;
1624   }
1625   void setAcceptStoppedFn(const std::function<void()>& fn) {
1626     acceptStoppedFn_ = fn;
1627   }
1628
1629   void connectionAccepted(
1630       int fd, const folly::SocketAddress& clientAddr) noexcept override {
1631     events_.emplace_back(fd, clientAddr);
1632
1633     if (connectionAcceptedFn_) {
1634       connectionAcceptedFn_(fd, clientAddr);
1635     }
1636   }
1637   void acceptError(const std::exception& ex) noexcept override {
1638     events_.emplace_back(ex.what());
1639
1640     if (acceptErrorFn_) {
1641       acceptErrorFn_(ex);
1642     }
1643   }
1644   void acceptStarted() noexcept override {
1645     events_.emplace_back(TYPE_START);
1646
1647     if (acceptStartedFn_) {
1648       acceptStartedFn_();
1649     }
1650   }
1651   void acceptStopped() noexcept override {
1652     events_.emplace_back(TYPE_STOP);
1653
1654     if (acceptStoppedFn_) {
1655       acceptStoppedFn_();
1656     }
1657   }
1658
1659  private:
1660   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1661   std::function<void(const std::exception&)> acceptErrorFn_;
1662   std::function<void()> acceptStartedFn_;
1663   std::function<void()> acceptStoppedFn_;
1664
1665   std::deque<EventInfo> events_;
1666 };
1667 }
1668
1669 /**
1670  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1671  */
1672 TEST(AsyncSocketTest, ServerAcceptOptions) {
1673   EventBase eventBase;
1674
1675   // Create a server socket
1676   std::shared_ptr<AsyncServerSocket> serverSocket(
1677       AsyncServerSocket::newSocket(&eventBase));
1678   serverSocket->bind(0);
1679   serverSocket->listen(16);
1680   folly::SocketAddress serverAddress;
1681   serverSocket->getAddress(&serverAddress);
1682
1683   // Add a callback to accept one connection then stop the loop
1684   TestAcceptCallback acceptCallback;
1685   acceptCallback.setConnectionAcceptedFn(
1686       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1687         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1688       });
1689   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1690     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1691   });
1692   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1693   serverSocket->startAccepting();
1694
1695   // Connect to the server socket
1696   std::shared_ptr<AsyncSocket> socket(
1697       AsyncSocket::newSocket(&eventBase, serverAddress));
1698
1699   eventBase.loop();
1700
1701   // Verify that the server accepted a connection
1702   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1703   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1704                     TestAcceptCallback::TYPE_START);
1705   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1706                     TestAcceptCallback::TYPE_ACCEPT);
1707   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1708                     TestAcceptCallback::TYPE_STOP);
1709   int fd = acceptCallback.getEvents()->at(1).fd;
1710
1711   // The accepted connection should already be in non-blocking mode
1712   int flags = fcntl(fd, F_GETFL, 0);
1713   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1714
1715 #ifndef TCP_NOPUSH
1716   // The accepted connection should already have TCP_NODELAY set
1717   int value;
1718   socklen_t valueLength = sizeof(value);
1719   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1720   CHECK_EQ(rc, 0);
1721   CHECK_EQ(value, 1);
1722 #endif
1723 }
1724
1725 /**
1726  * Test AsyncServerSocket::removeAcceptCallback()
1727  */
1728 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1729   // Create a new AsyncServerSocket
1730   EventBase eventBase;
1731   std::shared_ptr<AsyncServerSocket> serverSocket(
1732       AsyncServerSocket::newSocket(&eventBase));
1733   serverSocket->bind(0);
1734   serverSocket->listen(16);
1735   folly::SocketAddress serverAddress;
1736   serverSocket->getAddress(&serverAddress);
1737
1738   // Add several accept callbacks
1739   TestAcceptCallback cb1;
1740   TestAcceptCallback cb2;
1741   TestAcceptCallback cb3;
1742   TestAcceptCallback cb4;
1743   TestAcceptCallback cb5;
1744   TestAcceptCallback cb6;
1745   TestAcceptCallback cb7;
1746
1747   // Test having callbacks remove other callbacks before them on the list,
1748   // after them on the list, or removing themselves.
1749   //
1750   // Have callback 2 remove callback 3 and callback 5 the first time it is
1751   // called.
1752   int cb2Count = 0;
1753   cb1.setConnectionAcceptedFn([&](int /* fd */,
1754                                   const folly::SocketAddress& /* addr */) {
1755     std::shared_ptr<AsyncSocket> sock2(
1756         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1757   });
1758   cb3.setConnectionAcceptedFn(
1759       [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1760   cb4.setConnectionAcceptedFn(
1761       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1762         std::shared_ptr<AsyncSocket> sock3(
1763             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1764       });
1765   cb5.setConnectionAcceptedFn(
1766       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1767         std::shared_ptr<AsyncSocket> sock5(
1768             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1769
1770       });
1771   cb2.setConnectionAcceptedFn(
1772       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1773         if (cb2Count == 0) {
1774           serverSocket->removeAcceptCallback(&cb3, nullptr);
1775           serverSocket->removeAcceptCallback(&cb5, nullptr);
1776         }
1777         ++cb2Count;
1778       });
1779   // Have callback 6 remove callback 4 the first time it is called,
1780   // and destroy the server socket the second time it is called
1781   int cb6Count = 0;
1782   cb6.setConnectionAcceptedFn(
1783       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1784         if (cb6Count == 0) {
1785           serverSocket->removeAcceptCallback(&cb4, nullptr);
1786           std::shared_ptr<AsyncSocket> sock6(
1787               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1788           std::shared_ptr<AsyncSocket> sock7(
1789               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1790           std::shared_ptr<AsyncSocket> sock8(
1791               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1792
1793         } else {
1794           serverSocket.reset();
1795         }
1796         ++cb6Count;
1797       });
1798   // Have callback 7 remove itself
1799   cb7.setConnectionAcceptedFn(
1800       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1801         serverSocket->removeAcceptCallback(&cb7, nullptr);
1802       });
1803
1804   serverSocket->addAcceptCallback(&cb1, nullptr);
1805   serverSocket->addAcceptCallback(&cb2, nullptr);
1806   serverSocket->addAcceptCallback(&cb3, nullptr);
1807   serverSocket->addAcceptCallback(&cb4, nullptr);
1808   serverSocket->addAcceptCallback(&cb5, nullptr);
1809   serverSocket->addAcceptCallback(&cb6, nullptr);
1810   serverSocket->addAcceptCallback(&cb7, nullptr);
1811   serverSocket->startAccepting();
1812
1813   // Make several connections to the socket
1814   std::shared_ptr<AsyncSocket> sock1(
1815       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1816   std::shared_ptr<AsyncSocket> sock4(
1817       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1818
1819   // Loop until we are stopped
1820   eventBase.loop();
1821
1822   // Check to make sure that the expected callbacks were invoked.
1823   //
1824   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1825   // the AcceptCallbacks in round-robin fashion, in the order that they were
1826   // added.  The code is implemented this way right now, but the API doesn't
1827   // explicitly require it be done this way.  If we change the code not to be
1828   // exactly round robin in the future, we can simplify the test checks here.
1829   // (We'll also need to update the termination code, since we expect cb6 to
1830   // get called twice to terminate the loop.)
1831   CHECK_EQ(cb1.getEvents()->size(), 4);
1832   CHECK_EQ(cb1.getEvents()->at(0).type,
1833                     TestAcceptCallback::TYPE_START);
1834   CHECK_EQ(cb1.getEvents()->at(1).type,
1835                     TestAcceptCallback::TYPE_ACCEPT);
1836   CHECK_EQ(cb1.getEvents()->at(2).type,
1837                     TestAcceptCallback::TYPE_ACCEPT);
1838   CHECK_EQ(cb1.getEvents()->at(3).type,
1839                     TestAcceptCallback::TYPE_STOP);
1840
1841   CHECK_EQ(cb2.getEvents()->size(), 4);
1842   CHECK_EQ(cb2.getEvents()->at(0).type,
1843                     TestAcceptCallback::TYPE_START);
1844   CHECK_EQ(cb2.getEvents()->at(1).type,
1845                     TestAcceptCallback::TYPE_ACCEPT);
1846   CHECK_EQ(cb2.getEvents()->at(2).type,
1847                     TestAcceptCallback::TYPE_ACCEPT);
1848   CHECK_EQ(cb2.getEvents()->at(3).type,
1849                     TestAcceptCallback::TYPE_STOP);
1850
1851   CHECK_EQ(cb3.getEvents()->size(), 2);
1852   CHECK_EQ(cb3.getEvents()->at(0).type,
1853                     TestAcceptCallback::TYPE_START);
1854   CHECK_EQ(cb3.getEvents()->at(1).type,
1855                     TestAcceptCallback::TYPE_STOP);
1856
1857   CHECK_EQ(cb4.getEvents()->size(), 3);
1858   CHECK_EQ(cb4.getEvents()->at(0).type,
1859                     TestAcceptCallback::TYPE_START);
1860   CHECK_EQ(cb4.getEvents()->at(1).type,
1861                     TestAcceptCallback::TYPE_ACCEPT);
1862   CHECK_EQ(cb4.getEvents()->at(2).type,
1863                     TestAcceptCallback::TYPE_STOP);
1864
1865   CHECK_EQ(cb5.getEvents()->size(), 2);
1866   CHECK_EQ(cb5.getEvents()->at(0).type,
1867                     TestAcceptCallback::TYPE_START);
1868   CHECK_EQ(cb5.getEvents()->at(1).type,
1869                     TestAcceptCallback::TYPE_STOP);
1870
1871   CHECK_EQ(cb6.getEvents()->size(), 4);
1872   CHECK_EQ(cb6.getEvents()->at(0).type,
1873                     TestAcceptCallback::TYPE_START);
1874   CHECK_EQ(cb6.getEvents()->at(1).type,
1875                     TestAcceptCallback::TYPE_ACCEPT);
1876   CHECK_EQ(cb6.getEvents()->at(2).type,
1877                     TestAcceptCallback::TYPE_ACCEPT);
1878   CHECK_EQ(cb6.getEvents()->at(3).type,
1879                     TestAcceptCallback::TYPE_STOP);
1880
1881   CHECK_EQ(cb7.getEvents()->size(), 3);
1882   CHECK_EQ(cb7.getEvents()->at(0).type,
1883                     TestAcceptCallback::TYPE_START);
1884   CHECK_EQ(cb7.getEvents()->at(1).type,
1885                     TestAcceptCallback::TYPE_ACCEPT);
1886   CHECK_EQ(cb7.getEvents()->at(2).type,
1887                     TestAcceptCallback::TYPE_STOP);
1888 }
1889
1890 /**
1891  * Test AsyncServerSocket::removeAcceptCallback()
1892  */
1893 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1894   // Create a new AsyncServerSocket
1895   EventBase eventBase;
1896   std::shared_ptr<AsyncServerSocket> serverSocket(
1897       AsyncServerSocket::newSocket(&eventBase));
1898   serverSocket->bind(0);
1899   serverSocket->listen(16);
1900   folly::SocketAddress serverAddress;
1901   serverSocket->getAddress(&serverAddress);
1902
1903   // Add several accept callbacks
1904   TestAcceptCallback cb1;
1905   auto thread_id = pthread_self();
1906   cb1.setAcceptStartedFn([&](){
1907     CHECK_NE(thread_id, pthread_self());
1908     thread_id = pthread_self();
1909   });
1910   cb1.setConnectionAcceptedFn(
1911       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1912         CHECK_EQ(thread_id, pthread_self());
1913         serverSocket->removeAcceptCallback(&cb1, nullptr);
1914       });
1915   cb1.setAcceptStoppedFn([&](){
1916     CHECK_EQ(thread_id, pthread_self());
1917   });
1918
1919   // Test having callbacks remove other callbacks before them on the list,
1920   serverSocket->addAcceptCallback(&cb1, nullptr);
1921   serverSocket->startAccepting();
1922
1923   // Make several connections to the socket
1924   std::shared_ptr<AsyncSocket> sock1(
1925       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1926
1927   // Loop in another thread
1928   auto other = std::thread([&](){
1929     eventBase.loop();
1930   });
1931   other.join();
1932
1933   // Check to make sure that the expected callbacks were invoked.
1934   //
1935   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1936   // the AcceptCallbacks in round-robin fashion, in the order that they were
1937   // added.  The code is implemented this way right now, but the API doesn't
1938   // explicitly require it be done this way.  If we change the code not to be
1939   // exactly round robin in the future, we can simplify the test checks here.
1940   // (We'll also need to update the termination code, since we expect cb6 to
1941   // get called twice to terminate the loop.)
1942   CHECK_EQ(cb1.getEvents()->size(), 3);
1943   CHECK_EQ(cb1.getEvents()->at(0).type,
1944                     TestAcceptCallback::TYPE_START);
1945   CHECK_EQ(cb1.getEvents()->at(1).type,
1946                     TestAcceptCallback::TYPE_ACCEPT);
1947   CHECK_EQ(cb1.getEvents()->at(2).type,
1948                     TestAcceptCallback::TYPE_STOP);
1949
1950 }
1951
1952 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1953   // Add a callback to accept one connection then stop accepting
1954   TestAcceptCallback acceptCallback;
1955   acceptCallback.setConnectionAcceptedFn(
1956       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1957         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1958       });
1959   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1960     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1961   });
1962   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1963   serverSocket->startAccepting();
1964
1965   // Connect to the server socket
1966   EventBase* eventBase = serverSocket->getEventBase();
1967   folly::SocketAddress serverAddress;
1968   serverSocket->getAddress(&serverAddress);
1969   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1970
1971   // Loop to process all events
1972   eventBase->loop();
1973
1974   // Verify that the server accepted a connection
1975   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1976   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1977                     TestAcceptCallback::TYPE_START);
1978   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1979                     TestAcceptCallback::TYPE_ACCEPT);
1980   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1981                     TestAcceptCallback::TYPE_STOP);
1982 }
1983
1984 /* Verify that we don't leak sockets if we are destroyed()
1985  * and there are still writes pending
1986  *
1987  * If destroy() only calls close() instead of closeNow(),
1988  * it would shutdown(writes) on the socket, but it would
1989  * never be close()'d, and the socket would leak
1990  */
1991 TEST(AsyncSocketTest, DestroyCloseTest) {
1992   TestServer server;
1993
1994   // connect()
1995   EventBase clientEB;
1996   EventBase serverEB;
1997   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1998   ConnCallback ccb;
1999   socket->connect(&ccb, server.getAddress(), 30);
2000
2001   // Accept the connection
2002   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2003   ReadCallback rcb;
2004   acceptedSocket->setReadCB(&rcb);
2005
2006   // Write a large buffer to the socket that is larger than kernel buffer
2007   size_t simpleBufLength = 5000000;
2008   char* simpleBuf = new char[simpleBufLength];
2009   memset(simpleBuf, 'a', simpleBufLength);
2010   WriteCallback wcb;
2011
2012   // Let the reads and writes run to completion
2013   int fd = acceptedSocket->getFd();
2014
2015   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2016   socket.reset();
2017   acceptedSocket.reset();
2018
2019   // Test that server socket was closed
2020   ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2021   CHECK_EQ(sz, -1);
2022   CHECK_EQ(errno, 9);
2023   delete[] simpleBuf;
2024 }
2025
2026 /**
2027  * Test AsyncServerSocket::useExistingSocket()
2028  */
2029 TEST(AsyncSocketTest, ServerExistingSocket) {
2030   EventBase eventBase;
2031
2032   // Test creating a socket, and letting AsyncServerSocket bind and listen
2033   {
2034     // Manually create a socket
2035     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2036     ASSERT_GE(fd, 0);
2037
2038     // Create a server socket
2039     AsyncServerSocket::UniquePtr serverSocket(
2040         new AsyncServerSocket(&eventBase));
2041     serverSocket->useExistingSocket(fd);
2042     folly::SocketAddress address;
2043     serverSocket->getAddress(&address);
2044     address.setPort(0);
2045     serverSocket->bind(address);
2046     serverSocket->listen(16);
2047
2048     // Make sure the socket works
2049     serverSocketSanityTest(serverSocket.get());
2050   }
2051
2052   // Test creating a socket and binding manually,
2053   // then letting AsyncServerSocket listen
2054   {
2055     // Manually create a socket
2056     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2057     ASSERT_GE(fd, 0);
2058     // bind
2059     struct sockaddr_in addr;
2060     addr.sin_family = AF_INET;
2061     addr.sin_port = 0;
2062     addr.sin_addr.s_addr = INADDR_ANY;
2063     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2064                              sizeof(addr)), 0);
2065     // Look up the address that we bound to
2066     folly::SocketAddress boundAddress;
2067     boundAddress.setFromLocalAddress(fd);
2068
2069     // Create a server socket
2070     AsyncServerSocket::UniquePtr serverSocket(
2071         new AsyncServerSocket(&eventBase));
2072     serverSocket->useExistingSocket(fd);
2073     serverSocket->listen(16);
2074
2075     // Make sure AsyncServerSocket reports the same address that we bound to
2076     folly::SocketAddress serverSocketAddress;
2077     serverSocket->getAddress(&serverSocketAddress);
2078     CHECK_EQ(boundAddress, serverSocketAddress);
2079
2080     // Make sure the socket works
2081     serverSocketSanityTest(serverSocket.get());
2082   }
2083
2084   // Test creating a socket, binding and listening manually,
2085   // then giving it to AsyncServerSocket
2086   {
2087     // Manually create a socket
2088     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2089     ASSERT_GE(fd, 0);
2090     // bind
2091     struct sockaddr_in addr;
2092     addr.sin_family = AF_INET;
2093     addr.sin_port = 0;
2094     addr.sin_addr.s_addr = INADDR_ANY;
2095     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2096                              sizeof(addr)), 0);
2097     // Look up the address that we bound to
2098     folly::SocketAddress boundAddress;
2099     boundAddress.setFromLocalAddress(fd);
2100     // listen
2101     CHECK_EQ(listen(fd, 16), 0);
2102
2103     // Create a server socket
2104     AsyncServerSocket::UniquePtr serverSocket(
2105         new AsyncServerSocket(&eventBase));
2106     serverSocket->useExistingSocket(fd);
2107
2108     // Make sure AsyncServerSocket reports the same address that we bound to
2109     folly::SocketAddress serverSocketAddress;
2110     serverSocket->getAddress(&serverSocketAddress);
2111     CHECK_EQ(boundAddress, serverSocketAddress);
2112
2113     // Make sure the socket works
2114     serverSocketSanityTest(serverSocket.get());
2115   }
2116 }
2117
2118 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2119   EventBase eventBase;
2120
2121   // Create a server socket
2122   std::shared_ptr<AsyncServerSocket> serverSocket(
2123       AsyncServerSocket::newSocket(&eventBase));
2124   string path(1, 0);
2125   path.append("/anonymous");
2126   folly::SocketAddress serverAddress;
2127   serverAddress.setFromPath(path);
2128   serverSocket->bind(serverAddress);
2129   serverSocket->listen(16);
2130
2131   // Add a callback to accept one connection then stop the loop
2132   TestAcceptCallback acceptCallback;
2133   acceptCallback.setConnectionAcceptedFn(
2134       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2135         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2136       });
2137   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2138     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2139   });
2140   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2141   serverSocket->startAccepting();
2142
2143   // Connect to the server socket
2144   std::shared_ptr<AsyncSocket> socket(
2145       AsyncSocket::newSocket(&eventBase, serverAddress));
2146
2147   eventBase.loop();
2148
2149   // Verify that the server accepted a connection
2150   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2151   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2152                     TestAcceptCallback::TYPE_START);
2153   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2154                     TestAcceptCallback::TYPE_ACCEPT);
2155   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2156                     TestAcceptCallback::TYPE_STOP);
2157   int fd = acceptCallback.getEvents()->at(1).fd;
2158
2159   // The accepted connection should already be in non-blocking mode
2160   int flags = fcntl(fd, F_GETFL, 0);
2161   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2162 }
2163
2164 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2165   EventBase eventBase;
2166   TestConnectionEventCallback connectionEventCallback;
2167
2168   // Create a server socket
2169   std::shared_ptr<AsyncServerSocket> serverSocket(
2170       AsyncServerSocket::newSocket(&eventBase));
2171   serverSocket->setConnectionEventCallback(&connectionEventCallback);
2172   serverSocket->bind(0);
2173   serverSocket->listen(16);
2174   folly::SocketAddress serverAddress;
2175   serverSocket->getAddress(&serverAddress);
2176
2177   // Add a callback to accept one connection then stop the loop
2178   TestAcceptCallback acceptCallback;
2179   acceptCallback.setConnectionAcceptedFn(
2180       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2181         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2182       });
2183   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2184     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2185   });
2186   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2187   serverSocket->startAccepting();
2188
2189   // Connect to the server socket
2190   std::shared_ptr<AsyncSocket> socket(
2191       AsyncSocket::newSocket(&eventBase, serverAddress));
2192
2193   eventBase.loop();
2194
2195   // Validate the connection event counters
2196   ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2197   ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2198   ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2199   ASSERT_EQ(
2200       connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2201   ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2202   ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2203   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2204   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2205 }
2206
2207 /**
2208  * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2209  */
2210 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2211   EventBase eventBase;
2212
2213   // Counter of how many connections have been accepted
2214   int count = 0;
2215
2216   // Create a server socket
2217   auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2218   serverSocket->bind(0);
2219   serverSocket->listen(16);
2220   folly::SocketAddress serverAddress;
2221   serverSocket->getAddress(&serverAddress);
2222
2223   // Add a callback to accept connections
2224   TestAcceptCallback acceptCallback;
2225   acceptCallback.setConnectionAcceptedFn(
2226       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2227         count++;
2228         CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2229
2230         if (count == 4) {
2231           // all messages are processed, remove accept callback
2232           serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2233         }
2234       });
2235   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2236     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2237   });
2238   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2239   serverSocket->startAccepting();
2240
2241   // Connect to the server socket, 4 clients, there are 4 connections
2242   auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2243   auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2244   auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2245   auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2246
2247   eventBase.loop();
2248 }
2249
2250 /**
2251  * Test AsyncTransport::BufferCallback
2252  */
2253 TEST(AsyncSocketTest, BufferTest) {
2254   TestServer server;
2255
2256   EventBase evb;
2257   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2258   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2259   ConnCallback ccb;
2260   socket->connect(&ccb, server.getAddress(), 30, option);
2261
2262   char buf[100 * 1024];
2263   memset(buf, 'c', sizeof(buf));
2264   WriteCallback wcb;
2265   BufferCallback bcb;
2266   socket->setBufferCallback(&bcb);
2267   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2268
2269   evb.loop();
2270   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2271   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
2272
2273   ASSERT_TRUE(bcb.hasBuffered());
2274   ASSERT_TRUE(bcb.hasBufferCleared());
2275
2276   socket->close();
2277   server.verifyConnection(buf, sizeof(buf));
2278
2279   ASSERT_TRUE(socket->isClosedBySelf());
2280   ASSERT_FALSE(socket->isClosedByPeer());
2281 }