Add connection event callback to AsyncServerSocket
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/RWSpinLock.h>
21 #include <folly/SocketAddress.h>
22
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/async/test/AsyncSocketTest.h>
25 #include <folly/io/async/test/Util.h>
26 #include <folly/test/SocketAddressTestHelper.h>
27
28 #include <gtest/gtest.h>
29 #include <boost/scoped_array.hpp>
30 #include <iostream>
31 #include <unistd.h>
32 #include <fcntl.h>
33 #include <poll.h>
34 #include <sys/types.h>
35 #include <sys/socket.h>
36 #include <netinet/tcp.h>
37 #include <thread>
38
39 using namespace boost;
40
41 using std::string;
42 using std::vector;
43 using std::min;
44 using std::cerr;
45 using std::endl;
46 using std::unique_ptr;
47 using std::chrono::milliseconds;
48 using boost::scoped_array;
49
50 using namespace folly;
51
52 class DelayedWrite: public AsyncTimeout {
53  public:
54   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
55       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
56       bool cork, bool lastWrite = false):
57     AsyncTimeout(socket->getEventBase()),
58     socket_(socket),
59     bufs_(std::move(bufs)),
60     wcb_(wcb),
61     cork_(cork),
62     lastWrite_(lastWrite) {}
63
64  private:
65   void timeoutExpired() noexcept override {
66     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
67     socket_->writeChain(wcb_, std::move(bufs_), flags);
68     if (lastWrite_) {
69       socket_->shutdownWrite();
70     }
71   }
72
73   std::shared_ptr<AsyncSocket> socket_;
74   unique_ptr<IOBuf> bufs_;
75   AsyncTransportWrapper::WriteCallback* wcb_;
76   bool cork_;
77   bool lastWrite_;
78 };
79
80 ///////////////////////////////////////////////////////////////////////////
81 // connect() tests
82 ///////////////////////////////////////////////////////////////////////////
83
84 /**
85  * Test connecting to a server
86  */
87 TEST(AsyncSocketTest, Connect) {
88   // Start listening on a local port
89   TestServer server;
90
91   // Connect using a AsyncSocket
92   EventBase evb;
93   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
94   ConnCallback cb;
95   socket->connect(&cb, server.getAddress(), 30);
96
97   evb.loop();
98
99   CHECK_EQ(cb.state, STATE_SUCCEEDED);
100   EXPECT_LE(0, socket->getConnectTime().count());
101 }
102
103 /**
104  * Test connecting to a server that isn't listening
105  */
106 TEST(AsyncSocketTest, ConnectRefused) {
107   EventBase evb;
108
109   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
110
111   // Hopefully nothing is actually listening on this address
112   folly::SocketAddress addr("127.0.0.1", 65535);
113   ConnCallback cb;
114   socket->connect(&cb, addr, 30);
115
116   evb.loop();
117
118   CHECK_EQ(cb.state, STATE_FAILED);
119   CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
120   EXPECT_LE(0, socket->getConnectTime().count());
121 }
122
123 /**
124  * Test connection timeout
125  */
126 TEST(AsyncSocketTest, ConnectTimeout) {
127   EventBase evb;
128
129   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
130
131   // Try connecting to server that won't respond.
132   //
133   // This depends somewhat on the network where this test is run.
134   // Hopefully this IP will be routable but unresponsive.
135   // (Alternatively, we could try listening on a local raw socket, but that
136   // normally requires root privileges.)
137   auto host =
138       SocketAddressTestHelper::isIPv6Enabled() ?
139       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
140       SocketAddressTestHelper::isIPv4Enabled() ?
141       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
142       nullptr;
143   SocketAddress addr(host, 65535);
144   ConnCallback cb;
145   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
146
147   evb.loop();
148
149   CHECK_EQ(cb.state, STATE_FAILED);
150   CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
151
152   // Verify that we can still get the peer address after a timeout.
153   // Use case is if the client was created from a client pool, and we want
154   // to log which peer failed.
155   folly::SocketAddress peer;
156   socket->getPeerAddress(&peer);
157   CHECK_EQ(peer, addr);
158   EXPECT_LE(0, socket->getConnectTime().count());
159 }
160
161 /**
162  * Test writing immediately after connecting, without waiting for connect
163  * to finish.
164  */
165 TEST(AsyncSocketTest, ConnectAndWrite) {
166   TestServer server;
167
168   // connect()
169   EventBase evb;
170   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
171   ConnCallback ccb;
172   socket->connect(&ccb, server.getAddress(), 30);
173
174   // write()
175   char buf[128];
176   memset(buf, 'a', sizeof(buf));
177   WriteCallback wcb;
178   socket->write(&wcb, buf, sizeof(buf));
179
180   // Loop.  We don't bother accepting on the server socket yet.
181   // The kernel should be able to buffer the write request so it can succeed.
182   evb.loop();
183
184   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
185   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
186
187   // Make sure the server got a connection and received the data
188   socket->close();
189   server.verifyConnection(buf, sizeof(buf));
190
191   ASSERT_TRUE(socket->isClosedBySelf());
192   ASSERT_FALSE(socket->isClosedByPeer());
193 }
194
195 /**
196  * Test connecting using a nullptr connect callback.
197  */
198 TEST(AsyncSocketTest, ConnectNullCallback) {
199   TestServer server;
200
201   // connect()
202   EventBase evb;
203   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
204   socket->connect(nullptr, server.getAddress(), 30);
205
206   // write some data, just so we have some way of verifing
207   // that the socket works correctly after connecting
208   char buf[128];
209   memset(buf, 'a', sizeof(buf));
210   WriteCallback wcb;
211   socket->write(&wcb, buf, sizeof(buf));
212
213   evb.loop();
214
215   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
216
217   // Make sure the server got a connection and received the data
218   socket->close();
219   server.verifyConnection(buf, sizeof(buf));
220
221   ASSERT_TRUE(socket->isClosedBySelf());
222   ASSERT_FALSE(socket->isClosedByPeer());
223 }
224
225 /**
226  * Test calling both write() and close() immediately after connecting, without
227  * waiting for connect to finish.
228  *
229  * This exercises the STATE_CONNECTING_CLOSING code.
230  */
231 TEST(AsyncSocketTest, ConnectWriteAndClose) {
232   TestServer server;
233
234   // connect()
235   EventBase evb;
236   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
237   ConnCallback ccb;
238   socket->connect(&ccb, server.getAddress(), 30);
239
240   // write()
241   char buf[128];
242   memset(buf, 'a', sizeof(buf));
243   WriteCallback wcb;
244   socket->write(&wcb, buf, sizeof(buf));
245
246   // close()
247   socket->close();
248
249   // Loop.  We don't bother accepting on the server socket yet.
250   // The kernel should be able to buffer the write request so it can succeed.
251   evb.loop();
252
253   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
254   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
255
256   // Make sure the server got a connection and received the data
257   server.verifyConnection(buf, sizeof(buf));
258
259   ASSERT_TRUE(socket->isClosedBySelf());
260   ASSERT_FALSE(socket->isClosedByPeer());
261 }
262
263 /**
264  * Test calling close() immediately after connect()
265  */
266 TEST(AsyncSocketTest, ConnectAndClose) {
267   TestServer server;
268
269   // Connect using a AsyncSocket
270   EventBase evb;
271   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
272   ConnCallback ccb;
273   socket->connect(&ccb, server.getAddress(), 30);
274
275   // Hopefully the connect didn't succeed immediately.
276   // If it did, we can't exercise the close-while-connecting code path.
277   if (ccb.state == STATE_SUCCEEDED) {
278     LOG(INFO) << "connect() succeeded immediately; aborting test "
279                        "of close-during-connect behavior";
280     return;
281   }
282
283   socket->close();
284
285   // Loop, although there shouldn't be anything to do.
286   evb.loop();
287
288   // Make sure the connection was aborted
289   CHECK_EQ(ccb.state, STATE_FAILED);
290
291   ASSERT_TRUE(socket->isClosedBySelf());
292   ASSERT_FALSE(socket->isClosedByPeer());
293 }
294
295 /**
296  * Test calling closeNow() immediately after connect()
297  *
298  * This should be identical to the normal close behavior.
299  */
300 TEST(AsyncSocketTest, ConnectAndCloseNow) {
301   TestServer server;
302
303   // Connect using a AsyncSocket
304   EventBase evb;
305   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
306   ConnCallback ccb;
307   socket->connect(&ccb, server.getAddress(), 30);
308
309   // Hopefully the connect didn't succeed immediately.
310   // If it did, we can't exercise the close-while-connecting code path.
311   if (ccb.state == STATE_SUCCEEDED) {
312     LOG(INFO) << "connect() succeeded immediately; aborting test "
313                        "of closeNow()-during-connect behavior";
314     return;
315   }
316
317   socket->closeNow();
318
319   // Loop, although there shouldn't be anything to do.
320   evb.loop();
321
322   // Make sure the connection was aborted
323   CHECK_EQ(ccb.state, STATE_FAILED);
324
325   ASSERT_TRUE(socket->isClosedBySelf());
326   ASSERT_FALSE(socket->isClosedByPeer());
327 }
328
329 /**
330  * Test calling both write() and closeNow() immediately after connecting,
331  * without waiting for connect to finish.
332  *
333  * This should abort the pending write.
334  */
335 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
336   TestServer server;
337
338   // connect()
339   EventBase evb;
340   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
341   ConnCallback ccb;
342   socket->connect(&ccb, server.getAddress(), 30);
343
344   // Hopefully the connect didn't succeed immediately.
345   // If it did, we can't exercise the close-while-connecting code path.
346   if (ccb.state == STATE_SUCCEEDED) {
347     LOG(INFO) << "connect() succeeded immediately; aborting test "
348                        "of write-during-connect behavior";
349     return;
350   }
351
352   // write()
353   char buf[128];
354   memset(buf, 'a', sizeof(buf));
355   WriteCallback wcb;
356   socket->write(&wcb, buf, sizeof(buf));
357
358   // close()
359   socket->closeNow();
360
361   // Loop, although there shouldn't be anything to do.
362   evb.loop();
363
364   CHECK_EQ(ccb.state, STATE_FAILED);
365   CHECK_EQ(wcb.state, STATE_FAILED);
366
367   ASSERT_TRUE(socket->isClosedBySelf());
368   ASSERT_FALSE(socket->isClosedByPeer());
369 }
370
371 /**
372  * Test installing a read callback immediately, before connect() finishes.
373  */
374 TEST(AsyncSocketTest, ConnectAndRead) {
375   TestServer server;
376
377   // connect()
378   EventBase evb;
379   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
380   ConnCallback ccb;
381   socket->connect(&ccb, server.getAddress(), 30);
382
383   ReadCallback rcb;
384   socket->setReadCB(&rcb);
385
386   // Even though we haven't looped yet, we should be able to accept
387   // the connection and send data to it.
388   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
389   uint8_t buf[128];
390   memset(buf, 'a', sizeof(buf));
391   acceptedSocket->write(buf, sizeof(buf));
392   acceptedSocket->flush();
393   acceptedSocket->close();
394
395   // Loop, although there shouldn't be anything to do.
396   evb.loop();
397
398   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
399   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
400   CHECK_EQ(rcb.buffers.size(), 1);
401   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
402   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
403
404   ASSERT_FALSE(socket->isClosedBySelf());
405   ASSERT_FALSE(socket->isClosedByPeer());
406 }
407
408 /**
409  * Test installing a read callback and then closing immediately before the
410  * connect attempt finishes.
411  */
412 TEST(AsyncSocketTest, ConnectReadAndClose) {
413   TestServer server;
414
415   // connect()
416   EventBase evb;
417   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
418   ConnCallback ccb;
419   socket->connect(&ccb, server.getAddress(), 30);
420
421   // Hopefully the connect didn't succeed immediately.
422   // If it did, we can't exercise the close-while-connecting code path.
423   if (ccb.state == STATE_SUCCEEDED) {
424     LOG(INFO) << "connect() succeeded immediately; aborting test "
425                        "of read-during-connect behavior";
426     return;
427   }
428
429   ReadCallback rcb;
430   socket->setReadCB(&rcb);
431
432   // close()
433   socket->close();
434
435   // Loop, although there shouldn't be anything to do.
436   evb.loop();
437
438   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
439   CHECK_EQ(rcb.buffers.size(), 0);
440   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
441
442   ASSERT_TRUE(socket->isClosedBySelf());
443   ASSERT_FALSE(socket->isClosedByPeer());
444 }
445
446 /**
447  * Test both writing and installing a read callback immediately,
448  * before connect() finishes.
449  */
450 TEST(AsyncSocketTest, ConnectWriteAndRead) {
451   TestServer server;
452
453   // connect()
454   EventBase evb;
455   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
456   ConnCallback ccb;
457   socket->connect(&ccb, server.getAddress(), 30);
458
459   // write()
460   char buf1[128];
461   memset(buf1, 'a', sizeof(buf1));
462   WriteCallback wcb;
463   socket->write(&wcb, buf1, sizeof(buf1));
464
465   // set a read callback
466   ReadCallback rcb;
467   socket->setReadCB(&rcb);
468
469   // Even though we haven't looped yet, we should be able to accept
470   // the connection and send data to it.
471   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
472   uint8_t buf2[128];
473   memset(buf2, 'b', sizeof(buf2));
474   acceptedSocket->write(buf2, sizeof(buf2));
475   acceptedSocket->flush();
476
477   // shut down the write half of acceptedSocket, so that the AsyncSocket
478   // will stop reading and we can break out of the event loop.
479   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
480
481   // Loop
482   evb.loop();
483
484   // Make sure the connect succeeded
485   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
486
487   // Make sure the AsyncSocket read the data written by the accepted socket
488   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
489   CHECK_EQ(rcb.buffers.size(), 1);
490   CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
491   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
492
493   // Close the AsyncSocket so we'll see EOF on acceptedSocket
494   socket->close();
495
496   // Make sure the accepted socket saw the data written by the AsyncSocket
497   uint8_t readbuf[sizeof(buf1)];
498   acceptedSocket->readAll(readbuf, sizeof(readbuf));
499   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
500   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
501   CHECK_EQ(bytesRead, 0);
502
503   ASSERT_FALSE(socket->isClosedBySelf());
504   ASSERT_TRUE(socket->isClosedByPeer());
505 }
506
507 /**
508  * Test writing to the socket then shutting down writes before the connect
509  * attempt finishes.
510  */
511 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
512   TestServer server;
513
514   // connect()
515   EventBase evb;
516   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
517   ConnCallback ccb;
518   socket->connect(&ccb, server.getAddress(), 30);
519
520   // Hopefully the connect didn't succeed immediately.
521   // If it did, we can't exercise the write-while-connecting code path.
522   if (ccb.state == STATE_SUCCEEDED) {
523     LOG(INFO) << "connect() succeeded immediately; skipping test";
524     return;
525   }
526
527   // Ask to write some data
528   char wbuf[128];
529   memset(wbuf, 'a', sizeof(wbuf));
530   WriteCallback wcb;
531   socket->write(&wcb, wbuf, sizeof(wbuf));
532   socket->shutdownWrite();
533
534   // Shutdown writes
535   socket->shutdownWrite();
536
537   // Even though we haven't looped yet, we should be able to accept
538   // the connection.
539   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
540
541   // Since the connection is still in progress, there should be no data to
542   // read yet.  Verify that the accepted socket is not readable.
543   struct pollfd fds[1];
544   fds[0].fd = acceptedSocket->getSocketFD();
545   fds[0].events = POLLIN;
546   fds[0].revents = 0;
547   int rc = poll(fds, 1, 0);
548   CHECK_EQ(rc, 0);
549
550   // Write data to the accepted socket
551   uint8_t acceptedWbuf[192];
552   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
553   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
554   acceptedSocket->flush();
555
556   // Loop
557   evb.loop();
558
559   // The loop should have completed the connection, written the queued data,
560   // and shutdown writes on the socket.
561   //
562   // Check that the connection was completed successfully and that the write
563   // callback succeeded.
564   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
565   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
566
567   // Check that we can read the data that was written to the socket, and that
568   // we see an EOF, since its socket was half-shutdown.
569   uint8_t readbuf[sizeof(wbuf)];
570   acceptedSocket->readAll(readbuf, sizeof(readbuf));
571   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
572   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
573   CHECK_EQ(bytesRead, 0);
574
575   // Close the accepted socket.  This will cause it to see EOF
576   // and uninstall the read callback when we loop next.
577   acceptedSocket->close();
578
579   // Install a read callback, then loop again.
580   ReadCallback rcb;
581   socket->setReadCB(&rcb);
582   evb.loop();
583
584   // This loop should have read the data and seen the EOF
585   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
586   CHECK_EQ(rcb.buffers.size(), 1);
587   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
588   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
589                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
590
591   ASSERT_FALSE(socket->isClosedBySelf());
592   ASSERT_FALSE(socket->isClosedByPeer());
593 }
594
595 /**
596  * Test reading, writing, and shutting down writes before the connect attempt
597  * finishes.
598  */
599 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
600   TestServer server;
601
602   // connect()
603   EventBase evb;
604   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
605   ConnCallback ccb;
606   socket->connect(&ccb, server.getAddress(), 30);
607
608   // Hopefully the connect didn't succeed immediately.
609   // If it did, we can't exercise the write-while-connecting code path.
610   if (ccb.state == STATE_SUCCEEDED) {
611     LOG(INFO) << "connect() succeeded immediately; skipping test";
612     return;
613   }
614
615   // Install a read callback
616   ReadCallback rcb;
617   socket->setReadCB(&rcb);
618
619   // Ask to write some data
620   char wbuf[128];
621   memset(wbuf, 'a', sizeof(wbuf));
622   WriteCallback wcb;
623   socket->write(&wcb, wbuf, sizeof(wbuf));
624
625   // Shutdown writes
626   socket->shutdownWrite();
627
628   // Even though we haven't looped yet, we should be able to accept
629   // the connection.
630   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
631
632   // Since the connection is still in progress, there should be no data to
633   // read yet.  Verify that the accepted socket is not readable.
634   struct pollfd fds[1];
635   fds[0].fd = acceptedSocket->getSocketFD();
636   fds[0].events = POLLIN;
637   fds[0].revents = 0;
638   int rc = poll(fds, 1, 0);
639   CHECK_EQ(rc, 0);
640
641   // Write data to the accepted socket
642   uint8_t acceptedWbuf[192];
643   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
644   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
645   acceptedSocket->flush();
646   // Shutdown writes to the accepted socket.  This will cause it to see EOF
647   // and uninstall the read callback.
648   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
649
650   // Loop
651   evb.loop();
652
653   // The loop should have completed the connection, written the queued data,
654   // shutdown writes on the socket, read the data we wrote to it, and see the
655   // EOF.
656   //
657   // Check that the connection was completed successfully and that the read
658   // and write callbacks were invoked as expected.
659   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
660   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
661   CHECK_EQ(rcb.buffers.size(), 1);
662   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
663   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
664                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
665   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
666
667   // Check that we can read the data that was written to the socket, and that
668   // we see an EOF, since its socket was half-shutdown.
669   uint8_t readbuf[sizeof(wbuf)];
670   acceptedSocket->readAll(readbuf, sizeof(readbuf));
671   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
672   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
673   CHECK_EQ(bytesRead, 0);
674
675   // Fully close both sockets
676   acceptedSocket->close();
677   socket->close();
678
679   ASSERT_FALSE(socket->isClosedBySelf());
680   ASSERT_TRUE(socket->isClosedByPeer());
681 }
682
683 /**
684  * Test reading, writing, and calling shutdownWriteNow() before the
685  * connect attempt finishes.
686  */
687 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
688   TestServer server;
689
690   // connect()
691   EventBase evb;
692   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
693   ConnCallback ccb;
694   socket->connect(&ccb, server.getAddress(), 30);
695
696   // Hopefully the connect didn't succeed immediately.
697   // If it did, we can't exercise the write-while-connecting code path.
698   if (ccb.state == STATE_SUCCEEDED) {
699     LOG(INFO) << "connect() succeeded immediately; skipping test";
700     return;
701   }
702
703   // Install a read callback
704   ReadCallback rcb;
705   socket->setReadCB(&rcb);
706
707   // Ask to write some data
708   char wbuf[128];
709   memset(wbuf, 'a', sizeof(wbuf));
710   WriteCallback wcb;
711   socket->write(&wcb, wbuf, sizeof(wbuf));
712
713   // Shutdown writes immediately.
714   // This should immediately discard the data that we just tried to write.
715   socket->shutdownWriteNow();
716
717   // Verify that writeError() was invoked on the write callback.
718   CHECK_EQ(wcb.state, STATE_FAILED);
719   CHECK_EQ(wcb.bytesWritten, 0);
720
721   // Even though we haven't looped yet, we should be able to accept
722   // the connection.
723   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
724
725   // Since the connection is still in progress, there should be no data to
726   // read yet.  Verify that the accepted socket is not readable.
727   struct pollfd fds[1];
728   fds[0].fd = acceptedSocket->getSocketFD();
729   fds[0].events = POLLIN;
730   fds[0].revents = 0;
731   int rc = poll(fds, 1, 0);
732   CHECK_EQ(rc, 0);
733
734   // Write data to the accepted socket
735   uint8_t acceptedWbuf[192];
736   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
737   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
738   acceptedSocket->flush();
739   // Shutdown writes to the accepted socket.  This will cause it to see EOF
740   // and uninstall the read callback.
741   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
742
743   // Loop
744   evb.loop();
745
746   // The loop should have completed the connection, written the queued data,
747   // shutdown writes on the socket, read the data we wrote to it, and see the
748   // EOF.
749   //
750   // Check that the connection was completed successfully and that the read
751   // callback was invoked as expected.
752   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
753   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
754   CHECK_EQ(rcb.buffers.size(), 1);
755   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
756   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
757                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
758
759   // Since we used shutdownWriteNow(), it should have discarded all pending
760   // write data.  Verify we see an immediate EOF when reading from the accepted
761   // socket.
762   uint8_t readbuf[sizeof(wbuf)];
763   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
764   CHECK_EQ(bytesRead, 0);
765
766   // Fully close both sockets
767   acceptedSocket->close();
768   socket->close();
769
770   ASSERT_FALSE(socket->isClosedBySelf());
771   ASSERT_TRUE(socket->isClosedByPeer());
772 }
773
774 // Helper function for use in testConnectOptWrite()
775 // Temporarily disable the read callback
776 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
777   // Uninstall the read callback
778   socket->setReadCB(nullptr);
779   // Schedule the read callback to be reinstalled after 1ms
780   socket->getEventBase()->runInLoop(
781       std::bind(&AsyncSocket::setReadCB, socket, rcb));
782 }
783
784 /**
785  * Test connect+write, then have the connect callback perform another write.
786  *
787  * This tests interaction of the optimistic writing after connect with
788  * additional write attempts that occur in the connect callback.
789  */
790 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
791   TestServer server;
792   EventBase evb;
793   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
794
795   // connect()
796   ConnCallback ccb;
797   socket->connect(&ccb, server.getAddress(), 30);
798
799   // Hopefully the connect didn't succeed immediately.
800   // If it did, we can't exercise the optimistic write code path.
801   if (ccb.state == STATE_SUCCEEDED) {
802     LOG(INFO) << "connect() succeeded immediately; aborting test "
803                        "of optimistic write behavior";
804     return;
805   }
806
807   // Tell the connect callback to perform a write when the connect succeeds
808   WriteCallback wcb2;
809   scoped_array<char> buf2(new char[size2]);
810   memset(buf2.get(), 'b', size2);
811   if (size2 > 0) {
812     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
813     // Tell the second write callback to close the connection when it is done
814     wcb2.successCallback = [&] { socket->closeNow(); };
815   }
816
817   // Schedule one write() immediately, before the connect finishes
818   scoped_array<char> buf1(new char[size1]);
819   memset(buf1.get(), 'a', size1);
820   WriteCallback wcb1;
821   if (size1 > 0) {
822     socket->write(&wcb1, buf1.get(), size1);
823   }
824
825   if (close) {
826     // immediately perform a close, before connect() completes
827     socket->close();
828   }
829
830   // Start reading from the other endpoint after 10ms.
831   // If we're using large buffers, we have to read so that the writes don't
832   // block forever.
833   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
834   ReadCallback rcb;
835   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
836                                         acceptedSocket.get(), &rcb);
837   socket->getEventBase()->tryRunAfterDelay(
838       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
839       10);
840
841   // Loop.  We don't bother accepting on the server socket yet.
842   // The kernel should be able to buffer the write request so it can succeed.
843   evb.loop();
844
845   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
846   if (size1 > 0) {
847     CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
848   }
849   if (size2 > 0) {
850     CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
851   }
852
853   socket->close();
854
855   // Make sure the read callback received all of the data
856   size_t bytesRead = 0;
857   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
858        it != rcb.buffers.end();
859        ++it) {
860     size_t start = bytesRead;
861     bytesRead += it->length;
862     size_t end = bytesRead;
863     if (start < size1) {
864       size_t cmpLen = min(size1, end) - start;
865       CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
866     }
867     if (end > size1 && end <= size1 + size2) {
868       size_t itOffset;
869       size_t buf2Offset;
870       size_t cmpLen;
871       if (start >= size1) {
872         itOffset = 0;
873         buf2Offset = start - size1;
874         cmpLen = end - start;
875       } else {
876         itOffset = size1 - start;
877         buf2Offset = 0;
878         cmpLen = end - size1;
879       }
880       CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
881                                cmpLen),
882                         0);
883     }
884   }
885   CHECK_EQ(bytesRead, size1 + size2);
886 }
887
888 TEST(AsyncSocketTest, ConnectCallbackWrite) {
889   // Test using small writes that should both succeed immediately
890   testConnectOptWrite(100, 200);
891
892   // Test using a large buffer in the connect callback, that should block
893   const size_t largeSize = 8*1024*1024;
894   testConnectOptWrite(100, largeSize);
895
896   // Test using a large initial write
897   testConnectOptWrite(largeSize, 100);
898
899   // Test using two large buffers
900   testConnectOptWrite(largeSize, largeSize);
901
902   // Test a small write in the connect callback,
903   // but no immediate write before connect completes
904   testConnectOptWrite(0, 64);
905
906   // Test a large write in the connect callback,
907   // but no immediate write before connect completes
908   testConnectOptWrite(0, largeSize);
909
910   // Test connect, a small write, then immediately call close() before connect
911   // completes
912   testConnectOptWrite(211, 0, true);
913
914   // Test connect, a large immediate write (that will block), then immediately
915   // call close() before connect completes
916   testConnectOptWrite(largeSize, 0, true);
917 }
918
919 ///////////////////////////////////////////////////////////////////////////
920 // write() related tests
921 ///////////////////////////////////////////////////////////////////////////
922
923 /**
924  * Test writing using a nullptr callback
925  */
926 TEST(AsyncSocketTest, WriteNullCallback) {
927   TestServer server;
928
929   // connect()
930   EventBase evb;
931   std::shared_ptr<AsyncSocket> socket =
932     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
933   evb.loop(); // loop until the socket is connected
934
935   // write() with a nullptr callback
936   char buf[128];
937   memset(buf, 'a', sizeof(buf));
938   socket->write(nullptr, buf, sizeof(buf));
939
940   evb.loop(); // loop until the data is sent
941
942   // Make sure the server got a connection and received the data
943   socket->close();
944   server.verifyConnection(buf, sizeof(buf));
945
946   ASSERT_TRUE(socket->isClosedBySelf());
947   ASSERT_FALSE(socket->isClosedByPeer());
948 }
949
950 /**
951  * Test writing with a send timeout
952  */
953 TEST(AsyncSocketTest, WriteTimeout) {
954   TestServer server;
955
956   // connect()
957   EventBase evb;
958   std::shared_ptr<AsyncSocket> socket =
959     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
960   evb.loop(); // loop until the socket is connected
961
962   // write() a large chunk of data, with no-one on the other end reading
963   size_t writeLength = 8*1024*1024;
964   uint32_t timeout = 200;
965   socket->setSendTimeout(timeout);
966   scoped_array<char> buf(new char[writeLength]);
967   memset(buf.get(), 'a', writeLength);
968   WriteCallback wcb;
969   socket->write(&wcb, buf.get(), writeLength);
970
971   TimePoint start;
972   evb.loop();
973   TimePoint end;
974
975   // Make sure the write attempt timed out as requested
976   CHECK_EQ(wcb.state, STATE_FAILED);
977   CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
978
979   // Check that the write timed out within a reasonable period of time.
980   // We don't check for exactly the specified timeout, since AsyncSocket only
981   // times out when it hasn't made progress for that period of time.
982   //
983   // On linux, the first write sends a few hundred kb of data, then blocks for
984   // writability, and then unblocks again after 40ms and is able to write
985   // another smaller of data before blocking permanently.  Therefore it doesn't
986   // time out until 40ms + timeout.
987   //
988   // I haven't fully verified the cause of this, but I believe it probably
989   // occurs because the receiving end delays sending an ack for up to 40ms.
990   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
991   // the ack, it can send some more data.  However, after that point the
992   // receiver's kernel buffer is full.  This 40ms delay happens even with
993   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
994   // kernel may be automatically disabling TCP_QUICKACK after receiving some
995   // data.
996   //
997   // For now, we simply check that the timeout occurred within 160ms of
998   // the requested value.
999   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1000 }
1001
1002 /**
1003  * Test writing to a socket that the remote endpoint has closed
1004  */
1005 TEST(AsyncSocketTest, WritePipeError) {
1006   TestServer server;
1007
1008   // connect()
1009   EventBase evb;
1010   std::shared_ptr<AsyncSocket> socket =
1011     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1012   socket->setSendTimeout(1000);
1013   evb.loop(); // loop until the socket is connected
1014
1015   // accept and immediately close the socket
1016   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1017   acceptedSocket.reset();
1018
1019   // write() a large chunk of data
1020   size_t writeLength = 8*1024*1024;
1021   scoped_array<char> buf(new char[writeLength]);
1022   memset(buf.get(), 'a', writeLength);
1023   WriteCallback wcb;
1024   socket->write(&wcb, buf.get(), writeLength);
1025
1026   evb.loop();
1027
1028   // Make sure the write failed.
1029   // It would be nice if AsyncSocketException could convey the errno value,
1030   // so that we could check for EPIPE
1031   CHECK_EQ(wcb.state, STATE_FAILED);
1032   CHECK_EQ(wcb.exception.getType(),
1033                     AsyncSocketException::INTERNAL_ERROR);
1034
1035   ASSERT_FALSE(socket->isClosedBySelf());
1036   ASSERT_FALSE(socket->isClosedByPeer());
1037 }
1038
1039 /**
1040  * Test writing a mix of simple buffers and IOBufs
1041  */
1042 TEST(AsyncSocketTest, WriteIOBuf) {
1043   TestServer server;
1044
1045   // connect()
1046   EventBase evb;
1047   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1048   ConnCallback ccb;
1049   socket->connect(&ccb, server.getAddress(), 30);
1050
1051   // Accept the connection
1052   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1053   ReadCallback rcb;
1054   acceptedSocket->setReadCB(&rcb);
1055
1056   // Write a simple buffer to the socket
1057   size_t simpleBufLength = 5;
1058   char simpleBuf[simpleBufLength];
1059   memset(simpleBuf, 'a', simpleBufLength);
1060   WriteCallback wcb;
1061   socket->write(&wcb, simpleBuf, simpleBufLength);
1062
1063   // Write a single-element IOBuf chain
1064   size_t buf1Length = 7;
1065   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1066   memset(buf1->writableData(), 'b', buf1Length);
1067   buf1->append(buf1Length);
1068   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1069   WriteCallback wcb2;
1070   socket->writeChain(&wcb2, std::move(buf1));
1071
1072   // Write a multiple-element IOBuf chain
1073   size_t buf2Length = 11;
1074   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1075   memset(buf2->writableData(), 'c', buf2Length);
1076   buf2->append(buf2Length);
1077   size_t buf3Length = 13;
1078   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1079   memset(buf3->writableData(), 'd', buf3Length);
1080   buf3->append(buf3Length);
1081   buf2->appendChain(std::move(buf3));
1082   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1083   buf2Copy->coalesce();
1084   WriteCallback wcb3;
1085   socket->writeChain(&wcb3, std::move(buf2));
1086   socket->shutdownWrite();
1087
1088   // Let the reads and writes run to completion
1089   evb.loop();
1090
1091   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1092   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1093   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1094
1095   // Make sure the reader got the right data in the right order
1096   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1097   CHECK_EQ(rcb.buffers.size(), 1);
1098   CHECK_EQ(rcb.buffers[0].length,
1099       simpleBufLength + buf1Length + buf2Length + buf3Length);
1100   CHECK_EQ(
1101       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1102   CHECK_EQ(
1103       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1104           buf1Copy->data(), buf1Copy->length()), 0);
1105   CHECK_EQ(
1106       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1107           buf2Copy->data(), buf2Copy->length()), 0);
1108
1109   acceptedSocket->close();
1110   socket->close();
1111
1112   ASSERT_TRUE(socket->isClosedBySelf());
1113   ASSERT_FALSE(socket->isClosedByPeer());
1114 }
1115
1116 TEST(AsyncSocketTest, WriteIOBufCorked) {
1117   TestServer server;
1118
1119   // connect()
1120   EventBase evb;
1121   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1122   ConnCallback ccb;
1123   socket->connect(&ccb, server.getAddress(), 30);
1124
1125   // Accept the connection
1126   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1127   ReadCallback rcb;
1128   acceptedSocket->setReadCB(&rcb);
1129
1130   // Do three writes, 100ms apart, with the "cork" flag set
1131   // on the second write.  The reader should see the first write
1132   // arrive by itself, followed by the second and third writes
1133   // arriving together.
1134   size_t buf1Length = 5;
1135   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1136   memset(buf1->writableData(), 'a', buf1Length);
1137   buf1->append(buf1Length);
1138   size_t buf2Length = 7;
1139   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1140   memset(buf2->writableData(), 'b', buf2Length);
1141   buf2->append(buf2Length);
1142   size_t buf3Length = 11;
1143   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1144   memset(buf3->writableData(), 'c', buf3Length);
1145   buf3->append(buf3Length);
1146   WriteCallback wcb1;
1147   socket->writeChain(&wcb1, std::move(buf1));
1148   WriteCallback wcb2;
1149   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1150   write2.scheduleTimeout(100);
1151   WriteCallback wcb3;
1152   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1153   write3.scheduleTimeout(200);
1154
1155   evb.loop();
1156   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1157   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1158   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1159   if (wcb3.state != STATE_SUCCEEDED) {
1160     throw(wcb3.exception);
1161   }
1162   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1163
1164   // Make sure the reader got the data with the right grouping
1165   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1166   CHECK_EQ(rcb.buffers.size(), 2);
1167   CHECK_EQ(rcb.buffers[0].length, buf1Length);
1168   CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1169
1170   acceptedSocket->close();
1171   socket->close();
1172
1173   ASSERT_TRUE(socket->isClosedBySelf());
1174   ASSERT_FALSE(socket->isClosedByPeer());
1175 }
1176
1177 /**
1178  * Test performing a zero-length write
1179  */
1180 TEST(AsyncSocketTest, ZeroLengthWrite) {
1181   TestServer server;
1182
1183   // connect()
1184   EventBase evb;
1185   std::shared_ptr<AsyncSocket> socket =
1186     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1187   evb.loop(); // loop until the socket is connected
1188
1189   auto acceptedSocket = server.acceptAsync(&evb);
1190   ReadCallback rcb;
1191   acceptedSocket->setReadCB(&rcb);
1192
1193   size_t len1 = 1024*1024;
1194   size_t len2 = 1024*1024;
1195   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1196   memset(buf.get(), 'a', len1);
1197   memset(buf.get(), 'b', len2);
1198
1199   WriteCallback wcb1;
1200   WriteCallback wcb2;
1201   WriteCallback wcb3;
1202   WriteCallback wcb4;
1203   socket->write(&wcb1, buf.get(), 0);
1204   socket->write(&wcb2, buf.get(), len1);
1205   socket->write(&wcb3, buf.get() + len1, 0);
1206   socket->write(&wcb4, buf.get() + len1, len2);
1207   socket->close();
1208
1209   evb.loop(); // loop until the data is sent
1210
1211   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1212   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1213   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1214   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1215   rcb.verifyData(buf.get(), len1 + len2);
1216
1217   ASSERT_TRUE(socket->isClosedBySelf());
1218   ASSERT_FALSE(socket->isClosedByPeer());
1219 }
1220
1221 TEST(AsyncSocketTest, ZeroLengthWritev) {
1222   TestServer server;
1223
1224   // connect()
1225   EventBase evb;
1226   std::shared_ptr<AsyncSocket> socket =
1227     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1228   evb.loop(); // loop until the socket is connected
1229
1230   auto acceptedSocket = server.acceptAsync(&evb);
1231   ReadCallback rcb;
1232   acceptedSocket->setReadCB(&rcb);
1233
1234   size_t len1 = 1024*1024;
1235   size_t len2 = 1024*1024;
1236   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1237   memset(buf.get(), 'a', len1);
1238   memset(buf.get(), 'b', len2);
1239
1240   WriteCallback wcb;
1241   size_t iovCount = 4;
1242   struct iovec iov[iovCount];
1243   iov[0].iov_base = buf.get();
1244   iov[0].iov_len = len1;
1245   iov[1].iov_base = buf.get() + len1;
1246   iov[1].iov_len = 0;
1247   iov[2].iov_base = buf.get() + len1;
1248   iov[2].iov_len = len2;
1249   iov[3].iov_base = buf.get() + len1 + len2;
1250   iov[3].iov_len = 0;
1251
1252   socket->writev(&wcb, iov, iovCount);
1253   socket->close();
1254   evb.loop(); // loop until the data is sent
1255
1256   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1257   rcb.verifyData(buf.get(), len1 + len2);
1258
1259   ASSERT_TRUE(socket->isClosedBySelf());
1260   ASSERT_FALSE(socket->isClosedByPeer());
1261 }
1262
1263 ///////////////////////////////////////////////////////////////////////////
1264 // close() related tests
1265 ///////////////////////////////////////////////////////////////////////////
1266
1267 /**
1268  * Test calling close() with pending writes when the socket is already closing.
1269  */
1270 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1271   TestServer server;
1272
1273   // connect()
1274   EventBase evb;
1275   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1276   ConnCallback ccb;
1277   socket->connect(&ccb, server.getAddress(), 30);
1278
1279   // accept the socket on the server side
1280   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1281
1282   // Loop to ensure the connect has completed
1283   evb.loop();
1284
1285   // Make sure we are connected
1286   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1287
1288   // Schedule pending writes, until several write attempts have blocked
1289   char buf[128];
1290   memset(buf, 'a', sizeof(buf));
1291   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1292   WriteCallbackVector writeCallbacks;
1293
1294   writeCallbacks.reserve(5);
1295   while (writeCallbacks.size() < 5) {
1296     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1297
1298     socket->write(wcb.get(), buf, sizeof(buf));
1299     if (wcb->state == STATE_SUCCEEDED) {
1300       // Succeeded immediately.  Keep performing more writes
1301       continue;
1302     }
1303
1304     // This write is blocked.
1305     // Have the write callback call close() when writeError() is invoked
1306     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1307     writeCallbacks.push_back(wcb);
1308   }
1309
1310   // Call closeNow() to immediately fail the pending writes
1311   socket->closeNow();
1312
1313   // Make sure writeError() was invoked on all of the pending write callbacks
1314   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1315        it != writeCallbacks.end();
1316        ++it) {
1317     CHECK_EQ((*it)->state, STATE_FAILED);
1318   }
1319
1320   ASSERT_TRUE(socket->isClosedBySelf());
1321   ASSERT_FALSE(socket->isClosedByPeer());
1322 }
1323
1324 ///////////////////////////////////////////////////////////////////////////
1325 // ImmediateRead related tests
1326 ///////////////////////////////////////////////////////////////////////////
1327
1328 /* AsyncSocket use to verify immediate read works */
1329 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1330  public:
1331   bool immediateReadCalled = false;
1332   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1333  protected:
1334   void checkForImmediateRead() noexcept override {
1335     immediateReadCalled = true;
1336     AsyncSocket::handleRead();
1337   }
1338 };
1339
1340 TEST(AsyncSocket, ConnectReadImmediateRead) {
1341   TestServer server;
1342
1343   const size_t maxBufferSz = 100;
1344   const size_t maxReadsPerEvent = 1;
1345   const size_t expectedDataSz = maxBufferSz * 3;
1346   char expectedData[expectedDataSz];
1347   memset(expectedData, 'j', expectedDataSz);
1348
1349   EventBase evb;
1350   ReadCallback rcb(maxBufferSz);
1351   AsyncSocketImmediateRead socket(&evb);
1352   socket.connect(nullptr, server.getAddress(), 30);
1353
1354   evb.loop(); // loop until the socket is connected
1355
1356   socket.setReadCB(&rcb);
1357   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1358   socket.immediateReadCalled = false;
1359
1360   auto acceptedSocket = server.acceptAsync(&evb);
1361
1362   ReadCallback rcbServer;
1363   WriteCallback wcbServer;
1364   rcbServer.dataAvailableCallback = [&]() {
1365     if (rcbServer.dataRead() == expectedDataSz) {
1366       // write back all data read
1367       rcbServer.verifyData(expectedData, expectedDataSz);
1368       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1369       acceptedSocket->close();
1370     }
1371   };
1372   acceptedSocket->setReadCB(&rcbServer);
1373
1374   // write data
1375   WriteCallback wcb1;
1376   socket.write(&wcb1, expectedData, expectedDataSz);
1377   evb.loop();
1378   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1379   rcb.verifyData(expectedData, expectedDataSz);
1380   CHECK_EQ(socket.immediateReadCalled, true);
1381
1382   ASSERT_FALSE(socket.isClosedBySelf());
1383   ASSERT_FALSE(socket.isClosedByPeer());
1384 }
1385
1386 TEST(AsyncSocket, ConnectReadUninstallRead) {
1387   TestServer server;
1388
1389   const size_t maxBufferSz = 100;
1390   const size_t maxReadsPerEvent = 1;
1391   const size_t expectedDataSz = maxBufferSz * 3;
1392   char expectedData[expectedDataSz];
1393   memset(expectedData, 'k', expectedDataSz);
1394
1395   EventBase evb;
1396   ReadCallback rcb(maxBufferSz);
1397   AsyncSocketImmediateRead socket(&evb);
1398   socket.connect(nullptr, server.getAddress(), 30);
1399
1400   evb.loop(); // loop until the socket is connected
1401
1402   socket.setReadCB(&rcb);
1403   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1404   socket.immediateReadCalled = false;
1405
1406   auto acceptedSocket = server.acceptAsync(&evb);
1407
1408   ReadCallback rcbServer;
1409   WriteCallback wcbServer;
1410   rcbServer.dataAvailableCallback = [&]() {
1411     if (rcbServer.dataRead() == expectedDataSz) {
1412       // write back all data read
1413       rcbServer.verifyData(expectedData, expectedDataSz);
1414       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1415       acceptedSocket->close();
1416     }
1417   };
1418   acceptedSocket->setReadCB(&rcbServer);
1419
1420   rcb.dataAvailableCallback = [&]() {
1421     // we read data and reset readCB
1422     socket.setReadCB(nullptr);
1423   };
1424
1425   // write data
1426   WriteCallback wcb;
1427   socket.write(&wcb, expectedData, expectedDataSz);
1428   evb.loop();
1429   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1430
1431   /* we shoud've only read maxBufferSz data since readCallback_
1432    * was reset in dataAvailableCallback */
1433   CHECK_EQ(rcb.dataRead(), maxBufferSz);
1434   CHECK_EQ(socket.immediateReadCalled, false);
1435
1436   ASSERT_FALSE(socket.isClosedBySelf());
1437   ASSERT_FALSE(socket.isClosedByPeer());
1438 }
1439
1440 // TODO:
1441 // - Test connect() and have the connect callback set the read callback
1442 // - Test connect() and have the connect callback unset the read callback
1443 // - Test reading/writing/closing/destroying the socket in the connect callback
1444 // - Test reading/writing/closing/destroying the socket in the read callback
1445 // - Test reading/writing/closing/destroying the socket in the write callback
1446 // - Test one-way shutdown behavior
1447 // - Test changing the EventBase
1448 //
1449 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1450 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1451
1452
1453 ///////////////////////////////////////////////////////////////////////////
1454 // AsyncServerSocket tests
1455 ///////////////////////////////////////////////////////////////////////////
1456 namespace {
1457 /**
1458  * Helper ConnectionEventCallback class for the test code.
1459  * It maintains counters protected by a spin lock.
1460  */
1461 class TestConnectionEventCallback :
1462   public AsyncServerSocket::ConnectionEventCallback {
1463  public:
1464   virtual void onConnectionAccepted(
1465       const int socket,
1466       const SocketAddress& addr) noexcept override {
1467     folly::RWSpinLock::WriteHolder holder(spinLock_);
1468     connectionAccepted_++;
1469   }
1470
1471   virtual void onConnectionAcceptError(const int err) noexcept override {
1472     folly::RWSpinLock::WriteHolder holder(spinLock_);
1473     connectionAcceptedError_++;
1474   }
1475
1476   virtual void onConnectionDropped(
1477       const int socket,
1478       const SocketAddress& addr) noexcept override {
1479     folly::RWSpinLock::WriteHolder holder(spinLock_);
1480     connectionDropped_++;
1481   }
1482
1483   virtual void onConnectionEnqueuedForAcceptCallback(
1484       const int socket,
1485       const SocketAddress& addr) noexcept override {
1486     folly::RWSpinLock::WriteHolder holder(spinLock_);
1487     connectionEnqueuedForAcceptCallback_++;
1488   }
1489
1490   virtual void onConnectionDequeuedByAcceptCallback(
1491       const int socket,
1492       const SocketAddress& addr) noexcept override {
1493     folly::RWSpinLock::WriteHolder holder(spinLock_);
1494     connectionDequeuedByAcceptCallback_++;
1495   }
1496
1497   virtual void onBackoffStarted() noexcept override {
1498     folly::RWSpinLock::WriteHolder holder(spinLock_);
1499     backoffStarted_++;
1500   }
1501
1502   virtual void onBackoffEnded() noexcept override {
1503     folly::RWSpinLock::WriteHolder holder(spinLock_);
1504     backoffEnded_++;
1505   }
1506
1507   virtual void onBackoffError() noexcept override {
1508     folly::RWSpinLock::WriteHolder holder(spinLock_);
1509     backoffError_++;
1510   }
1511
1512   unsigned int getConnectionAccepted() const {
1513     folly::RWSpinLock::ReadHolder holder(spinLock_);
1514     return connectionAccepted_;
1515   }
1516
1517   unsigned int getConnectionAcceptedError() const {
1518     folly::RWSpinLock::ReadHolder holder(spinLock_);
1519     return connectionAcceptedError_;
1520   }
1521
1522   unsigned int getConnectionDropped() const {
1523     folly::RWSpinLock::ReadHolder holder(spinLock_);
1524     return connectionDropped_;
1525   }
1526
1527   unsigned int getConnectionEnqueuedForAcceptCallback() const {
1528     folly::RWSpinLock::ReadHolder holder(spinLock_);
1529     return connectionEnqueuedForAcceptCallback_;
1530   }
1531
1532   unsigned int getConnectionDequeuedByAcceptCallback() const {
1533     folly::RWSpinLock::ReadHolder holder(spinLock_);
1534     return connectionDequeuedByAcceptCallback_;
1535   }
1536
1537   unsigned int getBackoffStarted() const {
1538     folly::RWSpinLock::ReadHolder holder(spinLock_);
1539     return backoffStarted_;
1540   }
1541
1542   unsigned int getBackoffEnded() const {
1543     folly::RWSpinLock::ReadHolder holder(spinLock_);
1544     return backoffEnded_;
1545   }
1546
1547   unsigned int getBackoffError() const {
1548     folly::RWSpinLock::ReadHolder holder(spinLock_);
1549     return backoffError_;
1550   }
1551
1552  private:
1553   mutable folly::RWSpinLock spinLock_;
1554   unsigned int connectionAccepted_{0};
1555   unsigned int connectionAcceptedError_{0};
1556   unsigned int connectionDropped_{0};
1557   unsigned int connectionEnqueuedForAcceptCallback_{0};
1558   unsigned int connectionDequeuedByAcceptCallback_{0};
1559   unsigned int backoffStarted_{0};
1560   unsigned int backoffEnded_{0};
1561   unsigned int backoffError_{0};
1562 };
1563
1564 /**
1565  * Helper AcceptCallback class for the test code
1566  * It records the callbacks that were invoked, and also supports calling
1567  * generic std::function objects in each callback.
1568  */
1569 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1570  public:
1571   enum EventType {
1572     TYPE_START,
1573     TYPE_ACCEPT,
1574     TYPE_ERROR,
1575     TYPE_STOP
1576   };
1577   struct EventInfo {
1578     EventInfo(int fd, const folly::SocketAddress& addr)
1579       : type(TYPE_ACCEPT),
1580         fd(fd),
1581         address(addr),
1582         errorMsg() {}
1583     explicit EventInfo(const std::string& msg)
1584       : type(TYPE_ERROR),
1585         fd(-1),
1586         address(),
1587         errorMsg(msg) {}
1588     explicit EventInfo(EventType et)
1589       : type(et),
1590         fd(-1),
1591         address(),
1592         errorMsg() {}
1593
1594     EventType type;
1595     int fd;  // valid for TYPE_ACCEPT
1596     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1597     string errorMsg;  // valid for TYPE_ERROR
1598   };
1599   typedef std::deque<EventInfo> EventList;
1600
1601   TestAcceptCallback()
1602     : connectionAcceptedFn_(),
1603       acceptErrorFn_(),
1604       acceptStoppedFn_(),
1605       events_() {}
1606
1607   std::deque<EventInfo>* getEvents() {
1608     return &events_;
1609   }
1610
1611   void setConnectionAcceptedFn(
1612       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1613     connectionAcceptedFn_ = fn;
1614   }
1615   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1616     acceptErrorFn_ = fn;
1617   }
1618   void setAcceptStartedFn(const std::function<void()>& fn) {
1619     acceptStartedFn_ = fn;
1620   }
1621   void setAcceptStoppedFn(const std::function<void()>& fn) {
1622     acceptStoppedFn_ = fn;
1623   }
1624
1625   void connectionAccepted(
1626       int fd, const folly::SocketAddress& clientAddr) noexcept override {
1627     events_.emplace_back(fd, clientAddr);
1628
1629     if (connectionAcceptedFn_) {
1630       connectionAcceptedFn_(fd, clientAddr);
1631     }
1632   }
1633   void acceptError(const std::exception& ex) noexcept override {
1634     events_.emplace_back(ex.what());
1635
1636     if (acceptErrorFn_) {
1637       acceptErrorFn_(ex);
1638     }
1639   }
1640   void acceptStarted() noexcept override {
1641     events_.emplace_back(TYPE_START);
1642
1643     if (acceptStartedFn_) {
1644       acceptStartedFn_();
1645     }
1646   }
1647   void acceptStopped() noexcept override {
1648     events_.emplace_back(TYPE_STOP);
1649
1650     if (acceptStoppedFn_) {
1651       acceptStoppedFn_();
1652     }
1653   }
1654
1655  private:
1656   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1657   std::function<void(const std::exception&)> acceptErrorFn_;
1658   std::function<void()> acceptStartedFn_;
1659   std::function<void()> acceptStoppedFn_;
1660
1661   std::deque<EventInfo> events_;
1662 };
1663 }
1664
1665 /**
1666  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1667  */
1668 TEST(AsyncSocketTest, ServerAcceptOptions) {
1669   EventBase eventBase;
1670
1671   // Create a server socket
1672   std::shared_ptr<AsyncServerSocket> serverSocket(
1673       AsyncServerSocket::newSocket(&eventBase));
1674   serverSocket->bind(0);
1675   serverSocket->listen(16);
1676   folly::SocketAddress serverAddress;
1677   serverSocket->getAddress(&serverAddress);
1678
1679   // Add a callback to accept one connection then stop the loop
1680   TestAcceptCallback acceptCallback;
1681   acceptCallback.setConnectionAcceptedFn(
1682     [&](int fd, const folly::SocketAddress& addr) {
1683       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1684     });
1685   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1686     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1687   });
1688   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1689   serverSocket->startAccepting();
1690
1691   // Connect to the server socket
1692   std::shared_ptr<AsyncSocket> socket(
1693       AsyncSocket::newSocket(&eventBase, serverAddress));
1694
1695   eventBase.loop();
1696
1697   // Verify that the server accepted a connection
1698   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1699   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1700                     TestAcceptCallback::TYPE_START);
1701   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1702                     TestAcceptCallback::TYPE_ACCEPT);
1703   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1704                     TestAcceptCallback::TYPE_STOP);
1705   int fd = acceptCallback.getEvents()->at(1).fd;
1706
1707   // The accepted connection should already be in non-blocking mode
1708   int flags = fcntl(fd, F_GETFL, 0);
1709   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1710
1711 #ifndef TCP_NOPUSH
1712   // The accepted connection should already have TCP_NODELAY set
1713   int value;
1714   socklen_t valueLength = sizeof(value);
1715   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1716   CHECK_EQ(rc, 0);
1717   CHECK_EQ(value, 1);
1718 #endif
1719 }
1720
1721 /**
1722  * Test AsyncServerSocket::removeAcceptCallback()
1723  */
1724 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1725   // Create a new AsyncServerSocket
1726   EventBase eventBase;
1727   std::shared_ptr<AsyncServerSocket> serverSocket(
1728       AsyncServerSocket::newSocket(&eventBase));
1729   serverSocket->bind(0);
1730   serverSocket->listen(16);
1731   folly::SocketAddress serverAddress;
1732   serverSocket->getAddress(&serverAddress);
1733
1734   // Add several accept callbacks
1735   TestAcceptCallback cb1;
1736   TestAcceptCallback cb2;
1737   TestAcceptCallback cb3;
1738   TestAcceptCallback cb4;
1739   TestAcceptCallback cb5;
1740   TestAcceptCallback cb6;
1741   TestAcceptCallback cb7;
1742
1743   // Test having callbacks remove other callbacks before them on the list,
1744   // after them on the list, or removing themselves.
1745   //
1746   // Have callback 2 remove callback 3 and callback 5 the first time it is
1747   // called.
1748   int cb2Count = 0;
1749   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1750       std::shared_ptr<AsyncSocket> sock2(
1751         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1752       });
1753   cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1754     });
1755   cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1756       std::shared_ptr<AsyncSocket> sock3(
1757         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1758     });
1759   cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1760   std::shared_ptr<AsyncSocket> sock5(
1761       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1762
1763     });
1764   cb2.setConnectionAcceptedFn(
1765     [&](int fd, const folly::SocketAddress& addr) {
1766       if (cb2Count == 0) {
1767         serverSocket->removeAcceptCallback(&cb3, nullptr);
1768         serverSocket->removeAcceptCallback(&cb5, nullptr);
1769       }
1770       ++cb2Count;
1771     });
1772   // Have callback 6 remove callback 4 the first time it is called,
1773   // and destroy the server socket the second time it is called
1774   int cb6Count = 0;
1775   cb6.setConnectionAcceptedFn(
1776     [&](int fd, const folly::SocketAddress& addr) {
1777       if (cb6Count == 0) {
1778         serverSocket->removeAcceptCallback(&cb4, nullptr);
1779         std::shared_ptr<AsyncSocket> sock6(
1780           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1781         std::shared_ptr<AsyncSocket> sock7(
1782           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1783         std::shared_ptr<AsyncSocket> sock8(
1784           AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1785
1786       } else {
1787         serverSocket.reset();
1788       }
1789       ++cb6Count;
1790     });
1791   // Have callback 7 remove itself
1792   cb7.setConnectionAcceptedFn(
1793     [&](int fd, const folly::SocketAddress& addr) {
1794       serverSocket->removeAcceptCallback(&cb7, nullptr);
1795     });
1796
1797   serverSocket->addAcceptCallback(&cb1, nullptr);
1798   serverSocket->addAcceptCallback(&cb2, nullptr);
1799   serverSocket->addAcceptCallback(&cb3, nullptr);
1800   serverSocket->addAcceptCallback(&cb4, nullptr);
1801   serverSocket->addAcceptCallback(&cb5, nullptr);
1802   serverSocket->addAcceptCallback(&cb6, nullptr);
1803   serverSocket->addAcceptCallback(&cb7, nullptr);
1804   serverSocket->startAccepting();
1805
1806   // Make several connections to the socket
1807   std::shared_ptr<AsyncSocket> sock1(
1808       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1809   std::shared_ptr<AsyncSocket> sock4(
1810       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1811
1812   // Loop until we are stopped
1813   eventBase.loop();
1814
1815   // Check to make sure that the expected callbacks were invoked.
1816   //
1817   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1818   // the AcceptCallbacks in round-robin fashion, in the order that they were
1819   // added.  The code is implemented this way right now, but the API doesn't
1820   // explicitly require it be done this way.  If we change the code not to be
1821   // exactly round robin in the future, we can simplify the test checks here.
1822   // (We'll also need to update the termination code, since we expect cb6 to
1823   // get called twice to terminate the loop.)
1824   CHECK_EQ(cb1.getEvents()->size(), 4);
1825   CHECK_EQ(cb1.getEvents()->at(0).type,
1826                     TestAcceptCallback::TYPE_START);
1827   CHECK_EQ(cb1.getEvents()->at(1).type,
1828                     TestAcceptCallback::TYPE_ACCEPT);
1829   CHECK_EQ(cb1.getEvents()->at(2).type,
1830                     TestAcceptCallback::TYPE_ACCEPT);
1831   CHECK_EQ(cb1.getEvents()->at(3).type,
1832                     TestAcceptCallback::TYPE_STOP);
1833
1834   CHECK_EQ(cb2.getEvents()->size(), 4);
1835   CHECK_EQ(cb2.getEvents()->at(0).type,
1836                     TestAcceptCallback::TYPE_START);
1837   CHECK_EQ(cb2.getEvents()->at(1).type,
1838                     TestAcceptCallback::TYPE_ACCEPT);
1839   CHECK_EQ(cb2.getEvents()->at(2).type,
1840                     TestAcceptCallback::TYPE_ACCEPT);
1841   CHECK_EQ(cb2.getEvents()->at(3).type,
1842                     TestAcceptCallback::TYPE_STOP);
1843
1844   CHECK_EQ(cb3.getEvents()->size(), 2);
1845   CHECK_EQ(cb3.getEvents()->at(0).type,
1846                     TestAcceptCallback::TYPE_START);
1847   CHECK_EQ(cb3.getEvents()->at(1).type,
1848                     TestAcceptCallback::TYPE_STOP);
1849
1850   CHECK_EQ(cb4.getEvents()->size(), 3);
1851   CHECK_EQ(cb4.getEvents()->at(0).type,
1852                     TestAcceptCallback::TYPE_START);
1853   CHECK_EQ(cb4.getEvents()->at(1).type,
1854                     TestAcceptCallback::TYPE_ACCEPT);
1855   CHECK_EQ(cb4.getEvents()->at(2).type,
1856                     TestAcceptCallback::TYPE_STOP);
1857
1858   CHECK_EQ(cb5.getEvents()->size(), 2);
1859   CHECK_EQ(cb5.getEvents()->at(0).type,
1860                     TestAcceptCallback::TYPE_START);
1861   CHECK_EQ(cb5.getEvents()->at(1).type,
1862                     TestAcceptCallback::TYPE_STOP);
1863
1864   CHECK_EQ(cb6.getEvents()->size(), 4);
1865   CHECK_EQ(cb6.getEvents()->at(0).type,
1866                     TestAcceptCallback::TYPE_START);
1867   CHECK_EQ(cb6.getEvents()->at(1).type,
1868                     TestAcceptCallback::TYPE_ACCEPT);
1869   CHECK_EQ(cb6.getEvents()->at(2).type,
1870                     TestAcceptCallback::TYPE_ACCEPT);
1871   CHECK_EQ(cb6.getEvents()->at(3).type,
1872                     TestAcceptCallback::TYPE_STOP);
1873
1874   CHECK_EQ(cb7.getEvents()->size(), 3);
1875   CHECK_EQ(cb7.getEvents()->at(0).type,
1876                     TestAcceptCallback::TYPE_START);
1877   CHECK_EQ(cb7.getEvents()->at(1).type,
1878                     TestAcceptCallback::TYPE_ACCEPT);
1879   CHECK_EQ(cb7.getEvents()->at(2).type,
1880                     TestAcceptCallback::TYPE_STOP);
1881 }
1882
1883 /**
1884  * Test AsyncServerSocket::removeAcceptCallback()
1885  */
1886 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1887   // Create a new AsyncServerSocket
1888   EventBase eventBase;
1889   std::shared_ptr<AsyncServerSocket> serverSocket(
1890       AsyncServerSocket::newSocket(&eventBase));
1891   serverSocket->bind(0);
1892   serverSocket->listen(16);
1893   folly::SocketAddress serverAddress;
1894   serverSocket->getAddress(&serverAddress);
1895
1896   // Add several accept callbacks
1897   TestAcceptCallback cb1;
1898   auto thread_id = pthread_self();
1899   cb1.setAcceptStartedFn([&](){
1900     CHECK_NE(thread_id, pthread_self());
1901     thread_id = pthread_self();
1902   });
1903   cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1904     CHECK_EQ(thread_id, pthread_self());
1905     serverSocket->removeAcceptCallback(&cb1, nullptr);
1906   });
1907   cb1.setAcceptStoppedFn([&](){
1908     CHECK_EQ(thread_id, pthread_self());
1909   });
1910
1911   // Test having callbacks remove other callbacks before them on the list,
1912   serverSocket->addAcceptCallback(&cb1, nullptr);
1913   serverSocket->startAccepting();
1914
1915   // Make several connections to the socket
1916   std::shared_ptr<AsyncSocket> sock1(
1917       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1918
1919   // Loop in another thread
1920   auto other = std::thread([&](){
1921     eventBase.loop();
1922   });
1923   other.join();
1924
1925   // Check to make sure that the expected callbacks were invoked.
1926   //
1927   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1928   // the AcceptCallbacks in round-robin fashion, in the order that they were
1929   // added.  The code is implemented this way right now, but the API doesn't
1930   // explicitly require it be done this way.  If we change the code not to be
1931   // exactly round robin in the future, we can simplify the test checks here.
1932   // (We'll also need to update the termination code, since we expect cb6 to
1933   // get called twice to terminate the loop.)
1934   CHECK_EQ(cb1.getEvents()->size(), 3);
1935   CHECK_EQ(cb1.getEvents()->at(0).type,
1936                     TestAcceptCallback::TYPE_START);
1937   CHECK_EQ(cb1.getEvents()->at(1).type,
1938                     TestAcceptCallback::TYPE_ACCEPT);
1939   CHECK_EQ(cb1.getEvents()->at(2).type,
1940                     TestAcceptCallback::TYPE_STOP);
1941
1942 }
1943
1944 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1945   // Add a callback to accept one connection then stop accepting
1946   TestAcceptCallback acceptCallback;
1947   acceptCallback.setConnectionAcceptedFn(
1948     [&](int fd, const folly::SocketAddress& addr) {
1949       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1950     });
1951   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1952     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1953   });
1954   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1955   serverSocket->startAccepting();
1956
1957   // Connect to the server socket
1958   EventBase* eventBase = serverSocket->getEventBase();
1959   folly::SocketAddress serverAddress;
1960   serverSocket->getAddress(&serverAddress);
1961   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1962
1963   // Loop to process all events
1964   eventBase->loop();
1965
1966   // Verify that the server accepted a connection
1967   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1968   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1969                     TestAcceptCallback::TYPE_START);
1970   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1971                     TestAcceptCallback::TYPE_ACCEPT);
1972   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1973                     TestAcceptCallback::TYPE_STOP);
1974 }
1975
1976 /* Verify that we don't leak sockets if we are destroyed()
1977  * and there are still writes pending
1978  *
1979  * If destroy() only calls close() instead of closeNow(),
1980  * it would shutdown(writes) on the socket, but it would
1981  * never be close()'d, and the socket would leak
1982  */
1983 TEST(AsyncSocketTest, DestroyCloseTest) {
1984   TestServer server;
1985
1986   // connect()
1987   EventBase clientEB;
1988   EventBase serverEB;
1989   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1990   ConnCallback ccb;
1991   socket->connect(&ccb, server.getAddress(), 30);
1992
1993   // Accept the connection
1994   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1995   ReadCallback rcb;
1996   acceptedSocket->setReadCB(&rcb);
1997
1998   // Write a large buffer to the socket that is larger than kernel buffer
1999   size_t simpleBufLength = 5000000;
2000   char* simpleBuf = new char[simpleBufLength];
2001   memset(simpleBuf, 'a', simpleBufLength);
2002   WriteCallback wcb;
2003
2004   // Let the reads and writes run to completion
2005   int fd = acceptedSocket->getFd();
2006
2007   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2008   socket.reset();
2009   acceptedSocket.reset();
2010
2011   // Test that server socket was closed
2012   ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2013   CHECK_EQ(sz, -1);
2014   CHECK_EQ(errno, 9);
2015   delete[] simpleBuf;
2016 }
2017
2018 /**
2019  * Test AsyncServerSocket::useExistingSocket()
2020  */
2021 TEST(AsyncSocketTest, ServerExistingSocket) {
2022   EventBase eventBase;
2023
2024   // Test creating a socket, and letting AsyncServerSocket bind and listen
2025   {
2026     // Manually create a socket
2027     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2028     ASSERT_GE(fd, 0);
2029
2030     // Create a server socket
2031     AsyncServerSocket::UniquePtr serverSocket(
2032         new AsyncServerSocket(&eventBase));
2033     serverSocket->useExistingSocket(fd);
2034     folly::SocketAddress address;
2035     serverSocket->getAddress(&address);
2036     address.setPort(0);
2037     serverSocket->bind(address);
2038     serverSocket->listen(16);
2039
2040     // Make sure the socket works
2041     serverSocketSanityTest(serverSocket.get());
2042   }
2043
2044   // Test creating a socket and binding manually,
2045   // then letting AsyncServerSocket listen
2046   {
2047     // Manually create a socket
2048     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2049     ASSERT_GE(fd, 0);
2050     // bind
2051     struct sockaddr_in addr;
2052     addr.sin_family = AF_INET;
2053     addr.sin_port = 0;
2054     addr.sin_addr.s_addr = INADDR_ANY;
2055     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2056                              sizeof(addr)), 0);
2057     // Look up the address that we bound to
2058     folly::SocketAddress boundAddress;
2059     boundAddress.setFromLocalAddress(fd);
2060
2061     // Create a server socket
2062     AsyncServerSocket::UniquePtr serverSocket(
2063         new AsyncServerSocket(&eventBase));
2064     serverSocket->useExistingSocket(fd);
2065     serverSocket->listen(16);
2066
2067     // Make sure AsyncServerSocket reports the same address that we bound to
2068     folly::SocketAddress serverSocketAddress;
2069     serverSocket->getAddress(&serverSocketAddress);
2070     CHECK_EQ(boundAddress, serverSocketAddress);
2071
2072     // Make sure the socket works
2073     serverSocketSanityTest(serverSocket.get());
2074   }
2075
2076   // Test creating a socket, binding and listening manually,
2077   // then giving it to AsyncServerSocket
2078   {
2079     // Manually create a socket
2080     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2081     ASSERT_GE(fd, 0);
2082     // bind
2083     struct sockaddr_in addr;
2084     addr.sin_family = AF_INET;
2085     addr.sin_port = 0;
2086     addr.sin_addr.s_addr = INADDR_ANY;
2087     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2088                              sizeof(addr)), 0);
2089     // Look up the address that we bound to
2090     folly::SocketAddress boundAddress;
2091     boundAddress.setFromLocalAddress(fd);
2092     // listen
2093     CHECK_EQ(listen(fd, 16), 0);
2094
2095     // Create a server socket
2096     AsyncServerSocket::UniquePtr serverSocket(
2097         new AsyncServerSocket(&eventBase));
2098     serverSocket->useExistingSocket(fd);
2099
2100     // Make sure AsyncServerSocket reports the same address that we bound to
2101     folly::SocketAddress serverSocketAddress;
2102     serverSocket->getAddress(&serverSocketAddress);
2103     CHECK_EQ(boundAddress, serverSocketAddress);
2104
2105     // Make sure the socket works
2106     serverSocketSanityTest(serverSocket.get());
2107   }
2108 }
2109
2110 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2111   EventBase eventBase;
2112
2113   // Create a server socket
2114   std::shared_ptr<AsyncServerSocket> serverSocket(
2115       AsyncServerSocket::newSocket(&eventBase));
2116   string path(1, 0);
2117   path.append("/anonymous");
2118   folly::SocketAddress serverAddress;
2119   serverAddress.setFromPath(path);
2120   serverSocket->bind(serverAddress);
2121   serverSocket->listen(16);
2122
2123   // Add a callback to accept one connection then stop the loop
2124   TestAcceptCallback acceptCallback;
2125   acceptCallback.setConnectionAcceptedFn(
2126     [&](int fd, const folly::SocketAddress& addr) {
2127       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2128     });
2129   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
2130     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2131   });
2132   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2133   serverSocket->startAccepting();
2134
2135   // Connect to the server socket
2136   std::shared_ptr<AsyncSocket> socket(
2137       AsyncSocket::newSocket(&eventBase, serverAddress));
2138
2139   eventBase.loop();
2140
2141   // Verify that the server accepted a connection
2142   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2143   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2144                     TestAcceptCallback::TYPE_START);
2145   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2146                     TestAcceptCallback::TYPE_ACCEPT);
2147   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2148                     TestAcceptCallback::TYPE_STOP);
2149   int fd = acceptCallback.getEvents()->at(1).fd;
2150
2151   // The accepted connection should already be in non-blocking mode
2152   int flags = fcntl(fd, F_GETFL, 0);
2153   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2154 }
2155
2156 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2157   EventBase eventBase;
2158   TestConnectionEventCallback connectionEventCallback;
2159
2160   // Create a server socket
2161   std::shared_ptr<AsyncServerSocket> serverSocket(
2162       AsyncServerSocket::newSocket(&eventBase));
2163   serverSocket->setConnectionEventCallback(&connectionEventCallback);
2164   serverSocket->bind(0);
2165   serverSocket->listen(16);
2166   folly::SocketAddress serverAddress;
2167   serverSocket->getAddress(&serverAddress);
2168
2169   // Add a callback to accept one connection then stop the loop
2170   TestAcceptCallback acceptCallback;
2171   acceptCallback.setConnectionAcceptedFn(
2172     [&](int fd, const folly::SocketAddress& addr) {
2173       serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2174     });
2175   acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
2176     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2177   });
2178   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2179   serverSocket->startAccepting();
2180
2181   // Connect to the server socket
2182   std::shared_ptr<AsyncSocket> socket(
2183       AsyncSocket::newSocket(&eventBase, serverAddress));
2184
2185   eventBase.loop();
2186
2187   // Validate the connection event counters
2188   ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2189   ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2190   ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2191   ASSERT_EQ(
2192       connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2193   ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2194   ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2195   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2196   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2197 }