Switch uses of networking headers to <folly/portability/Sockets.h>
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/RWSpinLock.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/Random.h>
23
24 #include <folly/io/IOBuf.h>
25 #include <folly/io/async/test/AsyncSocketTest.h>
26 #include <folly/io/async/test/Util.h>
27 #include <folly/portability/Sockets.h>
28 #include <folly/portability/Unistd.h>
29 #include <folly/test/SocketAddressTestHelper.h>
30
31 #include <gtest/gtest.h>
32 #include <boost/scoped_array.hpp>
33 #include <iostream>
34 #include <fcntl.h>
35 #include <sys/types.h>
36 #include <thread>
37
38 using namespace boost;
39
40 using std::string;
41 using std::vector;
42 using std::min;
43 using std::cerr;
44 using std::endl;
45 using std::unique_ptr;
46 using std::chrono::milliseconds;
47 using boost::scoped_array;
48
49 using namespace folly;
50
51 class DelayedWrite: public AsyncTimeout {
52  public:
53   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
54       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
55       bool cork, bool lastWrite = false):
56     AsyncTimeout(socket->getEventBase()),
57     socket_(socket),
58     bufs_(std::move(bufs)),
59     wcb_(wcb),
60     cork_(cork),
61     lastWrite_(lastWrite) {}
62
63  private:
64   void timeoutExpired() noexcept override {
65     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
66     socket_->writeChain(wcb_, std::move(bufs_), flags);
67     if (lastWrite_) {
68       socket_->shutdownWrite();
69     }
70   }
71
72   std::shared_ptr<AsyncSocket> socket_;
73   unique_ptr<IOBuf> bufs_;
74   AsyncTransportWrapper::WriteCallback* wcb_;
75   bool cork_;
76   bool lastWrite_;
77 };
78
79 ///////////////////////////////////////////////////////////////////////////
80 // connect() tests
81 ///////////////////////////////////////////////////////////////////////////
82
83 /**
84  * Test connecting to a server
85  */
86 TEST(AsyncSocketTest, Connect) {
87   // Start listening on a local port
88   TestServer server;
89
90   // Connect using a AsyncSocket
91   EventBase evb;
92   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
93   ConnCallback cb;
94   socket->connect(&cb, server.getAddress(), 30);
95
96   evb.loop();
97
98   CHECK_EQ(cb.state, STATE_SUCCEEDED);
99   EXPECT_LE(0, socket->getConnectTime().count());
100   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
101 }
102
103 /**
104  * Test connecting to a server that isn't listening
105  */
106 TEST(AsyncSocketTest, ConnectRefused) {
107   EventBase evb;
108
109   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
110
111   // Hopefully nothing is actually listening on this address
112   folly::SocketAddress addr("127.0.0.1", 65535);
113   ConnCallback cb;
114   socket->connect(&cb, addr, 30);
115
116   evb.loop();
117
118   CHECK_EQ(cb.state, STATE_FAILED);
119   CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
120   EXPECT_LE(0, socket->getConnectTime().count());
121   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
122 }
123
124 /**
125  * Test connection timeout
126  */
127 TEST(AsyncSocketTest, ConnectTimeout) {
128   EventBase evb;
129
130   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
131
132   // Try connecting to server that won't respond.
133   //
134   // This depends somewhat on the network where this test is run.
135   // Hopefully this IP will be routable but unresponsive.
136   // (Alternatively, we could try listening on a local raw socket, but that
137   // normally requires root privileges.)
138   auto host =
139       SocketAddressTestHelper::isIPv6Enabled() ?
140       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
141       SocketAddressTestHelper::isIPv4Enabled() ?
142       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
143       nullptr;
144   SocketAddress addr(host, 65535);
145   ConnCallback cb;
146   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
147
148   evb.loop();
149
150   CHECK_EQ(cb.state, STATE_FAILED);
151   CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
152
153   // Verify that we can still get the peer address after a timeout.
154   // Use case is if the client was created from a client pool, and we want
155   // to log which peer failed.
156   folly::SocketAddress peer;
157   socket->getPeerAddress(&peer);
158   CHECK_EQ(peer, addr);
159   EXPECT_LE(0, socket->getConnectTime().count());
160   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
161 }
162
163 /**
164  * Test writing immediately after connecting, without waiting for connect
165  * to finish.
166  */
167 TEST(AsyncSocketTest, ConnectAndWrite) {
168   TestServer server;
169
170   // connect()
171   EventBase evb;
172   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
173   ConnCallback ccb;
174   socket->connect(&ccb, server.getAddress(), 30);
175
176   // write()
177   char buf[128];
178   memset(buf, 'a', sizeof(buf));
179   WriteCallback wcb;
180   socket->write(&wcb, buf, sizeof(buf));
181
182   // Loop.  We don't bother accepting on the server socket yet.
183   // The kernel should be able to buffer the write request so it can succeed.
184   evb.loop();
185
186   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
187   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
188
189   // Make sure the server got a connection and received the data
190   socket->close();
191   server.verifyConnection(buf, sizeof(buf));
192
193   ASSERT_TRUE(socket->isClosedBySelf());
194   ASSERT_FALSE(socket->isClosedByPeer());
195   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
196 }
197
198 /**
199  * Test connecting using a nullptr connect callback.
200  */
201 TEST(AsyncSocketTest, ConnectNullCallback) {
202   TestServer server;
203
204   // connect()
205   EventBase evb;
206   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
207   socket->connect(nullptr, server.getAddress(), 30);
208
209   // write some data, just so we have some way of verifing
210   // that the socket works correctly after connecting
211   char buf[128];
212   memset(buf, 'a', sizeof(buf));
213   WriteCallback wcb;
214   socket->write(&wcb, buf, sizeof(buf));
215
216   evb.loop();
217
218   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
219
220   // Make sure the server got a connection and received the data
221   socket->close();
222   server.verifyConnection(buf, sizeof(buf));
223
224   ASSERT_TRUE(socket->isClosedBySelf());
225   ASSERT_FALSE(socket->isClosedByPeer());
226 }
227
228 /**
229  * Test calling both write() and close() immediately after connecting, without
230  * waiting for connect to finish.
231  *
232  * This exercises the STATE_CONNECTING_CLOSING code.
233  */
234 TEST(AsyncSocketTest, ConnectWriteAndClose) {
235   TestServer server;
236
237   // connect()
238   EventBase evb;
239   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
240   ConnCallback ccb;
241   socket->connect(&ccb, server.getAddress(), 30);
242
243   // write()
244   char buf[128];
245   memset(buf, 'a', sizeof(buf));
246   WriteCallback wcb;
247   socket->write(&wcb, buf, sizeof(buf));
248
249   // close()
250   socket->close();
251
252   // Loop.  We don't bother accepting on the server socket yet.
253   // The kernel should be able to buffer the write request so it can succeed.
254   evb.loop();
255
256   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
257   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
258
259   // Make sure the server got a connection and received the data
260   server.verifyConnection(buf, sizeof(buf));
261
262   ASSERT_TRUE(socket->isClosedBySelf());
263   ASSERT_FALSE(socket->isClosedByPeer());
264 }
265
266 /**
267  * Test calling close() immediately after connect()
268  */
269 TEST(AsyncSocketTest, ConnectAndClose) {
270   TestServer server;
271
272   // Connect using a AsyncSocket
273   EventBase evb;
274   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
275   ConnCallback ccb;
276   socket->connect(&ccb, server.getAddress(), 30);
277
278   // Hopefully the connect didn't succeed immediately.
279   // If it did, we can't exercise the close-while-connecting code path.
280   if (ccb.state == STATE_SUCCEEDED) {
281     LOG(INFO) << "connect() succeeded immediately; aborting test "
282                        "of close-during-connect behavior";
283     return;
284   }
285
286   socket->close();
287
288   // Loop, although there shouldn't be anything to do.
289   evb.loop();
290
291   // Make sure the connection was aborted
292   CHECK_EQ(ccb.state, STATE_FAILED);
293
294   ASSERT_TRUE(socket->isClosedBySelf());
295   ASSERT_FALSE(socket->isClosedByPeer());
296 }
297
298 /**
299  * Test calling closeNow() immediately after connect()
300  *
301  * This should be identical to the normal close behavior.
302  */
303 TEST(AsyncSocketTest, ConnectAndCloseNow) {
304   TestServer server;
305
306   // Connect using a AsyncSocket
307   EventBase evb;
308   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
309   ConnCallback ccb;
310   socket->connect(&ccb, server.getAddress(), 30);
311
312   // Hopefully the connect didn't succeed immediately.
313   // If it did, we can't exercise the close-while-connecting code path.
314   if (ccb.state == STATE_SUCCEEDED) {
315     LOG(INFO) << "connect() succeeded immediately; aborting test "
316                        "of closeNow()-during-connect behavior";
317     return;
318   }
319
320   socket->closeNow();
321
322   // Loop, although there shouldn't be anything to do.
323   evb.loop();
324
325   // Make sure the connection was aborted
326   CHECK_EQ(ccb.state, STATE_FAILED);
327
328   ASSERT_TRUE(socket->isClosedBySelf());
329   ASSERT_FALSE(socket->isClosedByPeer());
330 }
331
332 /**
333  * Test calling both write() and closeNow() immediately after connecting,
334  * without waiting for connect to finish.
335  *
336  * This should abort the pending write.
337  */
338 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
339   TestServer server;
340
341   // connect()
342   EventBase evb;
343   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
344   ConnCallback ccb;
345   socket->connect(&ccb, server.getAddress(), 30);
346
347   // Hopefully the connect didn't succeed immediately.
348   // If it did, we can't exercise the close-while-connecting code path.
349   if (ccb.state == STATE_SUCCEEDED) {
350     LOG(INFO) << "connect() succeeded immediately; aborting test "
351                        "of write-during-connect behavior";
352     return;
353   }
354
355   // write()
356   char buf[128];
357   memset(buf, 'a', sizeof(buf));
358   WriteCallback wcb;
359   socket->write(&wcb, buf, sizeof(buf));
360
361   // close()
362   socket->closeNow();
363
364   // Loop, although there shouldn't be anything to do.
365   evb.loop();
366
367   CHECK_EQ(ccb.state, STATE_FAILED);
368   CHECK_EQ(wcb.state, STATE_FAILED);
369
370   ASSERT_TRUE(socket->isClosedBySelf());
371   ASSERT_FALSE(socket->isClosedByPeer());
372 }
373
374 /**
375  * Test installing a read callback immediately, before connect() finishes.
376  */
377 TEST(AsyncSocketTest, ConnectAndRead) {
378   TestServer server;
379
380   // connect()
381   EventBase evb;
382   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
383   ConnCallback ccb;
384   socket->connect(&ccb, server.getAddress(), 30);
385
386   ReadCallback rcb;
387   socket->setReadCB(&rcb);
388
389   // Even though we haven't looped yet, we should be able to accept
390   // the connection and send data to it.
391   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
392   uint8_t buf[128];
393   memset(buf, 'a', sizeof(buf));
394   acceptedSocket->write(buf, sizeof(buf));
395   acceptedSocket->flush();
396   acceptedSocket->close();
397
398   // Loop, although there shouldn't be anything to do.
399   evb.loop();
400
401   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
402   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
403   CHECK_EQ(rcb.buffers.size(), 1);
404   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
405   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
406
407   ASSERT_FALSE(socket->isClosedBySelf());
408   ASSERT_FALSE(socket->isClosedByPeer());
409 }
410
411 /**
412  * Test installing a read callback and then closing immediately before the
413  * connect attempt finishes.
414  */
415 TEST(AsyncSocketTest, ConnectReadAndClose) {
416   TestServer server;
417
418   // connect()
419   EventBase evb;
420   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
421   ConnCallback ccb;
422   socket->connect(&ccb, server.getAddress(), 30);
423
424   // Hopefully the connect didn't succeed immediately.
425   // If it did, we can't exercise the close-while-connecting code path.
426   if (ccb.state == STATE_SUCCEEDED) {
427     LOG(INFO) << "connect() succeeded immediately; aborting test "
428                        "of read-during-connect behavior";
429     return;
430   }
431
432   ReadCallback rcb;
433   socket->setReadCB(&rcb);
434
435   // close()
436   socket->close();
437
438   // Loop, although there shouldn't be anything to do.
439   evb.loop();
440
441   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
442   CHECK_EQ(rcb.buffers.size(), 0);
443   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
444
445   ASSERT_TRUE(socket->isClosedBySelf());
446   ASSERT_FALSE(socket->isClosedByPeer());
447 }
448
449 /**
450  * Test both writing and installing a read callback immediately,
451  * before connect() finishes.
452  */
453 TEST(AsyncSocketTest, ConnectWriteAndRead) {
454   TestServer server;
455
456   // connect()
457   EventBase evb;
458   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
459   ConnCallback ccb;
460   socket->connect(&ccb, server.getAddress(), 30);
461
462   // write()
463   char buf1[128];
464   memset(buf1, 'a', sizeof(buf1));
465   WriteCallback wcb;
466   socket->write(&wcb, buf1, sizeof(buf1));
467
468   // set a read callback
469   ReadCallback rcb;
470   socket->setReadCB(&rcb);
471
472   // Even though we haven't looped yet, we should be able to accept
473   // the connection and send data to it.
474   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
475   uint8_t buf2[128];
476   memset(buf2, 'b', sizeof(buf2));
477   acceptedSocket->write(buf2, sizeof(buf2));
478   acceptedSocket->flush();
479
480   // shut down the write half of acceptedSocket, so that the AsyncSocket
481   // will stop reading and we can break out of the event loop.
482   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
483
484   // Loop
485   evb.loop();
486
487   // Make sure the connect succeeded
488   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
489
490   // Make sure the AsyncSocket read the data written by the accepted socket
491   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
492   CHECK_EQ(rcb.buffers.size(), 1);
493   CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
494   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
495
496   // Close the AsyncSocket so we'll see EOF on acceptedSocket
497   socket->close();
498
499   // Make sure the accepted socket saw the data written by the AsyncSocket
500   uint8_t readbuf[sizeof(buf1)];
501   acceptedSocket->readAll(readbuf, sizeof(readbuf));
502   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
503   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
504   CHECK_EQ(bytesRead, 0);
505
506   ASSERT_FALSE(socket->isClosedBySelf());
507   ASSERT_TRUE(socket->isClosedByPeer());
508 }
509
510 /**
511  * Test writing to the socket then shutting down writes before the connect
512  * attempt finishes.
513  */
514 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
515   TestServer server;
516
517   // connect()
518   EventBase evb;
519   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
520   ConnCallback ccb;
521   socket->connect(&ccb, server.getAddress(), 30);
522
523   // Hopefully the connect didn't succeed immediately.
524   // If it did, we can't exercise the write-while-connecting code path.
525   if (ccb.state == STATE_SUCCEEDED) {
526     LOG(INFO) << "connect() succeeded immediately; skipping test";
527     return;
528   }
529
530   // Ask to write some data
531   char wbuf[128];
532   memset(wbuf, 'a', sizeof(wbuf));
533   WriteCallback wcb;
534   socket->write(&wcb, wbuf, sizeof(wbuf));
535   socket->shutdownWrite();
536
537   // Shutdown writes
538   socket->shutdownWrite();
539
540   // Even though we haven't looped yet, we should be able to accept
541   // the connection.
542   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
543
544   // Since the connection is still in progress, there should be no data to
545   // read yet.  Verify that the accepted socket is not readable.
546   struct pollfd fds[1];
547   fds[0].fd = acceptedSocket->getSocketFD();
548   fds[0].events = POLLIN;
549   fds[0].revents = 0;
550   int rc = poll(fds, 1, 0);
551   CHECK_EQ(rc, 0);
552
553   // Write data to the accepted socket
554   uint8_t acceptedWbuf[192];
555   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
556   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
557   acceptedSocket->flush();
558
559   // Loop
560   evb.loop();
561
562   // The loop should have completed the connection, written the queued data,
563   // and shutdown writes on the socket.
564   //
565   // Check that the connection was completed successfully and that the write
566   // callback succeeded.
567   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
568   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
569
570   // Check that we can read the data that was written to the socket, and that
571   // we see an EOF, since its socket was half-shutdown.
572   uint8_t readbuf[sizeof(wbuf)];
573   acceptedSocket->readAll(readbuf, sizeof(readbuf));
574   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
575   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
576   CHECK_EQ(bytesRead, 0);
577
578   // Close the accepted socket.  This will cause it to see EOF
579   // and uninstall the read callback when we loop next.
580   acceptedSocket->close();
581
582   // Install a read callback, then loop again.
583   ReadCallback rcb;
584   socket->setReadCB(&rcb);
585   evb.loop();
586
587   // This loop should have read the data and seen the EOF
588   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
589   CHECK_EQ(rcb.buffers.size(), 1);
590   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
591   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
592                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
593
594   ASSERT_FALSE(socket->isClosedBySelf());
595   ASSERT_FALSE(socket->isClosedByPeer());
596 }
597
598 /**
599  * Test reading, writing, and shutting down writes before the connect attempt
600  * finishes.
601  */
602 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
603   TestServer server;
604
605   // connect()
606   EventBase evb;
607   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
608   ConnCallback ccb;
609   socket->connect(&ccb, server.getAddress(), 30);
610
611   // Hopefully the connect didn't succeed immediately.
612   // If it did, we can't exercise the write-while-connecting code path.
613   if (ccb.state == STATE_SUCCEEDED) {
614     LOG(INFO) << "connect() succeeded immediately; skipping test";
615     return;
616   }
617
618   // Install a read callback
619   ReadCallback rcb;
620   socket->setReadCB(&rcb);
621
622   // Ask to write some data
623   char wbuf[128];
624   memset(wbuf, 'a', sizeof(wbuf));
625   WriteCallback wcb;
626   socket->write(&wcb, wbuf, sizeof(wbuf));
627
628   // Shutdown writes
629   socket->shutdownWrite();
630
631   // Even though we haven't looped yet, we should be able to accept
632   // the connection.
633   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
634
635   // Since the connection is still in progress, there should be no data to
636   // read yet.  Verify that the accepted socket is not readable.
637   struct pollfd fds[1];
638   fds[0].fd = acceptedSocket->getSocketFD();
639   fds[0].events = POLLIN;
640   fds[0].revents = 0;
641   int rc = poll(fds, 1, 0);
642   CHECK_EQ(rc, 0);
643
644   // Write data to the accepted socket
645   uint8_t acceptedWbuf[192];
646   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
647   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
648   acceptedSocket->flush();
649   // Shutdown writes to the accepted socket.  This will cause it to see EOF
650   // and uninstall the read callback.
651   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
652
653   // Loop
654   evb.loop();
655
656   // The loop should have completed the connection, written the queued data,
657   // shutdown writes on the socket, read the data we wrote to it, and see the
658   // EOF.
659   //
660   // Check that the connection was completed successfully and that the read
661   // and write callbacks were invoked as expected.
662   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
663   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
664   CHECK_EQ(rcb.buffers.size(), 1);
665   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
666   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
667                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
668   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
669
670   // Check that we can read the data that was written to the socket, and that
671   // we see an EOF, since its socket was half-shutdown.
672   uint8_t readbuf[sizeof(wbuf)];
673   acceptedSocket->readAll(readbuf, sizeof(readbuf));
674   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
675   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
676   CHECK_EQ(bytesRead, 0);
677
678   // Fully close both sockets
679   acceptedSocket->close();
680   socket->close();
681
682   ASSERT_FALSE(socket->isClosedBySelf());
683   ASSERT_TRUE(socket->isClosedByPeer());
684 }
685
686 /**
687  * Test reading, writing, and calling shutdownWriteNow() before the
688  * connect attempt finishes.
689  */
690 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
691   TestServer server;
692
693   // connect()
694   EventBase evb;
695   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
696   ConnCallback ccb;
697   socket->connect(&ccb, server.getAddress(), 30);
698
699   // Hopefully the connect didn't succeed immediately.
700   // If it did, we can't exercise the write-while-connecting code path.
701   if (ccb.state == STATE_SUCCEEDED) {
702     LOG(INFO) << "connect() succeeded immediately; skipping test";
703     return;
704   }
705
706   // Install a read callback
707   ReadCallback rcb;
708   socket->setReadCB(&rcb);
709
710   // Ask to write some data
711   char wbuf[128];
712   memset(wbuf, 'a', sizeof(wbuf));
713   WriteCallback wcb;
714   socket->write(&wcb, wbuf, sizeof(wbuf));
715
716   // Shutdown writes immediately.
717   // This should immediately discard the data that we just tried to write.
718   socket->shutdownWriteNow();
719
720   // Verify that writeError() was invoked on the write callback.
721   CHECK_EQ(wcb.state, STATE_FAILED);
722   CHECK_EQ(wcb.bytesWritten, 0);
723
724   // Even though we haven't looped yet, we should be able to accept
725   // the connection.
726   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
727
728   // Since the connection is still in progress, there should be no data to
729   // read yet.  Verify that the accepted socket is not readable.
730   struct pollfd fds[1];
731   fds[0].fd = acceptedSocket->getSocketFD();
732   fds[0].events = POLLIN;
733   fds[0].revents = 0;
734   int rc = poll(fds, 1, 0);
735   CHECK_EQ(rc, 0);
736
737   // Write data to the accepted socket
738   uint8_t acceptedWbuf[192];
739   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
740   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
741   acceptedSocket->flush();
742   // Shutdown writes to the accepted socket.  This will cause it to see EOF
743   // and uninstall the read callback.
744   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
745
746   // Loop
747   evb.loop();
748
749   // The loop should have completed the connection, written the queued data,
750   // shutdown writes on the socket, read the data we wrote to it, and see the
751   // EOF.
752   //
753   // Check that the connection was completed successfully and that the read
754   // callback was invoked as expected.
755   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
756   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
757   CHECK_EQ(rcb.buffers.size(), 1);
758   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
759   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
760                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
761
762   // Since we used shutdownWriteNow(), it should have discarded all pending
763   // write data.  Verify we see an immediate EOF when reading from the accepted
764   // socket.
765   uint8_t readbuf[sizeof(wbuf)];
766   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
767   CHECK_EQ(bytesRead, 0);
768
769   // Fully close both sockets
770   acceptedSocket->close();
771   socket->close();
772
773   ASSERT_FALSE(socket->isClosedBySelf());
774   ASSERT_TRUE(socket->isClosedByPeer());
775 }
776
777 // Helper function for use in testConnectOptWrite()
778 // Temporarily disable the read callback
779 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
780   // Uninstall the read callback
781   socket->setReadCB(nullptr);
782   // Schedule the read callback to be reinstalled after 1ms
783   socket->getEventBase()->runInLoop(
784       std::bind(&AsyncSocket::setReadCB, socket, rcb));
785 }
786
787 /**
788  * Test connect+write, then have the connect callback perform another write.
789  *
790  * This tests interaction of the optimistic writing after connect with
791  * additional write attempts that occur in the connect callback.
792  */
793 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
794   TestServer server;
795   EventBase evb;
796   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
797
798   // connect()
799   ConnCallback ccb;
800   socket->connect(&ccb, server.getAddress(), 30);
801
802   // Hopefully the connect didn't succeed immediately.
803   // If it did, we can't exercise the optimistic write code path.
804   if (ccb.state == STATE_SUCCEEDED) {
805     LOG(INFO) << "connect() succeeded immediately; aborting test "
806                        "of optimistic write behavior";
807     return;
808   }
809
810   // Tell the connect callback to perform a write when the connect succeeds
811   WriteCallback wcb2;
812   scoped_array<char> buf2(new char[size2]);
813   memset(buf2.get(), 'b', size2);
814   if (size2 > 0) {
815     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
816     // Tell the second write callback to close the connection when it is done
817     wcb2.successCallback = [&] { socket->closeNow(); };
818   }
819
820   // Schedule one write() immediately, before the connect finishes
821   scoped_array<char> buf1(new char[size1]);
822   memset(buf1.get(), 'a', size1);
823   WriteCallback wcb1;
824   if (size1 > 0) {
825     socket->write(&wcb1, buf1.get(), size1);
826   }
827
828   if (close) {
829     // immediately perform a close, before connect() completes
830     socket->close();
831   }
832
833   // Start reading from the other endpoint after 10ms.
834   // If we're using large buffers, we have to read so that the writes don't
835   // block forever.
836   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
837   ReadCallback rcb;
838   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
839                                         acceptedSocket.get(), &rcb);
840   socket->getEventBase()->tryRunAfterDelay(
841       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
842       10);
843
844   // Loop.  We don't bother accepting on the server socket yet.
845   // The kernel should be able to buffer the write request so it can succeed.
846   evb.loop();
847
848   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
849   if (size1 > 0) {
850     CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
851   }
852   if (size2 > 0) {
853     CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
854   }
855
856   socket->close();
857
858   // Make sure the read callback received all of the data
859   size_t bytesRead = 0;
860   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
861        it != rcb.buffers.end();
862        ++it) {
863     size_t start = bytesRead;
864     bytesRead += it->length;
865     size_t end = bytesRead;
866     if (start < size1) {
867       size_t cmpLen = min(size1, end) - start;
868       CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
869     }
870     if (end > size1 && end <= size1 + size2) {
871       size_t itOffset;
872       size_t buf2Offset;
873       size_t cmpLen;
874       if (start >= size1) {
875         itOffset = 0;
876         buf2Offset = start - size1;
877         cmpLen = end - start;
878       } else {
879         itOffset = size1 - start;
880         buf2Offset = 0;
881         cmpLen = end - size1;
882       }
883       CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
884                                cmpLen),
885                         0);
886     }
887   }
888   CHECK_EQ(bytesRead, size1 + size2);
889 }
890
891 TEST(AsyncSocketTest, ConnectCallbackWrite) {
892   // Test using small writes that should both succeed immediately
893   testConnectOptWrite(100, 200);
894
895   // Test using a large buffer in the connect callback, that should block
896   const size_t largeSize = 8*1024*1024;
897   testConnectOptWrite(100, largeSize);
898
899   // Test using a large initial write
900   testConnectOptWrite(largeSize, 100);
901
902   // Test using two large buffers
903   testConnectOptWrite(largeSize, largeSize);
904
905   // Test a small write in the connect callback,
906   // but no immediate write before connect completes
907   testConnectOptWrite(0, 64);
908
909   // Test a large write in the connect callback,
910   // but no immediate write before connect completes
911   testConnectOptWrite(0, largeSize);
912
913   // Test connect, a small write, then immediately call close() before connect
914   // completes
915   testConnectOptWrite(211, 0, true);
916
917   // Test connect, a large immediate write (that will block), then immediately
918   // call close() before connect completes
919   testConnectOptWrite(largeSize, 0, true);
920 }
921
922 ///////////////////////////////////////////////////////////////////////////
923 // write() related tests
924 ///////////////////////////////////////////////////////////////////////////
925
926 /**
927  * Test writing using a nullptr callback
928  */
929 TEST(AsyncSocketTest, WriteNullCallback) {
930   TestServer server;
931
932   // connect()
933   EventBase evb;
934   std::shared_ptr<AsyncSocket> socket =
935     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
936   evb.loop(); // loop until the socket is connected
937
938   // write() with a nullptr callback
939   char buf[128];
940   memset(buf, 'a', sizeof(buf));
941   socket->write(nullptr, buf, sizeof(buf));
942
943   evb.loop(); // loop until the data is sent
944
945   // Make sure the server got a connection and received the data
946   socket->close();
947   server.verifyConnection(buf, sizeof(buf));
948
949   ASSERT_TRUE(socket->isClosedBySelf());
950   ASSERT_FALSE(socket->isClosedByPeer());
951 }
952
953 /**
954  * Test writing with a send timeout
955  */
956 TEST(AsyncSocketTest, WriteTimeout) {
957   TestServer server;
958
959   // connect()
960   EventBase evb;
961   std::shared_ptr<AsyncSocket> socket =
962     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
963   evb.loop(); // loop until the socket is connected
964
965   // write() a large chunk of data, with no-one on the other end reading
966   size_t writeLength = 8*1024*1024;
967   uint32_t timeout = 200;
968   socket->setSendTimeout(timeout);
969   scoped_array<char> buf(new char[writeLength]);
970   memset(buf.get(), 'a', writeLength);
971   WriteCallback wcb;
972   socket->write(&wcb, buf.get(), writeLength);
973
974   TimePoint start;
975   evb.loop();
976   TimePoint end;
977
978   // Make sure the write attempt timed out as requested
979   CHECK_EQ(wcb.state, STATE_FAILED);
980   CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
981
982   // Check that the write timed out within a reasonable period of time.
983   // We don't check for exactly the specified timeout, since AsyncSocket only
984   // times out when it hasn't made progress for that period of time.
985   //
986   // On linux, the first write sends a few hundred kb of data, then blocks for
987   // writability, and then unblocks again after 40ms and is able to write
988   // another smaller of data before blocking permanently.  Therefore it doesn't
989   // time out until 40ms + timeout.
990   //
991   // I haven't fully verified the cause of this, but I believe it probably
992   // occurs because the receiving end delays sending an ack for up to 40ms.
993   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
994   // the ack, it can send some more data.  However, after that point the
995   // receiver's kernel buffer is full.  This 40ms delay happens even with
996   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
997   // kernel may be automatically disabling TCP_QUICKACK after receiving some
998   // data.
999   //
1000   // For now, we simply check that the timeout occurred within 160ms of
1001   // the requested value.
1002   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1003 }
1004
1005 /**
1006  * Test writing to a socket that the remote endpoint has closed
1007  */
1008 TEST(AsyncSocketTest, WritePipeError) {
1009   TestServer server;
1010
1011   // connect()
1012   EventBase evb;
1013   std::shared_ptr<AsyncSocket> socket =
1014     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1015   socket->setSendTimeout(1000);
1016   evb.loop(); // loop until the socket is connected
1017
1018   // accept and immediately close the socket
1019   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1020   acceptedSocket.reset();
1021
1022   // write() a large chunk of data
1023   size_t writeLength = 8*1024*1024;
1024   scoped_array<char> buf(new char[writeLength]);
1025   memset(buf.get(), 'a', writeLength);
1026   WriteCallback wcb;
1027   socket->write(&wcb, buf.get(), writeLength);
1028
1029   evb.loop();
1030
1031   // Make sure the write failed.
1032   // It would be nice if AsyncSocketException could convey the errno value,
1033   // so that we could check for EPIPE
1034   CHECK_EQ(wcb.state, STATE_FAILED);
1035   CHECK_EQ(wcb.exception.getType(),
1036                     AsyncSocketException::INTERNAL_ERROR);
1037
1038   ASSERT_FALSE(socket->isClosedBySelf());
1039   ASSERT_FALSE(socket->isClosedByPeer());
1040 }
1041
1042 /**
1043  * Test writing a mix of simple buffers and IOBufs
1044  */
1045 TEST(AsyncSocketTest, WriteIOBuf) {
1046   TestServer server;
1047
1048   // connect()
1049   EventBase evb;
1050   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1051   ConnCallback ccb;
1052   socket->connect(&ccb, server.getAddress(), 30);
1053
1054   // Accept the connection
1055   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1056   ReadCallback rcb;
1057   acceptedSocket->setReadCB(&rcb);
1058
1059   // Write a simple buffer to the socket
1060   size_t simpleBufLength = 5;
1061   char simpleBuf[simpleBufLength];
1062   memset(simpleBuf, 'a', simpleBufLength);
1063   WriteCallback wcb;
1064   socket->write(&wcb, simpleBuf, simpleBufLength);
1065
1066   // Write a single-element IOBuf chain
1067   size_t buf1Length = 7;
1068   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1069   memset(buf1->writableData(), 'b', buf1Length);
1070   buf1->append(buf1Length);
1071   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1072   WriteCallback wcb2;
1073   socket->writeChain(&wcb2, std::move(buf1));
1074
1075   // Write a multiple-element IOBuf chain
1076   size_t buf2Length = 11;
1077   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1078   memset(buf2->writableData(), 'c', buf2Length);
1079   buf2->append(buf2Length);
1080   size_t buf3Length = 13;
1081   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1082   memset(buf3->writableData(), 'd', buf3Length);
1083   buf3->append(buf3Length);
1084   buf2->appendChain(std::move(buf3));
1085   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1086   buf2Copy->coalesce();
1087   WriteCallback wcb3;
1088   socket->writeChain(&wcb3, std::move(buf2));
1089   socket->shutdownWrite();
1090
1091   // Let the reads and writes run to completion
1092   evb.loop();
1093
1094   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1095   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1096   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1097
1098   // Make sure the reader got the right data in the right order
1099   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1100   CHECK_EQ(rcb.buffers.size(), 1);
1101   CHECK_EQ(rcb.buffers[0].length,
1102       simpleBufLength + buf1Length + buf2Length + buf3Length);
1103   CHECK_EQ(
1104       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1105   CHECK_EQ(
1106       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1107           buf1Copy->data(), buf1Copy->length()), 0);
1108   CHECK_EQ(
1109       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1110           buf2Copy->data(), buf2Copy->length()), 0);
1111
1112   acceptedSocket->close();
1113   socket->close();
1114
1115   ASSERT_TRUE(socket->isClosedBySelf());
1116   ASSERT_FALSE(socket->isClosedByPeer());
1117 }
1118
1119 TEST(AsyncSocketTest, WriteIOBufCorked) {
1120   TestServer server;
1121
1122   // connect()
1123   EventBase evb;
1124   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1125   ConnCallback ccb;
1126   socket->connect(&ccb, server.getAddress(), 30);
1127
1128   // Accept the connection
1129   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1130   ReadCallback rcb;
1131   acceptedSocket->setReadCB(&rcb);
1132
1133   // Do three writes, 100ms apart, with the "cork" flag set
1134   // on the second write.  The reader should see the first write
1135   // arrive by itself, followed by the second and third writes
1136   // arriving together.
1137   size_t buf1Length = 5;
1138   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1139   memset(buf1->writableData(), 'a', buf1Length);
1140   buf1->append(buf1Length);
1141   size_t buf2Length = 7;
1142   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1143   memset(buf2->writableData(), 'b', buf2Length);
1144   buf2->append(buf2Length);
1145   size_t buf3Length = 11;
1146   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1147   memset(buf3->writableData(), 'c', buf3Length);
1148   buf3->append(buf3Length);
1149   WriteCallback wcb1;
1150   socket->writeChain(&wcb1, std::move(buf1));
1151   WriteCallback wcb2;
1152   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1153   write2.scheduleTimeout(100);
1154   WriteCallback wcb3;
1155   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1156   write3.scheduleTimeout(140);
1157
1158   evb.loop();
1159   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1160   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1161   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1162   if (wcb3.state != STATE_SUCCEEDED) {
1163     throw(wcb3.exception);
1164   }
1165   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1166
1167   // Make sure the reader got the data with the right grouping
1168   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1169   CHECK_EQ(rcb.buffers.size(), 2);
1170   CHECK_EQ(rcb.buffers[0].length, buf1Length);
1171   CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1172
1173   acceptedSocket->close();
1174   socket->close();
1175
1176   ASSERT_TRUE(socket->isClosedBySelf());
1177   ASSERT_FALSE(socket->isClosedByPeer());
1178 }
1179
1180 /**
1181  * Test performing a zero-length write
1182  */
1183 TEST(AsyncSocketTest, ZeroLengthWrite) {
1184   TestServer server;
1185
1186   // connect()
1187   EventBase evb;
1188   std::shared_ptr<AsyncSocket> socket =
1189     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1190   evb.loop(); // loop until the socket is connected
1191
1192   auto acceptedSocket = server.acceptAsync(&evb);
1193   ReadCallback rcb;
1194   acceptedSocket->setReadCB(&rcb);
1195
1196   size_t len1 = 1024*1024;
1197   size_t len2 = 1024*1024;
1198   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1199   memset(buf.get(), 'a', len1);
1200   memset(buf.get(), 'b', len2);
1201
1202   WriteCallback wcb1;
1203   WriteCallback wcb2;
1204   WriteCallback wcb3;
1205   WriteCallback wcb4;
1206   socket->write(&wcb1, buf.get(), 0);
1207   socket->write(&wcb2, buf.get(), len1);
1208   socket->write(&wcb3, buf.get() + len1, 0);
1209   socket->write(&wcb4, buf.get() + len1, len2);
1210   socket->close();
1211
1212   evb.loop(); // loop until the data is sent
1213
1214   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1215   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1216   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1217   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1218   rcb.verifyData(buf.get(), len1 + len2);
1219
1220   ASSERT_TRUE(socket->isClosedBySelf());
1221   ASSERT_FALSE(socket->isClosedByPeer());
1222 }
1223
1224 TEST(AsyncSocketTest, ZeroLengthWritev) {
1225   TestServer server;
1226
1227   // connect()
1228   EventBase evb;
1229   std::shared_ptr<AsyncSocket> socket =
1230     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1231   evb.loop(); // loop until the socket is connected
1232
1233   auto acceptedSocket = server.acceptAsync(&evb);
1234   ReadCallback rcb;
1235   acceptedSocket->setReadCB(&rcb);
1236
1237   size_t len1 = 1024*1024;
1238   size_t len2 = 1024*1024;
1239   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1240   memset(buf.get(), 'a', len1);
1241   memset(buf.get(), 'b', len2);
1242
1243   WriteCallback wcb;
1244   size_t iovCount = 4;
1245   struct iovec iov[iovCount];
1246   iov[0].iov_base = buf.get();
1247   iov[0].iov_len = len1;
1248   iov[1].iov_base = buf.get() + len1;
1249   iov[1].iov_len = 0;
1250   iov[2].iov_base = buf.get() + len1;
1251   iov[2].iov_len = len2;
1252   iov[3].iov_base = buf.get() + len1 + len2;
1253   iov[3].iov_len = 0;
1254
1255   socket->writev(&wcb, iov, iovCount);
1256   socket->close();
1257   evb.loop(); // loop until the data is sent
1258
1259   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1260   rcb.verifyData(buf.get(), len1 + len2);
1261
1262   ASSERT_TRUE(socket->isClosedBySelf());
1263   ASSERT_FALSE(socket->isClosedByPeer());
1264 }
1265
1266 ///////////////////////////////////////////////////////////////////////////
1267 // close() related tests
1268 ///////////////////////////////////////////////////////////////////////////
1269
1270 /**
1271  * Test calling close() with pending writes when the socket is already closing.
1272  */
1273 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1274   TestServer server;
1275
1276   // connect()
1277   EventBase evb;
1278   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1279   ConnCallback ccb;
1280   socket->connect(&ccb, server.getAddress(), 30);
1281
1282   // accept the socket on the server side
1283   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1284
1285   // Loop to ensure the connect has completed
1286   evb.loop();
1287
1288   // Make sure we are connected
1289   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1290
1291   // Schedule pending writes, until several write attempts have blocked
1292   char buf[128];
1293   memset(buf, 'a', sizeof(buf));
1294   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1295   WriteCallbackVector writeCallbacks;
1296
1297   writeCallbacks.reserve(5);
1298   while (writeCallbacks.size() < 5) {
1299     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1300
1301     socket->write(wcb.get(), buf, sizeof(buf));
1302     if (wcb->state == STATE_SUCCEEDED) {
1303       // Succeeded immediately.  Keep performing more writes
1304       continue;
1305     }
1306
1307     // This write is blocked.
1308     // Have the write callback call close() when writeError() is invoked
1309     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1310     writeCallbacks.push_back(wcb);
1311   }
1312
1313   // Call closeNow() to immediately fail the pending writes
1314   socket->closeNow();
1315
1316   // Make sure writeError() was invoked on all of the pending write callbacks
1317   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1318        it != writeCallbacks.end();
1319        ++it) {
1320     CHECK_EQ((*it)->state, STATE_FAILED);
1321   }
1322
1323   ASSERT_TRUE(socket->isClosedBySelf());
1324   ASSERT_FALSE(socket->isClosedByPeer());
1325 }
1326
1327 ///////////////////////////////////////////////////////////////////////////
1328 // ImmediateRead related tests
1329 ///////////////////////////////////////////////////////////////////////////
1330
1331 /* AsyncSocket use to verify immediate read works */
1332 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1333  public:
1334   bool immediateReadCalled = false;
1335   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1336  protected:
1337   void checkForImmediateRead() noexcept override {
1338     immediateReadCalled = true;
1339     AsyncSocket::handleRead();
1340   }
1341 };
1342
1343 TEST(AsyncSocket, ConnectReadImmediateRead) {
1344   TestServer server;
1345
1346   const size_t maxBufferSz = 100;
1347   const size_t maxReadsPerEvent = 1;
1348   const size_t expectedDataSz = maxBufferSz * 3;
1349   char expectedData[expectedDataSz];
1350   memset(expectedData, 'j', expectedDataSz);
1351
1352   EventBase evb;
1353   ReadCallback rcb(maxBufferSz);
1354   AsyncSocketImmediateRead socket(&evb);
1355   socket.connect(nullptr, server.getAddress(), 30);
1356
1357   evb.loop(); // loop until the socket is connected
1358
1359   socket.setReadCB(&rcb);
1360   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1361   socket.immediateReadCalled = false;
1362
1363   auto acceptedSocket = server.acceptAsync(&evb);
1364
1365   ReadCallback rcbServer;
1366   WriteCallback wcbServer;
1367   rcbServer.dataAvailableCallback = [&]() {
1368     if (rcbServer.dataRead() == expectedDataSz) {
1369       // write back all data read
1370       rcbServer.verifyData(expectedData, expectedDataSz);
1371       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1372       acceptedSocket->close();
1373     }
1374   };
1375   acceptedSocket->setReadCB(&rcbServer);
1376
1377   // write data
1378   WriteCallback wcb1;
1379   socket.write(&wcb1, expectedData, expectedDataSz);
1380   evb.loop();
1381   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1382   rcb.verifyData(expectedData, expectedDataSz);
1383   CHECK_EQ(socket.immediateReadCalled, true);
1384
1385   ASSERT_FALSE(socket.isClosedBySelf());
1386   ASSERT_FALSE(socket.isClosedByPeer());
1387 }
1388
1389 TEST(AsyncSocket, ConnectReadUninstallRead) {
1390   TestServer server;
1391
1392   const size_t maxBufferSz = 100;
1393   const size_t maxReadsPerEvent = 1;
1394   const size_t expectedDataSz = maxBufferSz * 3;
1395   char expectedData[expectedDataSz];
1396   memset(expectedData, 'k', expectedDataSz);
1397
1398   EventBase evb;
1399   ReadCallback rcb(maxBufferSz);
1400   AsyncSocketImmediateRead socket(&evb);
1401   socket.connect(nullptr, server.getAddress(), 30);
1402
1403   evb.loop(); // loop until the socket is connected
1404
1405   socket.setReadCB(&rcb);
1406   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1407   socket.immediateReadCalled = false;
1408
1409   auto acceptedSocket = server.acceptAsync(&evb);
1410
1411   ReadCallback rcbServer;
1412   WriteCallback wcbServer;
1413   rcbServer.dataAvailableCallback = [&]() {
1414     if (rcbServer.dataRead() == expectedDataSz) {
1415       // write back all data read
1416       rcbServer.verifyData(expectedData, expectedDataSz);
1417       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1418       acceptedSocket->close();
1419     }
1420   };
1421   acceptedSocket->setReadCB(&rcbServer);
1422
1423   rcb.dataAvailableCallback = [&]() {
1424     // we read data and reset readCB
1425     socket.setReadCB(nullptr);
1426   };
1427
1428   // write data
1429   WriteCallback wcb;
1430   socket.write(&wcb, expectedData, expectedDataSz);
1431   evb.loop();
1432   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1433
1434   /* we shoud've only read maxBufferSz data since readCallback_
1435    * was reset in dataAvailableCallback */
1436   CHECK_EQ(rcb.dataRead(), maxBufferSz);
1437   CHECK_EQ(socket.immediateReadCalled, false);
1438
1439   ASSERT_FALSE(socket.isClosedBySelf());
1440   ASSERT_FALSE(socket.isClosedByPeer());
1441 }
1442
1443 // TODO:
1444 // - Test connect() and have the connect callback set the read callback
1445 // - Test connect() and have the connect callback unset the read callback
1446 // - Test reading/writing/closing/destroying the socket in the connect callback
1447 // - Test reading/writing/closing/destroying the socket in the read callback
1448 // - Test reading/writing/closing/destroying the socket in the write callback
1449 // - Test one-way shutdown behavior
1450 // - Test changing the EventBase
1451 //
1452 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1453 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1454
1455
1456 ///////////////////////////////////////////////////////////////////////////
1457 // AsyncServerSocket tests
1458 ///////////////////////////////////////////////////////////////////////////
1459 namespace {
1460 /**
1461  * Helper ConnectionEventCallback class for the test code.
1462  * It maintains counters protected by a spin lock.
1463  */
1464 class TestConnectionEventCallback :
1465   public AsyncServerSocket::ConnectionEventCallback {
1466  public:
1467   virtual void onConnectionAccepted(
1468       const int /* socket */,
1469       const SocketAddress& /* addr */) noexcept override {
1470     folly::RWSpinLock::WriteHolder holder(spinLock_);
1471     connectionAccepted_++;
1472   }
1473
1474   virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1475     folly::RWSpinLock::WriteHolder holder(spinLock_);
1476     connectionAcceptedError_++;
1477   }
1478
1479   virtual void onConnectionDropped(
1480       const int /* socket */,
1481       const SocketAddress& /* addr */) noexcept override {
1482     folly::RWSpinLock::WriteHolder holder(spinLock_);
1483     connectionDropped_++;
1484   }
1485
1486   virtual void onConnectionEnqueuedForAcceptorCallback(
1487       const int /* socket */,
1488       const SocketAddress& /* addr */) noexcept override {
1489     folly::RWSpinLock::WriteHolder holder(spinLock_);
1490     connectionEnqueuedForAcceptCallback_++;
1491   }
1492
1493   virtual void onConnectionDequeuedByAcceptorCallback(
1494       const int /* socket */,
1495       const SocketAddress& /* addr */) noexcept override {
1496     folly::RWSpinLock::WriteHolder holder(spinLock_);
1497     connectionDequeuedByAcceptCallback_++;
1498   }
1499
1500   virtual void onBackoffStarted() noexcept override {
1501     folly::RWSpinLock::WriteHolder holder(spinLock_);
1502     backoffStarted_++;
1503   }
1504
1505   virtual void onBackoffEnded() noexcept override {
1506     folly::RWSpinLock::WriteHolder holder(spinLock_);
1507     backoffEnded_++;
1508   }
1509
1510   virtual void onBackoffError() noexcept override {
1511     folly::RWSpinLock::WriteHolder holder(spinLock_);
1512     backoffError_++;
1513   }
1514
1515   unsigned int getConnectionAccepted() const {
1516     folly::RWSpinLock::ReadHolder holder(spinLock_);
1517     return connectionAccepted_;
1518   }
1519
1520   unsigned int getConnectionAcceptedError() const {
1521     folly::RWSpinLock::ReadHolder holder(spinLock_);
1522     return connectionAcceptedError_;
1523   }
1524
1525   unsigned int getConnectionDropped() const {
1526     folly::RWSpinLock::ReadHolder holder(spinLock_);
1527     return connectionDropped_;
1528   }
1529
1530   unsigned int getConnectionEnqueuedForAcceptCallback() const {
1531     folly::RWSpinLock::ReadHolder holder(spinLock_);
1532     return connectionEnqueuedForAcceptCallback_;
1533   }
1534
1535   unsigned int getConnectionDequeuedByAcceptCallback() const {
1536     folly::RWSpinLock::ReadHolder holder(spinLock_);
1537     return connectionDequeuedByAcceptCallback_;
1538   }
1539
1540   unsigned int getBackoffStarted() const {
1541     folly::RWSpinLock::ReadHolder holder(spinLock_);
1542     return backoffStarted_;
1543   }
1544
1545   unsigned int getBackoffEnded() const {
1546     folly::RWSpinLock::ReadHolder holder(spinLock_);
1547     return backoffEnded_;
1548   }
1549
1550   unsigned int getBackoffError() const {
1551     folly::RWSpinLock::ReadHolder holder(spinLock_);
1552     return backoffError_;
1553   }
1554
1555  private:
1556   mutable folly::RWSpinLock spinLock_;
1557   unsigned int connectionAccepted_{0};
1558   unsigned int connectionAcceptedError_{0};
1559   unsigned int connectionDropped_{0};
1560   unsigned int connectionEnqueuedForAcceptCallback_{0};
1561   unsigned int connectionDequeuedByAcceptCallback_{0};
1562   unsigned int backoffStarted_{0};
1563   unsigned int backoffEnded_{0};
1564   unsigned int backoffError_{0};
1565 };
1566
1567 /**
1568  * Helper AcceptCallback class for the test code
1569  * It records the callbacks that were invoked, and also supports calling
1570  * generic std::function objects in each callback.
1571  */
1572 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1573  public:
1574   enum EventType {
1575     TYPE_START,
1576     TYPE_ACCEPT,
1577     TYPE_ERROR,
1578     TYPE_STOP
1579   };
1580   struct EventInfo {
1581     EventInfo(int fd, const folly::SocketAddress& addr)
1582       : type(TYPE_ACCEPT),
1583         fd(fd),
1584         address(addr),
1585         errorMsg() {}
1586     explicit EventInfo(const std::string& msg)
1587       : type(TYPE_ERROR),
1588         fd(-1),
1589         address(),
1590         errorMsg(msg) {}
1591     explicit EventInfo(EventType et)
1592       : type(et),
1593         fd(-1),
1594         address(),
1595         errorMsg() {}
1596
1597     EventType type;
1598     int fd;  // valid for TYPE_ACCEPT
1599     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1600     string errorMsg;  // valid for TYPE_ERROR
1601   };
1602   typedef std::deque<EventInfo> EventList;
1603
1604   TestAcceptCallback()
1605     : connectionAcceptedFn_(),
1606       acceptErrorFn_(),
1607       acceptStoppedFn_(),
1608       events_() {}
1609
1610   std::deque<EventInfo>* getEvents() {
1611     return &events_;
1612   }
1613
1614   void setConnectionAcceptedFn(
1615       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1616     connectionAcceptedFn_ = fn;
1617   }
1618   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1619     acceptErrorFn_ = fn;
1620   }
1621   void setAcceptStartedFn(const std::function<void()>& fn) {
1622     acceptStartedFn_ = fn;
1623   }
1624   void setAcceptStoppedFn(const std::function<void()>& fn) {
1625     acceptStoppedFn_ = fn;
1626   }
1627
1628   void connectionAccepted(
1629       int fd, const folly::SocketAddress& clientAddr) noexcept override {
1630     events_.emplace_back(fd, clientAddr);
1631
1632     if (connectionAcceptedFn_) {
1633       connectionAcceptedFn_(fd, clientAddr);
1634     }
1635   }
1636   void acceptError(const std::exception& ex) noexcept override {
1637     events_.emplace_back(ex.what());
1638
1639     if (acceptErrorFn_) {
1640       acceptErrorFn_(ex);
1641     }
1642   }
1643   void acceptStarted() noexcept override {
1644     events_.emplace_back(TYPE_START);
1645
1646     if (acceptStartedFn_) {
1647       acceptStartedFn_();
1648     }
1649   }
1650   void acceptStopped() noexcept override {
1651     events_.emplace_back(TYPE_STOP);
1652
1653     if (acceptStoppedFn_) {
1654       acceptStoppedFn_();
1655     }
1656   }
1657
1658  private:
1659   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1660   std::function<void(const std::exception&)> acceptErrorFn_;
1661   std::function<void()> acceptStartedFn_;
1662   std::function<void()> acceptStoppedFn_;
1663
1664   std::deque<EventInfo> events_;
1665 };
1666 }
1667
1668 /**
1669  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1670  */
1671 TEST(AsyncSocketTest, ServerAcceptOptions) {
1672   EventBase eventBase;
1673
1674   // Create a server socket
1675   std::shared_ptr<AsyncServerSocket> serverSocket(
1676       AsyncServerSocket::newSocket(&eventBase));
1677   serverSocket->bind(0);
1678   serverSocket->listen(16);
1679   folly::SocketAddress serverAddress;
1680   serverSocket->getAddress(&serverAddress);
1681
1682   // Add a callback to accept one connection then stop the loop
1683   TestAcceptCallback acceptCallback;
1684   acceptCallback.setConnectionAcceptedFn(
1685       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1686         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1687       });
1688   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1689     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1690   });
1691   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1692   serverSocket->startAccepting();
1693
1694   // Connect to the server socket
1695   std::shared_ptr<AsyncSocket> socket(
1696       AsyncSocket::newSocket(&eventBase, serverAddress));
1697
1698   eventBase.loop();
1699
1700   // Verify that the server accepted a connection
1701   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1702   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1703                     TestAcceptCallback::TYPE_START);
1704   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1705                     TestAcceptCallback::TYPE_ACCEPT);
1706   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1707                     TestAcceptCallback::TYPE_STOP);
1708   int fd = acceptCallback.getEvents()->at(1).fd;
1709
1710   // The accepted connection should already be in non-blocking mode
1711   int flags = fcntl(fd, F_GETFL, 0);
1712   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1713
1714 #ifndef TCP_NOPUSH
1715   // The accepted connection should already have TCP_NODELAY set
1716   int value;
1717   socklen_t valueLength = sizeof(value);
1718   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1719   CHECK_EQ(rc, 0);
1720   CHECK_EQ(value, 1);
1721 #endif
1722 }
1723
1724 /**
1725  * Test AsyncServerSocket::removeAcceptCallback()
1726  */
1727 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1728   // Create a new AsyncServerSocket
1729   EventBase eventBase;
1730   std::shared_ptr<AsyncServerSocket> serverSocket(
1731       AsyncServerSocket::newSocket(&eventBase));
1732   serverSocket->bind(0);
1733   serverSocket->listen(16);
1734   folly::SocketAddress serverAddress;
1735   serverSocket->getAddress(&serverAddress);
1736
1737   // Add several accept callbacks
1738   TestAcceptCallback cb1;
1739   TestAcceptCallback cb2;
1740   TestAcceptCallback cb3;
1741   TestAcceptCallback cb4;
1742   TestAcceptCallback cb5;
1743   TestAcceptCallback cb6;
1744   TestAcceptCallback cb7;
1745
1746   // Test having callbacks remove other callbacks before them on the list,
1747   // after them on the list, or removing themselves.
1748   //
1749   // Have callback 2 remove callback 3 and callback 5 the first time it is
1750   // called.
1751   int cb2Count = 0;
1752   cb1.setConnectionAcceptedFn([&](int /* fd */,
1753                                   const folly::SocketAddress& /* addr */) {
1754     std::shared_ptr<AsyncSocket> sock2(
1755         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1756   });
1757   cb3.setConnectionAcceptedFn(
1758       [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1759   cb4.setConnectionAcceptedFn(
1760       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1761         std::shared_ptr<AsyncSocket> sock3(
1762             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1763       });
1764   cb5.setConnectionAcceptedFn(
1765       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1766         std::shared_ptr<AsyncSocket> sock5(
1767             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1768
1769       });
1770   cb2.setConnectionAcceptedFn(
1771       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1772         if (cb2Count == 0) {
1773           serverSocket->removeAcceptCallback(&cb3, nullptr);
1774           serverSocket->removeAcceptCallback(&cb5, nullptr);
1775         }
1776         ++cb2Count;
1777       });
1778   // Have callback 6 remove callback 4 the first time it is called,
1779   // and destroy the server socket the second time it is called
1780   int cb6Count = 0;
1781   cb6.setConnectionAcceptedFn(
1782       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1783         if (cb6Count == 0) {
1784           serverSocket->removeAcceptCallback(&cb4, nullptr);
1785           std::shared_ptr<AsyncSocket> sock6(
1786               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1787           std::shared_ptr<AsyncSocket> sock7(
1788               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1789           std::shared_ptr<AsyncSocket> sock8(
1790               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1791
1792         } else {
1793           serverSocket.reset();
1794         }
1795         ++cb6Count;
1796       });
1797   // Have callback 7 remove itself
1798   cb7.setConnectionAcceptedFn(
1799       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1800         serverSocket->removeAcceptCallback(&cb7, nullptr);
1801       });
1802
1803   serverSocket->addAcceptCallback(&cb1, nullptr);
1804   serverSocket->addAcceptCallback(&cb2, nullptr);
1805   serverSocket->addAcceptCallback(&cb3, nullptr);
1806   serverSocket->addAcceptCallback(&cb4, nullptr);
1807   serverSocket->addAcceptCallback(&cb5, nullptr);
1808   serverSocket->addAcceptCallback(&cb6, nullptr);
1809   serverSocket->addAcceptCallback(&cb7, nullptr);
1810   serverSocket->startAccepting();
1811
1812   // Make several connections to the socket
1813   std::shared_ptr<AsyncSocket> sock1(
1814       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1815   std::shared_ptr<AsyncSocket> sock4(
1816       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1817
1818   // Loop until we are stopped
1819   eventBase.loop();
1820
1821   // Check to make sure that the expected callbacks were invoked.
1822   //
1823   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1824   // the AcceptCallbacks in round-robin fashion, in the order that they were
1825   // added.  The code is implemented this way right now, but the API doesn't
1826   // explicitly require it be done this way.  If we change the code not to be
1827   // exactly round robin in the future, we can simplify the test checks here.
1828   // (We'll also need to update the termination code, since we expect cb6 to
1829   // get called twice to terminate the loop.)
1830   CHECK_EQ(cb1.getEvents()->size(), 4);
1831   CHECK_EQ(cb1.getEvents()->at(0).type,
1832                     TestAcceptCallback::TYPE_START);
1833   CHECK_EQ(cb1.getEvents()->at(1).type,
1834                     TestAcceptCallback::TYPE_ACCEPT);
1835   CHECK_EQ(cb1.getEvents()->at(2).type,
1836                     TestAcceptCallback::TYPE_ACCEPT);
1837   CHECK_EQ(cb1.getEvents()->at(3).type,
1838                     TestAcceptCallback::TYPE_STOP);
1839
1840   CHECK_EQ(cb2.getEvents()->size(), 4);
1841   CHECK_EQ(cb2.getEvents()->at(0).type,
1842                     TestAcceptCallback::TYPE_START);
1843   CHECK_EQ(cb2.getEvents()->at(1).type,
1844                     TestAcceptCallback::TYPE_ACCEPT);
1845   CHECK_EQ(cb2.getEvents()->at(2).type,
1846                     TestAcceptCallback::TYPE_ACCEPT);
1847   CHECK_EQ(cb2.getEvents()->at(3).type,
1848                     TestAcceptCallback::TYPE_STOP);
1849
1850   CHECK_EQ(cb3.getEvents()->size(), 2);
1851   CHECK_EQ(cb3.getEvents()->at(0).type,
1852                     TestAcceptCallback::TYPE_START);
1853   CHECK_EQ(cb3.getEvents()->at(1).type,
1854                     TestAcceptCallback::TYPE_STOP);
1855
1856   CHECK_EQ(cb4.getEvents()->size(), 3);
1857   CHECK_EQ(cb4.getEvents()->at(0).type,
1858                     TestAcceptCallback::TYPE_START);
1859   CHECK_EQ(cb4.getEvents()->at(1).type,
1860                     TestAcceptCallback::TYPE_ACCEPT);
1861   CHECK_EQ(cb4.getEvents()->at(2).type,
1862                     TestAcceptCallback::TYPE_STOP);
1863
1864   CHECK_EQ(cb5.getEvents()->size(), 2);
1865   CHECK_EQ(cb5.getEvents()->at(0).type,
1866                     TestAcceptCallback::TYPE_START);
1867   CHECK_EQ(cb5.getEvents()->at(1).type,
1868                     TestAcceptCallback::TYPE_STOP);
1869
1870   CHECK_EQ(cb6.getEvents()->size(), 4);
1871   CHECK_EQ(cb6.getEvents()->at(0).type,
1872                     TestAcceptCallback::TYPE_START);
1873   CHECK_EQ(cb6.getEvents()->at(1).type,
1874                     TestAcceptCallback::TYPE_ACCEPT);
1875   CHECK_EQ(cb6.getEvents()->at(2).type,
1876                     TestAcceptCallback::TYPE_ACCEPT);
1877   CHECK_EQ(cb6.getEvents()->at(3).type,
1878                     TestAcceptCallback::TYPE_STOP);
1879
1880   CHECK_EQ(cb7.getEvents()->size(), 3);
1881   CHECK_EQ(cb7.getEvents()->at(0).type,
1882                     TestAcceptCallback::TYPE_START);
1883   CHECK_EQ(cb7.getEvents()->at(1).type,
1884                     TestAcceptCallback::TYPE_ACCEPT);
1885   CHECK_EQ(cb7.getEvents()->at(2).type,
1886                     TestAcceptCallback::TYPE_STOP);
1887 }
1888
1889 /**
1890  * Test AsyncServerSocket::removeAcceptCallback()
1891  */
1892 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1893   // Create a new AsyncServerSocket
1894   EventBase eventBase;
1895   std::shared_ptr<AsyncServerSocket> serverSocket(
1896       AsyncServerSocket::newSocket(&eventBase));
1897   serverSocket->bind(0);
1898   serverSocket->listen(16);
1899   folly::SocketAddress serverAddress;
1900   serverSocket->getAddress(&serverAddress);
1901
1902   // Add several accept callbacks
1903   TestAcceptCallback cb1;
1904   auto thread_id = pthread_self();
1905   cb1.setAcceptStartedFn([&](){
1906     CHECK_NE(thread_id, pthread_self());
1907     thread_id = pthread_self();
1908   });
1909   cb1.setConnectionAcceptedFn(
1910       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1911         CHECK_EQ(thread_id, pthread_self());
1912         serverSocket->removeAcceptCallback(&cb1, nullptr);
1913       });
1914   cb1.setAcceptStoppedFn([&](){
1915     CHECK_EQ(thread_id, pthread_self());
1916   });
1917
1918   // Test having callbacks remove other callbacks before them on the list,
1919   serverSocket->addAcceptCallback(&cb1, nullptr);
1920   serverSocket->startAccepting();
1921
1922   // Make several connections to the socket
1923   std::shared_ptr<AsyncSocket> sock1(
1924       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1925
1926   // Loop in another thread
1927   auto other = std::thread([&](){
1928     eventBase.loop();
1929   });
1930   other.join();
1931
1932   // Check to make sure that the expected callbacks were invoked.
1933   //
1934   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1935   // the AcceptCallbacks in round-robin fashion, in the order that they were
1936   // added.  The code is implemented this way right now, but the API doesn't
1937   // explicitly require it be done this way.  If we change the code not to be
1938   // exactly round robin in the future, we can simplify the test checks here.
1939   // (We'll also need to update the termination code, since we expect cb6 to
1940   // get called twice to terminate the loop.)
1941   CHECK_EQ(cb1.getEvents()->size(), 3);
1942   CHECK_EQ(cb1.getEvents()->at(0).type,
1943                     TestAcceptCallback::TYPE_START);
1944   CHECK_EQ(cb1.getEvents()->at(1).type,
1945                     TestAcceptCallback::TYPE_ACCEPT);
1946   CHECK_EQ(cb1.getEvents()->at(2).type,
1947                     TestAcceptCallback::TYPE_STOP);
1948
1949 }
1950
1951 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1952   // Add a callback to accept one connection then stop accepting
1953   TestAcceptCallback acceptCallback;
1954   acceptCallback.setConnectionAcceptedFn(
1955       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1956         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1957       });
1958   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1959     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1960   });
1961   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1962   serverSocket->startAccepting();
1963
1964   // Connect to the server socket
1965   EventBase* eventBase = serverSocket->getEventBase();
1966   folly::SocketAddress serverAddress;
1967   serverSocket->getAddress(&serverAddress);
1968   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1969
1970   // Loop to process all events
1971   eventBase->loop();
1972
1973   // Verify that the server accepted a connection
1974   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1975   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1976                     TestAcceptCallback::TYPE_START);
1977   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1978                     TestAcceptCallback::TYPE_ACCEPT);
1979   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1980                     TestAcceptCallback::TYPE_STOP);
1981 }
1982
1983 /* Verify that we don't leak sockets if we are destroyed()
1984  * and there are still writes pending
1985  *
1986  * If destroy() only calls close() instead of closeNow(),
1987  * it would shutdown(writes) on the socket, but it would
1988  * never be close()'d, and the socket would leak
1989  */
1990 TEST(AsyncSocketTest, DestroyCloseTest) {
1991   TestServer server;
1992
1993   // connect()
1994   EventBase clientEB;
1995   EventBase serverEB;
1996   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1997   ConnCallback ccb;
1998   socket->connect(&ccb, server.getAddress(), 30);
1999
2000   // Accept the connection
2001   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2002   ReadCallback rcb;
2003   acceptedSocket->setReadCB(&rcb);
2004
2005   // Write a large buffer to the socket that is larger than kernel buffer
2006   size_t simpleBufLength = 5000000;
2007   char* simpleBuf = new char[simpleBufLength];
2008   memset(simpleBuf, 'a', simpleBufLength);
2009   WriteCallback wcb;
2010
2011   // Let the reads and writes run to completion
2012   int fd = acceptedSocket->getFd();
2013
2014   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2015   socket.reset();
2016   acceptedSocket.reset();
2017
2018   // Test that server socket was closed
2019   ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2020   CHECK_EQ(sz, -1);
2021   CHECK_EQ(errno, 9);
2022   delete[] simpleBuf;
2023 }
2024
2025 /**
2026  * Test AsyncServerSocket::useExistingSocket()
2027  */
2028 TEST(AsyncSocketTest, ServerExistingSocket) {
2029   EventBase eventBase;
2030
2031   // Test creating a socket, and letting AsyncServerSocket bind and listen
2032   {
2033     // Manually create a socket
2034     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2035     ASSERT_GE(fd, 0);
2036
2037     // Create a server socket
2038     AsyncServerSocket::UniquePtr serverSocket(
2039         new AsyncServerSocket(&eventBase));
2040     serverSocket->useExistingSocket(fd);
2041     folly::SocketAddress address;
2042     serverSocket->getAddress(&address);
2043     address.setPort(0);
2044     serverSocket->bind(address);
2045     serverSocket->listen(16);
2046
2047     // Make sure the socket works
2048     serverSocketSanityTest(serverSocket.get());
2049   }
2050
2051   // Test creating a socket and binding manually,
2052   // then letting AsyncServerSocket listen
2053   {
2054     // Manually create a socket
2055     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2056     ASSERT_GE(fd, 0);
2057     // bind
2058     struct sockaddr_in addr;
2059     addr.sin_family = AF_INET;
2060     addr.sin_port = 0;
2061     addr.sin_addr.s_addr = INADDR_ANY;
2062     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2063                              sizeof(addr)), 0);
2064     // Look up the address that we bound to
2065     folly::SocketAddress boundAddress;
2066     boundAddress.setFromLocalAddress(fd);
2067
2068     // Create a server socket
2069     AsyncServerSocket::UniquePtr serverSocket(
2070         new AsyncServerSocket(&eventBase));
2071     serverSocket->useExistingSocket(fd);
2072     serverSocket->listen(16);
2073
2074     // Make sure AsyncServerSocket reports the same address that we bound to
2075     folly::SocketAddress serverSocketAddress;
2076     serverSocket->getAddress(&serverSocketAddress);
2077     CHECK_EQ(boundAddress, serverSocketAddress);
2078
2079     // Make sure the socket works
2080     serverSocketSanityTest(serverSocket.get());
2081   }
2082
2083   // Test creating a socket, binding and listening manually,
2084   // then giving it to AsyncServerSocket
2085   {
2086     // Manually create a socket
2087     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2088     ASSERT_GE(fd, 0);
2089     // bind
2090     struct sockaddr_in addr;
2091     addr.sin_family = AF_INET;
2092     addr.sin_port = 0;
2093     addr.sin_addr.s_addr = INADDR_ANY;
2094     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2095                              sizeof(addr)), 0);
2096     // Look up the address that we bound to
2097     folly::SocketAddress boundAddress;
2098     boundAddress.setFromLocalAddress(fd);
2099     // listen
2100     CHECK_EQ(listen(fd, 16), 0);
2101
2102     // Create a server socket
2103     AsyncServerSocket::UniquePtr serverSocket(
2104         new AsyncServerSocket(&eventBase));
2105     serverSocket->useExistingSocket(fd);
2106
2107     // Make sure AsyncServerSocket reports the same address that we bound to
2108     folly::SocketAddress serverSocketAddress;
2109     serverSocket->getAddress(&serverSocketAddress);
2110     CHECK_EQ(boundAddress, serverSocketAddress);
2111
2112     // Make sure the socket works
2113     serverSocketSanityTest(serverSocket.get());
2114   }
2115 }
2116
2117 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2118   EventBase eventBase;
2119
2120   // Create a server socket
2121   std::shared_ptr<AsyncServerSocket> serverSocket(
2122       AsyncServerSocket::newSocket(&eventBase));
2123   string path(1, 0);
2124   path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2125   folly::SocketAddress serverAddress;
2126   serverAddress.setFromPath(path);
2127   serverSocket->bind(serverAddress);
2128   serverSocket->listen(16);
2129
2130   // Add a callback to accept one connection then stop the loop
2131   TestAcceptCallback acceptCallback;
2132   acceptCallback.setConnectionAcceptedFn(
2133       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2134         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2135       });
2136   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2137     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2138   });
2139   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2140   serverSocket->startAccepting();
2141
2142   // Connect to the server socket
2143   std::shared_ptr<AsyncSocket> socket(
2144       AsyncSocket::newSocket(&eventBase, serverAddress));
2145
2146   eventBase.loop();
2147
2148   // Verify that the server accepted a connection
2149   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2150   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2151                     TestAcceptCallback::TYPE_START);
2152   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2153                     TestAcceptCallback::TYPE_ACCEPT);
2154   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2155                     TestAcceptCallback::TYPE_STOP);
2156   int fd = acceptCallback.getEvents()->at(1).fd;
2157
2158   // The accepted connection should already be in non-blocking mode
2159   int flags = fcntl(fd, F_GETFL, 0);
2160   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2161 }
2162
2163 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2164   EventBase eventBase;
2165   TestConnectionEventCallback connectionEventCallback;
2166
2167   // Create a server socket
2168   std::shared_ptr<AsyncServerSocket> serverSocket(
2169       AsyncServerSocket::newSocket(&eventBase));
2170   serverSocket->setConnectionEventCallback(&connectionEventCallback);
2171   serverSocket->bind(0);
2172   serverSocket->listen(16);
2173   folly::SocketAddress serverAddress;
2174   serverSocket->getAddress(&serverAddress);
2175
2176   // Add a callback to accept one connection then stop the loop
2177   TestAcceptCallback acceptCallback;
2178   acceptCallback.setConnectionAcceptedFn(
2179       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2180         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2181       });
2182   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2183     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2184   });
2185   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2186   serverSocket->startAccepting();
2187
2188   // Connect to the server socket
2189   std::shared_ptr<AsyncSocket> socket(
2190       AsyncSocket::newSocket(&eventBase, serverAddress));
2191
2192   eventBase.loop();
2193
2194   // Validate the connection event counters
2195   ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2196   ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2197   ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2198   ASSERT_EQ(
2199       connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2200   ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2201   ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2202   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2203   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2204 }
2205
2206 /**
2207  * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2208  */
2209 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2210   EventBase eventBase;
2211
2212   // Counter of how many connections have been accepted
2213   int count = 0;
2214
2215   // Create a server socket
2216   auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2217   serverSocket->bind(0);
2218   serverSocket->listen(16);
2219   folly::SocketAddress serverAddress;
2220   serverSocket->getAddress(&serverAddress);
2221
2222   // Add a callback to accept connections
2223   TestAcceptCallback acceptCallback;
2224   acceptCallback.setConnectionAcceptedFn(
2225       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2226         count++;
2227         CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2228
2229         if (count == 4) {
2230           // all messages are processed, remove accept callback
2231           serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2232         }
2233       });
2234   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2235     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2236   });
2237   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2238   serverSocket->startAccepting();
2239
2240   // Connect to the server socket, 4 clients, there are 4 connections
2241   auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2242   auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2243   auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2244   auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2245
2246   eventBase.loop();
2247 }
2248
2249 /**
2250  * Test AsyncTransport::BufferCallback
2251  */
2252 TEST(AsyncSocketTest, BufferTest) {
2253   TestServer server;
2254
2255   EventBase evb;
2256   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2257   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2258   ConnCallback ccb;
2259   socket->connect(&ccb, server.getAddress(), 30, option);
2260
2261   char buf[100 * 1024];
2262   memset(buf, 'c', sizeof(buf));
2263   WriteCallback wcb;
2264   BufferCallback bcb;
2265   socket->setBufferCallback(&bcb);
2266   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2267
2268   evb.loop();
2269   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2270   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
2271
2272   ASSERT_TRUE(bcb.hasBuffered());
2273   ASSERT_TRUE(bcb.hasBufferCleared());
2274
2275   socket->close();
2276   server.verifyConnection(buf, sizeof(buf));
2277
2278   ASSERT_TRUE(socket->isClosedBySelf());
2279   ASSERT_FALSE(socket->isClosedByPeer());
2280 }
2281
2282 TEST(AsyncSocketTest, BufferCallbackKill) {
2283   TestServer server;
2284   EventBase evb;
2285   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2286   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2287   ConnCallback ccb;
2288   socket->connect(&ccb, server.getAddress(), 30, option);
2289   evb.loopOnce();
2290
2291   char buf[100 * 1024];
2292   memset(buf, 'c', sizeof(buf));
2293   BufferCallback* bcb = new BufferCallback;
2294   socket->setBufferCallback(bcb);
2295   WriteCallback wcb;
2296   wcb.successCallback = [&] {
2297     ASSERT_TRUE(socket.unique());
2298     socket.reset();
2299   };
2300
2301   // This will trigger AsyncSocket::handleWrite,
2302   // which calls WriteCallback::writeSuccess,
2303   // which calls wcb.successCallback above,
2304   // which tries to delete socket
2305   // Then, the socket will also try to use this BufferCallback
2306   // And that should crash us, if there is no DestructorGuard on the stack
2307   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2308
2309   evb.loop();
2310   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2311 }