Switch some assertions to std::thread rather than pthread
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/ExceptionWrapper.h>
17 #include <folly/RWSpinLock.h>
18 #include <folly/Random.h>
19 #include <folly/SocketAddress.h>
20 #include <folly/io/async/AsyncServerSocket.h>
21 #include <folly/io/async/AsyncSocket.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/EventBase.h>
24
25 #include <folly/io/IOBuf.h>
26 #include <folly/io/async/test/AsyncSocketTest.h>
27 #include <folly/io/async/test/Util.h>
28 #include <folly/portability/Sockets.h>
29 #include <folly/portability/Unistd.h>
30 #include <folly/test/SocketAddressTestHelper.h>
31
32 #include <boost/scoped_array.hpp>
33 #include <fcntl.h>
34 #include <gmock/gmock.h>
35 #include <gtest/gtest.h>
36 #include <sys/types.h>
37 #include <iostream>
38 #include <thread>
39
40 using namespace boost;
41
42 using std::string;
43 using std::vector;
44 using std::min;
45 using std::cerr;
46 using std::endl;
47 using std::unique_ptr;
48 using std::chrono::milliseconds;
49 using boost::scoped_array;
50
51 using namespace folly;
52 using namespace testing;
53
54 class DelayedWrite: public AsyncTimeout {
55  public:
56   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
57       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
58       bool cork, bool lastWrite = false):
59     AsyncTimeout(socket->getEventBase()),
60     socket_(socket),
61     bufs_(std::move(bufs)),
62     wcb_(wcb),
63     cork_(cork),
64     lastWrite_(lastWrite) {}
65
66  private:
67   void timeoutExpired() noexcept override {
68     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
69     socket_->writeChain(wcb_, std::move(bufs_), flags);
70     if (lastWrite_) {
71       socket_->shutdownWrite();
72     }
73   }
74
75   std::shared_ptr<AsyncSocket> socket_;
76   unique_ptr<IOBuf> bufs_;
77   AsyncTransportWrapper::WriteCallback* wcb_;
78   bool cork_;
79   bool lastWrite_;
80 };
81
82 ///////////////////////////////////////////////////////////////////////////
83 // connect() tests
84 ///////////////////////////////////////////////////////////////////////////
85
86 /**
87  * Test connecting to a server
88  */
89 TEST(AsyncSocketTest, Connect) {
90   // Start listening on a local port
91   TestServer server;
92
93   // Connect using a AsyncSocket
94   EventBase evb;
95   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
96   ConnCallback cb;
97   socket->connect(&cb, server.getAddress(), 30);
98
99   evb.loop();
100
101   CHECK_EQ(cb.state, STATE_SUCCEEDED);
102   EXPECT_LE(0, socket->getConnectTime().count());
103   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
104 }
105
106 enum class TFOState {
107   DISABLED,
108   ENABLED,
109 };
110
111 class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
112
113 std::vector<TFOState> getTestingValues() {
114   std::vector<TFOState> vals;
115   vals.emplace_back(TFOState::DISABLED);
116
117 #if FOLLY_ALLOW_TFO
118   vals.emplace_back(TFOState::ENABLED);
119 #endif
120   return vals;
121 }
122
123 INSTANTIATE_TEST_CASE_P(
124     ConnectTests,
125     AsyncSocketConnectTest,
126     ::testing::ValuesIn(getTestingValues()));
127
128 /**
129  * Test connecting to a server that isn't listening
130  */
131 TEST(AsyncSocketTest, ConnectRefused) {
132   EventBase evb;
133
134   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
135
136   // Hopefully nothing is actually listening on this address
137   folly::SocketAddress addr("127.0.0.1", 65535);
138   ConnCallback cb;
139   socket->connect(&cb, addr, 30);
140
141   evb.loop();
142
143   EXPECT_EQ(STATE_FAILED, cb.state);
144   EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
145   EXPECT_LE(0, socket->getConnectTime().count());
146   EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
147 }
148
149 /**
150  * Test connection timeout
151  */
152 TEST(AsyncSocketTest, ConnectTimeout) {
153   EventBase evb;
154
155   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
156
157   // Try connecting to server that won't respond.
158   //
159   // This depends somewhat on the network where this test is run.
160   // Hopefully this IP will be routable but unresponsive.
161   // (Alternatively, we could try listening on a local raw socket, but that
162   // normally requires root privileges.)
163   auto host =
164       SocketAddressTestHelper::isIPv6Enabled() ?
165       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
166       SocketAddressTestHelper::isIPv4Enabled() ?
167       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
168       nullptr;
169   SocketAddress addr(host, 65535);
170   ConnCallback cb;
171   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
172
173   evb.loop();
174
175   CHECK_EQ(cb.state, STATE_FAILED);
176   CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
177
178   // Verify that we can still get the peer address after a timeout.
179   // Use case is if the client was created from a client pool, and we want
180   // to log which peer failed.
181   folly::SocketAddress peer;
182   socket->getPeerAddress(&peer);
183   CHECK_EQ(peer, addr);
184   EXPECT_LE(0, socket->getConnectTime().count());
185   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
186 }
187
188 /**
189  * Test writing immediately after connecting, without waiting for connect
190  * to finish.
191  */
192 TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
193   TestServer server;
194
195   // connect()
196   EventBase evb;
197   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
198
199   if (GetParam() == TFOState::ENABLED) {
200     socket->enableTFO();
201   }
202
203   ConnCallback ccb;
204   socket->connect(&ccb, server.getAddress(), 30);
205
206   // write()
207   char buf[128];
208   memset(buf, 'a', sizeof(buf));
209   WriteCallback wcb;
210   socket->write(&wcb, buf, sizeof(buf));
211
212   // Loop.  We don't bother accepting on the server socket yet.
213   // The kernel should be able to buffer the write request so it can succeed.
214   evb.loop();
215
216   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
217   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
218
219   // Make sure the server got a connection and received the data
220   socket->close();
221   server.verifyConnection(buf, sizeof(buf));
222
223   ASSERT_TRUE(socket->isClosedBySelf());
224   ASSERT_FALSE(socket->isClosedByPeer());
225   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
226 }
227
228 /**
229  * Test connecting using a nullptr connect callback.
230  */
231 TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
232   TestServer server;
233
234   // connect()
235   EventBase evb;
236   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
237   if (GetParam() == TFOState::ENABLED) {
238     socket->enableTFO();
239   }
240
241   socket->connect(nullptr, server.getAddress(), 30);
242
243   // write some data, just so we have some way of verifing
244   // that the socket works correctly after connecting
245   char buf[128];
246   memset(buf, 'a', sizeof(buf));
247   WriteCallback wcb;
248   socket->write(&wcb, buf, sizeof(buf));
249
250   evb.loop();
251
252   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
253
254   // Make sure the server got a connection and received the data
255   socket->close();
256   server.verifyConnection(buf, sizeof(buf));
257
258   ASSERT_TRUE(socket->isClosedBySelf());
259   ASSERT_FALSE(socket->isClosedByPeer());
260 }
261
262 /**
263  * Test calling both write() and close() immediately after connecting, without
264  * waiting for connect to finish.
265  *
266  * This exercises the STATE_CONNECTING_CLOSING code.
267  */
268 TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
269   TestServer server;
270
271   // connect()
272   EventBase evb;
273   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
274   if (GetParam() == TFOState::ENABLED) {
275     socket->enableTFO();
276   }
277   ConnCallback ccb;
278   socket->connect(&ccb, server.getAddress(), 30);
279
280   // write()
281   char buf[128];
282   memset(buf, 'a', sizeof(buf));
283   WriteCallback wcb;
284   socket->write(&wcb, buf, sizeof(buf));
285
286   // close()
287   socket->close();
288
289   // Loop.  We don't bother accepting on the server socket yet.
290   // The kernel should be able to buffer the write request so it can succeed.
291   evb.loop();
292
293   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
294   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
295
296   // Make sure the server got a connection and received the data
297   server.verifyConnection(buf, sizeof(buf));
298
299   ASSERT_TRUE(socket->isClosedBySelf());
300   ASSERT_FALSE(socket->isClosedByPeer());
301 }
302
303 /**
304  * Test calling close() immediately after connect()
305  */
306 TEST(AsyncSocketTest, ConnectAndClose) {
307   TestServer server;
308
309   // Connect using a AsyncSocket
310   EventBase evb;
311   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
312   ConnCallback ccb;
313   socket->connect(&ccb, server.getAddress(), 30);
314
315   // Hopefully the connect didn't succeed immediately.
316   // If it did, we can't exercise the close-while-connecting code path.
317   if (ccb.state == STATE_SUCCEEDED) {
318     LOG(INFO) << "connect() succeeded immediately; aborting test "
319                        "of close-during-connect behavior";
320     return;
321   }
322
323   socket->close();
324
325   // Loop, although there shouldn't be anything to do.
326   evb.loop();
327
328   // Make sure the connection was aborted
329   CHECK_EQ(ccb.state, STATE_FAILED);
330
331   ASSERT_TRUE(socket->isClosedBySelf());
332   ASSERT_FALSE(socket->isClosedByPeer());
333 }
334
335 /**
336  * Test calling closeNow() immediately after connect()
337  *
338  * This should be identical to the normal close behavior.
339  */
340 TEST(AsyncSocketTest, ConnectAndCloseNow) {
341   TestServer server;
342
343   // Connect using a AsyncSocket
344   EventBase evb;
345   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
346   ConnCallback ccb;
347   socket->connect(&ccb, server.getAddress(), 30);
348
349   // Hopefully the connect didn't succeed immediately.
350   // If it did, we can't exercise the close-while-connecting code path.
351   if (ccb.state == STATE_SUCCEEDED) {
352     LOG(INFO) << "connect() succeeded immediately; aborting test "
353                        "of closeNow()-during-connect behavior";
354     return;
355   }
356
357   socket->closeNow();
358
359   // Loop, although there shouldn't be anything to do.
360   evb.loop();
361
362   // Make sure the connection was aborted
363   CHECK_EQ(ccb.state, STATE_FAILED);
364
365   ASSERT_TRUE(socket->isClosedBySelf());
366   ASSERT_FALSE(socket->isClosedByPeer());
367 }
368
369 /**
370  * Test calling both write() and closeNow() immediately after connecting,
371  * without waiting for connect to finish.
372  *
373  * This should abort the pending write.
374  */
375 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
376   TestServer server;
377
378   // connect()
379   EventBase evb;
380   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
381   ConnCallback ccb;
382   socket->connect(&ccb, server.getAddress(), 30);
383
384   // Hopefully the connect didn't succeed immediately.
385   // If it did, we can't exercise the close-while-connecting code path.
386   if (ccb.state == STATE_SUCCEEDED) {
387     LOG(INFO) << "connect() succeeded immediately; aborting test "
388                        "of write-during-connect behavior";
389     return;
390   }
391
392   // write()
393   char buf[128];
394   memset(buf, 'a', sizeof(buf));
395   WriteCallback wcb;
396   socket->write(&wcb, buf, sizeof(buf));
397
398   // close()
399   socket->closeNow();
400
401   // Loop, although there shouldn't be anything to do.
402   evb.loop();
403
404   CHECK_EQ(ccb.state, STATE_FAILED);
405   CHECK_EQ(wcb.state, STATE_FAILED);
406
407   ASSERT_TRUE(socket->isClosedBySelf());
408   ASSERT_FALSE(socket->isClosedByPeer());
409 }
410
411 /**
412  * Test installing a read callback immediately, before connect() finishes.
413  */
414 TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
415   TestServer server;
416
417   // connect()
418   EventBase evb;
419   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
420   if (GetParam() == TFOState::ENABLED) {
421     socket->enableTFO();
422   }
423
424   ConnCallback ccb;
425   socket->connect(&ccb, server.getAddress(), 30);
426
427   ReadCallback rcb;
428   socket->setReadCB(&rcb);
429
430   if (GetParam() == TFOState::ENABLED) {
431     // Trigger a connection
432     socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
433   }
434
435   // Even though we haven't looped yet, we should be able to accept
436   // the connection and send data to it.
437   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
438   uint8_t buf[128];
439   memset(buf, 'a', sizeof(buf));
440   acceptedSocket->write(buf, sizeof(buf));
441   acceptedSocket->flush();
442   acceptedSocket->close();
443
444   // Loop, although there shouldn't be anything to do.
445   evb.loop();
446
447   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
448   CHECK_EQ(rcb.buffers.size(), 1);
449   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
450   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
451
452   ASSERT_FALSE(socket->isClosedBySelf());
453   ASSERT_FALSE(socket->isClosedByPeer());
454 }
455
456 /**
457  * Test installing a read callback and then closing immediately before the
458  * connect attempt finishes.
459  */
460 TEST(AsyncSocketTest, ConnectReadAndClose) {
461   TestServer server;
462
463   // connect()
464   EventBase evb;
465   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
466   ConnCallback ccb;
467   socket->connect(&ccb, server.getAddress(), 30);
468
469   // Hopefully the connect didn't succeed immediately.
470   // If it did, we can't exercise the close-while-connecting code path.
471   if (ccb.state == STATE_SUCCEEDED) {
472     LOG(INFO) << "connect() succeeded immediately; aborting test "
473                        "of read-during-connect behavior";
474     return;
475   }
476
477   ReadCallback rcb;
478   socket->setReadCB(&rcb);
479
480   // close()
481   socket->close();
482
483   // Loop, although there shouldn't be anything to do.
484   evb.loop();
485
486   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
487   CHECK_EQ(rcb.buffers.size(), 0);
488   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
489
490   ASSERT_TRUE(socket->isClosedBySelf());
491   ASSERT_FALSE(socket->isClosedByPeer());
492 }
493
494 /**
495  * Test both writing and installing a read callback immediately,
496  * before connect() finishes.
497  */
498 TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
499   TestServer server;
500
501   // connect()
502   EventBase evb;
503   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
504   if (GetParam() == TFOState::ENABLED) {
505     socket->enableTFO();
506   }
507   ConnCallback ccb;
508   socket->connect(&ccb, server.getAddress(), 30);
509
510   // write()
511   char buf1[128];
512   memset(buf1, 'a', sizeof(buf1));
513   WriteCallback wcb;
514   socket->write(&wcb, buf1, sizeof(buf1));
515
516   // set a read callback
517   ReadCallback rcb;
518   socket->setReadCB(&rcb);
519
520   // Even though we haven't looped yet, we should be able to accept
521   // the connection and send data to it.
522   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
523   uint8_t buf2[128];
524   memset(buf2, 'b', sizeof(buf2));
525   acceptedSocket->write(buf2, sizeof(buf2));
526   acceptedSocket->flush();
527
528   // shut down the write half of acceptedSocket, so that the AsyncSocket
529   // will stop reading and we can break out of the event loop.
530   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
531
532   // Loop
533   evb.loop();
534
535   // Make sure the connect succeeded
536   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
537
538   // Make sure the AsyncSocket read the data written by the accepted socket
539   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
540   CHECK_EQ(rcb.buffers.size(), 1);
541   CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
542   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
543
544   // Close the AsyncSocket so we'll see EOF on acceptedSocket
545   socket->close();
546
547   // Make sure the accepted socket saw the data written by the AsyncSocket
548   uint8_t readbuf[sizeof(buf1)];
549   acceptedSocket->readAll(readbuf, sizeof(readbuf));
550   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
551   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
552   CHECK_EQ(bytesRead, 0);
553
554   ASSERT_FALSE(socket->isClosedBySelf());
555   ASSERT_TRUE(socket->isClosedByPeer());
556 }
557
558 /**
559  * Test writing to the socket then shutting down writes before the connect
560  * attempt finishes.
561  */
562 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
563   TestServer server;
564
565   // connect()
566   EventBase evb;
567   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
568   ConnCallback ccb;
569   socket->connect(&ccb, server.getAddress(), 30);
570
571   // Hopefully the connect didn't succeed immediately.
572   // If it did, we can't exercise the write-while-connecting code path.
573   if (ccb.state == STATE_SUCCEEDED) {
574     LOG(INFO) << "connect() succeeded immediately; skipping test";
575     return;
576   }
577
578   // Ask to write some data
579   char wbuf[128];
580   memset(wbuf, 'a', sizeof(wbuf));
581   WriteCallback wcb;
582   socket->write(&wcb, wbuf, sizeof(wbuf));
583   socket->shutdownWrite();
584
585   // Shutdown writes
586   socket->shutdownWrite();
587
588   // Even though we haven't looped yet, we should be able to accept
589   // the connection.
590   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
591
592   // Since the connection is still in progress, there should be no data to
593   // read yet.  Verify that the accepted socket is not readable.
594   struct pollfd fds[1];
595   fds[0].fd = acceptedSocket->getSocketFD();
596   fds[0].events = POLLIN;
597   fds[0].revents = 0;
598   int rc = poll(fds, 1, 0);
599   CHECK_EQ(rc, 0);
600
601   // Write data to the accepted socket
602   uint8_t acceptedWbuf[192];
603   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
604   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
605   acceptedSocket->flush();
606
607   // Loop
608   evb.loop();
609
610   // The loop should have completed the connection, written the queued data,
611   // and shutdown writes on the socket.
612   //
613   // Check that the connection was completed successfully and that the write
614   // callback succeeded.
615   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
616   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
617
618   // Check that we can read the data that was written to the socket, and that
619   // we see an EOF, since its socket was half-shutdown.
620   uint8_t readbuf[sizeof(wbuf)];
621   acceptedSocket->readAll(readbuf, sizeof(readbuf));
622   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
623   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
624   CHECK_EQ(bytesRead, 0);
625
626   // Close the accepted socket.  This will cause it to see EOF
627   // and uninstall the read callback when we loop next.
628   acceptedSocket->close();
629
630   // Install a read callback, then loop again.
631   ReadCallback rcb;
632   socket->setReadCB(&rcb);
633   evb.loop();
634
635   // This loop should have read the data and seen the EOF
636   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
637   CHECK_EQ(rcb.buffers.size(), 1);
638   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
639   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
640                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
641
642   ASSERT_FALSE(socket->isClosedBySelf());
643   ASSERT_FALSE(socket->isClosedByPeer());
644 }
645
646 /**
647  * Test reading, writing, and shutting down writes before the connect attempt
648  * finishes.
649  */
650 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
651   TestServer server;
652
653   // connect()
654   EventBase evb;
655   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
656   ConnCallback ccb;
657   socket->connect(&ccb, server.getAddress(), 30);
658
659   // Hopefully the connect didn't succeed immediately.
660   // If it did, we can't exercise the write-while-connecting code path.
661   if (ccb.state == STATE_SUCCEEDED) {
662     LOG(INFO) << "connect() succeeded immediately; skipping test";
663     return;
664   }
665
666   // Install a read callback
667   ReadCallback rcb;
668   socket->setReadCB(&rcb);
669
670   // Ask to write some data
671   char wbuf[128];
672   memset(wbuf, 'a', sizeof(wbuf));
673   WriteCallback wcb;
674   socket->write(&wcb, wbuf, sizeof(wbuf));
675
676   // Shutdown writes
677   socket->shutdownWrite();
678
679   // Even though we haven't looped yet, we should be able to accept
680   // the connection.
681   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
682
683   // Since the connection is still in progress, there should be no data to
684   // read yet.  Verify that the accepted socket is not readable.
685   struct pollfd fds[1];
686   fds[0].fd = acceptedSocket->getSocketFD();
687   fds[0].events = POLLIN;
688   fds[0].revents = 0;
689   int rc = poll(fds, 1, 0);
690   CHECK_EQ(rc, 0);
691
692   // Write data to the accepted socket
693   uint8_t acceptedWbuf[192];
694   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
695   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
696   acceptedSocket->flush();
697   // Shutdown writes to the accepted socket.  This will cause it to see EOF
698   // and uninstall the read callback.
699   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
700
701   // Loop
702   evb.loop();
703
704   // The loop should have completed the connection, written the queued data,
705   // shutdown writes on the socket, read the data we wrote to it, and see the
706   // EOF.
707   //
708   // Check that the connection was completed successfully and that the read
709   // and write callbacks were invoked as expected.
710   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
711   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
712   CHECK_EQ(rcb.buffers.size(), 1);
713   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
714   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
715                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
716   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
717
718   // Check that we can read the data that was written to the socket, and that
719   // we see an EOF, since its socket was half-shutdown.
720   uint8_t readbuf[sizeof(wbuf)];
721   acceptedSocket->readAll(readbuf, sizeof(readbuf));
722   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
723   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
724   CHECK_EQ(bytesRead, 0);
725
726   // Fully close both sockets
727   acceptedSocket->close();
728   socket->close();
729
730   ASSERT_FALSE(socket->isClosedBySelf());
731   ASSERT_TRUE(socket->isClosedByPeer());
732 }
733
734 /**
735  * Test reading, writing, and calling shutdownWriteNow() before the
736  * connect attempt finishes.
737  */
738 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
739   TestServer server;
740
741   // connect()
742   EventBase evb;
743   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
744   ConnCallback ccb;
745   socket->connect(&ccb, server.getAddress(), 30);
746
747   // Hopefully the connect didn't succeed immediately.
748   // If it did, we can't exercise the write-while-connecting code path.
749   if (ccb.state == STATE_SUCCEEDED) {
750     LOG(INFO) << "connect() succeeded immediately; skipping test";
751     return;
752   }
753
754   // Install a read callback
755   ReadCallback rcb;
756   socket->setReadCB(&rcb);
757
758   // Ask to write some data
759   char wbuf[128];
760   memset(wbuf, 'a', sizeof(wbuf));
761   WriteCallback wcb;
762   socket->write(&wcb, wbuf, sizeof(wbuf));
763
764   // Shutdown writes immediately.
765   // This should immediately discard the data that we just tried to write.
766   socket->shutdownWriteNow();
767
768   // Verify that writeError() was invoked on the write callback.
769   CHECK_EQ(wcb.state, STATE_FAILED);
770   CHECK_EQ(wcb.bytesWritten, 0);
771
772   // Even though we haven't looped yet, we should be able to accept
773   // the connection.
774   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
775
776   // Since the connection is still in progress, there should be no data to
777   // read yet.  Verify that the accepted socket is not readable.
778   struct pollfd fds[1];
779   fds[0].fd = acceptedSocket->getSocketFD();
780   fds[0].events = POLLIN;
781   fds[0].revents = 0;
782   int rc = poll(fds, 1, 0);
783   CHECK_EQ(rc, 0);
784
785   // Write data to the accepted socket
786   uint8_t acceptedWbuf[192];
787   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
788   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
789   acceptedSocket->flush();
790   // Shutdown writes to the accepted socket.  This will cause it to see EOF
791   // and uninstall the read callback.
792   ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
793
794   // Loop
795   evb.loop();
796
797   // The loop should have completed the connection, written the queued data,
798   // shutdown writes on the socket, read the data we wrote to it, and see the
799   // EOF.
800   //
801   // Check that the connection was completed successfully and that the read
802   // callback was invoked as expected.
803   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
804   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
805   CHECK_EQ(rcb.buffers.size(), 1);
806   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
807   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
808                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
809
810   // Since we used shutdownWriteNow(), it should have discarded all pending
811   // write data.  Verify we see an immediate EOF when reading from the accepted
812   // socket.
813   uint8_t readbuf[sizeof(wbuf)];
814   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
815   CHECK_EQ(bytesRead, 0);
816
817   // Fully close both sockets
818   acceptedSocket->close();
819   socket->close();
820
821   ASSERT_FALSE(socket->isClosedBySelf());
822   ASSERT_TRUE(socket->isClosedByPeer());
823 }
824
825 // Helper function for use in testConnectOptWrite()
826 // Temporarily disable the read callback
827 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
828   // Uninstall the read callback
829   socket->setReadCB(nullptr);
830   // Schedule the read callback to be reinstalled after 1ms
831   socket->getEventBase()->runInLoop(
832       std::bind(&AsyncSocket::setReadCB, socket, rcb));
833 }
834
835 /**
836  * Test connect+write, then have the connect callback perform another write.
837  *
838  * This tests interaction of the optimistic writing after connect with
839  * additional write attempts that occur in the connect callback.
840  */
841 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
842   TestServer server;
843   EventBase evb;
844   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
845
846   // connect()
847   ConnCallback ccb;
848   socket->connect(&ccb, server.getAddress(), 30);
849
850   // Hopefully the connect didn't succeed immediately.
851   // If it did, we can't exercise the optimistic write code path.
852   if (ccb.state == STATE_SUCCEEDED) {
853     LOG(INFO) << "connect() succeeded immediately; aborting test "
854                        "of optimistic write behavior";
855     return;
856   }
857
858   // Tell the connect callback to perform a write when the connect succeeds
859   WriteCallback wcb2;
860   scoped_array<char> buf2(new char[size2]);
861   memset(buf2.get(), 'b', size2);
862   if (size2 > 0) {
863     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
864     // Tell the second write callback to close the connection when it is done
865     wcb2.successCallback = [&] { socket->closeNow(); };
866   }
867
868   // Schedule one write() immediately, before the connect finishes
869   scoped_array<char> buf1(new char[size1]);
870   memset(buf1.get(), 'a', size1);
871   WriteCallback wcb1;
872   if (size1 > 0) {
873     socket->write(&wcb1, buf1.get(), size1);
874   }
875
876   if (close) {
877     // immediately perform a close, before connect() completes
878     socket->close();
879   }
880
881   // Start reading from the other endpoint after 10ms.
882   // If we're using large buffers, we have to read so that the writes don't
883   // block forever.
884   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
885   ReadCallback rcb;
886   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
887                                         acceptedSocket.get(), &rcb);
888   socket->getEventBase()->tryRunAfterDelay(
889       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
890       10);
891
892   // Loop.  We don't bother accepting on the server socket yet.
893   // The kernel should be able to buffer the write request so it can succeed.
894   evb.loop();
895
896   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
897   if (size1 > 0) {
898     CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
899   }
900   if (size2 > 0) {
901     CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
902   }
903
904   socket->close();
905
906   // Make sure the read callback received all of the data
907   size_t bytesRead = 0;
908   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
909        it != rcb.buffers.end();
910        ++it) {
911     size_t start = bytesRead;
912     bytesRead += it->length;
913     size_t end = bytesRead;
914     if (start < size1) {
915       size_t cmpLen = min(size1, end) - start;
916       CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
917     }
918     if (end > size1 && end <= size1 + size2) {
919       size_t itOffset;
920       size_t buf2Offset;
921       size_t cmpLen;
922       if (start >= size1) {
923         itOffset = 0;
924         buf2Offset = start - size1;
925         cmpLen = end - start;
926       } else {
927         itOffset = size1 - start;
928         buf2Offset = 0;
929         cmpLen = end - size1;
930       }
931       CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
932                                cmpLen),
933                         0);
934     }
935   }
936   CHECK_EQ(bytesRead, size1 + size2);
937 }
938
939 TEST(AsyncSocketTest, ConnectCallbackWrite) {
940   // Test using small writes that should both succeed immediately
941   testConnectOptWrite(100, 200);
942
943   // Test using a large buffer in the connect callback, that should block
944   const size_t largeSize = 8*1024*1024;
945   testConnectOptWrite(100, largeSize);
946
947   // Test using a large initial write
948   testConnectOptWrite(largeSize, 100);
949
950   // Test using two large buffers
951   testConnectOptWrite(largeSize, largeSize);
952
953   // Test a small write in the connect callback,
954   // but no immediate write before connect completes
955   testConnectOptWrite(0, 64);
956
957   // Test a large write in the connect callback,
958   // but no immediate write before connect completes
959   testConnectOptWrite(0, largeSize);
960
961   // Test connect, a small write, then immediately call close() before connect
962   // completes
963   testConnectOptWrite(211, 0, true);
964
965   // Test connect, a large immediate write (that will block), then immediately
966   // call close() before connect completes
967   testConnectOptWrite(largeSize, 0, true);
968 }
969
970 ///////////////////////////////////////////////////////////////////////////
971 // write() related tests
972 ///////////////////////////////////////////////////////////////////////////
973
974 /**
975  * Test writing using a nullptr callback
976  */
977 TEST(AsyncSocketTest, WriteNullCallback) {
978   TestServer server;
979
980   // connect()
981   EventBase evb;
982   std::shared_ptr<AsyncSocket> socket =
983     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
984   evb.loop(); // loop until the socket is connected
985
986   // write() with a nullptr callback
987   char buf[128];
988   memset(buf, 'a', sizeof(buf));
989   socket->write(nullptr, buf, sizeof(buf));
990
991   evb.loop(); // loop until the data is sent
992
993   // Make sure the server got a connection and received the data
994   socket->close();
995   server.verifyConnection(buf, sizeof(buf));
996
997   ASSERT_TRUE(socket->isClosedBySelf());
998   ASSERT_FALSE(socket->isClosedByPeer());
999 }
1000
1001 /**
1002  * Test writing with a send timeout
1003  */
1004 TEST(AsyncSocketTest, WriteTimeout) {
1005   TestServer server;
1006
1007   // connect()
1008   EventBase evb;
1009   std::shared_ptr<AsyncSocket> socket =
1010     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1011   evb.loop(); // loop until the socket is connected
1012
1013   // write() a large chunk of data, with no-one on the other end reading
1014   size_t writeLength = 8*1024*1024;
1015   uint32_t timeout = 200;
1016   socket->setSendTimeout(timeout);
1017   scoped_array<char> buf(new char[writeLength]);
1018   memset(buf.get(), 'a', writeLength);
1019   WriteCallback wcb;
1020   socket->write(&wcb, buf.get(), writeLength);
1021
1022   TimePoint start;
1023   evb.loop();
1024   TimePoint end;
1025
1026   // Make sure the write attempt timed out as requested
1027   CHECK_EQ(wcb.state, STATE_FAILED);
1028   CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1029
1030   // Check that the write timed out within a reasonable period of time.
1031   // We don't check for exactly the specified timeout, since AsyncSocket only
1032   // times out when it hasn't made progress for that period of time.
1033   //
1034   // On linux, the first write sends a few hundred kb of data, then blocks for
1035   // writability, and then unblocks again after 40ms and is able to write
1036   // another smaller of data before blocking permanently.  Therefore it doesn't
1037   // time out until 40ms + timeout.
1038   //
1039   // I haven't fully verified the cause of this, but I believe it probably
1040   // occurs because the receiving end delays sending an ack for up to 40ms.
1041   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
1042   // the ack, it can send some more data.  However, after that point the
1043   // receiver's kernel buffer is full.  This 40ms delay happens even with
1044   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
1045   // kernel may be automatically disabling TCP_QUICKACK after receiving some
1046   // data.
1047   //
1048   // For now, we simply check that the timeout occurred within 160ms of
1049   // the requested value.
1050   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1051 }
1052
1053 /**
1054  * Test writing to a socket that the remote endpoint has closed
1055  */
1056 TEST(AsyncSocketTest, WritePipeError) {
1057   TestServer server;
1058
1059   // connect()
1060   EventBase evb;
1061   std::shared_ptr<AsyncSocket> socket =
1062     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1063   socket->setSendTimeout(1000);
1064   evb.loop(); // loop until the socket is connected
1065
1066   // accept and immediately close the socket
1067   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1068   acceptedSocket.reset();
1069
1070   // write() a large chunk of data
1071   size_t writeLength = 8*1024*1024;
1072   scoped_array<char> buf(new char[writeLength]);
1073   memset(buf.get(), 'a', writeLength);
1074   WriteCallback wcb;
1075   socket->write(&wcb, buf.get(), writeLength);
1076
1077   evb.loop();
1078
1079   // Make sure the write failed.
1080   // It would be nice if AsyncSocketException could convey the errno value,
1081   // so that we could check for EPIPE
1082   CHECK_EQ(wcb.state, STATE_FAILED);
1083   CHECK_EQ(wcb.exception.getType(),
1084                     AsyncSocketException::INTERNAL_ERROR);
1085
1086   ASSERT_FALSE(socket->isClosedBySelf());
1087   ASSERT_FALSE(socket->isClosedByPeer());
1088 }
1089
1090 /**
1091  * Test writing a mix of simple buffers and IOBufs
1092  */
1093 TEST(AsyncSocketTest, WriteIOBuf) {
1094   TestServer server;
1095
1096   // connect()
1097   EventBase evb;
1098   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1099   ConnCallback ccb;
1100   socket->connect(&ccb, server.getAddress(), 30);
1101
1102   // Accept the connection
1103   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1104   ReadCallback rcb;
1105   acceptedSocket->setReadCB(&rcb);
1106
1107   // Write a simple buffer to the socket
1108   constexpr size_t simpleBufLength = 5;
1109   char simpleBuf[simpleBufLength];
1110   memset(simpleBuf, 'a', simpleBufLength);
1111   WriteCallback wcb;
1112   socket->write(&wcb, simpleBuf, simpleBufLength);
1113
1114   // Write a single-element IOBuf chain
1115   size_t buf1Length = 7;
1116   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1117   memset(buf1->writableData(), 'b', buf1Length);
1118   buf1->append(buf1Length);
1119   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1120   WriteCallback wcb2;
1121   socket->writeChain(&wcb2, std::move(buf1));
1122
1123   // Write a multiple-element IOBuf chain
1124   size_t buf2Length = 11;
1125   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1126   memset(buf2->writableData(), 'c', buf2Length);
1127   buf2->append(buf2Length);
1128   size_t buf3Length = 13;
1129   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1130   memset(buf3->writableData(), 'd', buf3Length);
1131   buf3->append(buf3Length);
1132   buf2->appendChain(std::move(buf3));
1133   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1134   buf2Copy->coalesce();
1135   WriteCallback wcb3;
1136   socket->writeChain(&wcb3, std::move(buf2));
1137   socket->shutdownWrite();
1138
1139   // Let the reads and writes run to completion
1140   evb.loop();
1141
1142   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1143   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1144   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1145
1146   // Make sure the reader got the right data in the right order
1147   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1148   CHECK_EQ(rcb.buffers.size(), 1);
1149   CHECK_EQ(rcb.buffers[0].length,
1150       simpleBufLength + buf1Length + buf2Length + buf3Length);
1151   CHECK_EQ(
1152       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1153   CHECK_EQ(
1154       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1155           buf1Copy->data(), buf1Copy->length()), 0);
1156   CHECK_EQ(
1157       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1158           buf2Copy->data(), buf2Copy->length()), 0);
1159
1160   acceptedSocket->close();
1161   socket->close();
1162
1163   ASSERT_TRUE(socket->isClosedBySelf());
1164   ASSERT_FALSE(socket->isClosedByPeer());
1165 }
1166
1167 TEST(AsyncSocketTest, WriteIOBufCorked) {
1168   TestServer server;
1169
1170   // connect()
1171   EventBase evb;
1172   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1173   ConnCallback ccb;
1174   socket->connect(&ccb, server.getAddress(), 30);
1175
1176   // Accept the connection
1177   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1178   ReadCallback rcb;
1179   acceptedSocket->setReadCB(&rcb);
1180
1181   // Do three writes, 100ms apart, with the "cork" flag set
1182   // on the second write.  The reader should see the first write
1183   // arrive by itself, followed by the second and third writes
1184   // arriving together.
1185   size_t buf1Length = 5;
1186   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1187   memset(buf1->writableData(), 'a', buf1Length);
1188   buf1->append(buf1Length);
1189   size_t buf2Length = 7;
1190   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1191   memset(buf2->writableData(), 'b', buf2Length);
1192   buf2->append(buf2Length);
1193   size_t buf3Length = 11;
1194   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1195   memset(buf3->writableData(), 'c', buf3Length);
1196   buf3->append(buf3Length);
1197   WriteCallback wcb1;
1198   socket->writeChain(&wcb1, std::move(buf1));
1199   WriteCallback wcb2;
1200   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1201   write2.scheduleTimeout(100);
1202   WriteCallback wcb3;
1203   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1204   write3.scheduleTimeout(140);
1205
1206   evb.loop();
1207   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1208   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1209   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1210   if (wcb3.state != STATE_SUCCEEDED) {
1211     throw(wcb3.exception);
1212   }
1213   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1214
1215   // Make sure the reader got the data with the right grouping
1216   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1217   CHECK_EQ(rcb.buffers.size(), 2);
1218   CHECK_EQ(rcb.buffers[0].length, buf1Length);
1219   CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1220
1221   acceptedSocket->close();
1222   socket->close();
1223
1224   ASSERT_TRUE(socket->isClosedBySelf());
1225   ASSERT_FALSE(socket->isClosedByPeer());
1226 }
1227
1228 /**
1229  * Test performing a zero-length write
1230  */
1231 TEST(AsyncSocketTest, ZeroLengthWrite) {
1232   TestServer server;
1233
1234   // connect()
1235   EventBase evb;
1236   std::shared_ptr<AsyncSocket> socket =
1237     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1238   evb.loop(); // loop until the socket is connected
1239
1240   auto acceptedSocket = server.acceptAsync(&evb);
1241   ReadCallback rcb;
1242   acceptedSocket->setReadCB(&rcb);
1243
1244   size_t len1 = 1024*1024;
1245   size_t len2 = 1024*1024;
1246   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1247   memset(buf.get(), 'a', len1);
1248   memset(buf.get(), 'b', len2);
1249
1250   WriteCallback wcb1;
1251   WriteCallback wcb2;
1252   WriteCallback wcb3;
1253   WriteCallback wcb4;
1254   socket->write(&wcb1, buf.get(), 0);
1255   socket->write(&wcb2, buf.get(), len1);
1256   socket->write(&wcb3, buf.get() + len1, 0);
1257   socket->write(&wcb4, buf.get() + len1, len2);
1258   socket->close();
1259
1260   evb.loop(); // loop until the data is sent
1261
1262   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1263   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1264   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1265   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1266   rcb.verifyData(buf.get(), len1 + len2);
1267
1268   ASSERT_TRUE(socket->isClosedBySelf());
1269   ASSERT_FALSE(socket->isClosedByPeer());
1270 }
1271
1272 TEST(AsyncSocketTest, ZeroLengthWritev) {
1273   TestServer server;
1274
1275   // connect()
1276   EventBase evb;
1277   std::shared_ptr<AsyncSocket> socket =
1278     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1279   evb.loop(); // loop until the socket is connected
1280
1281   auto acceptedSocket = server.acceptAsync(&evb);
1282   ReadCallback rcb;
1283   acceptedSocket->setReadCB(&rcb);
1284
1285   size_t len1 = 1024*1024;
1286   size_t len2 = 1024*1024;
1287   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1288   memset(buf.get(), 'a', len1);
1289   memset(buf.get(), 'b', len2);
1290
1291   WriteCallback wcb;
1292   constexpr size_t iovCount = 4;
1293   struct iovec iov[iovCount];
1294   iov[0].iov_base = buf.get();
1295   iov[0].iov_len = len1;
1296   iov[1].iov_base = buf.get() + len1;
1297   iov[1].iov_len = 0;
1298   iov[2].iov_base = buf.get() + len1;
1299   iov[2].iov_len = len2;
1300   iov[3].iov_base = buf.get() + len1 + len2;
1301   iov[3].iov_len = 0;
1302
1303   socket->writev(&wcb, iov, iovCount);
1304   socket->close();
1305   evb.loop(); // loop until the data is sent
1306
1307   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1308   rcb.verifyData(buf.get(), len1 + len2);
1309
1310   ASSERT_TRUE(socket->isClosedBySelf());
1311   ASSERT_FALSE(socket->isClosedByPeer());
1312 }
1313
1314 ///////////////////////////////////////////////////////////////////////////
1315 // close() related tests
1316 ///////////////////////////////////////////////////////////////////////////
1317
1318 /**
1319  * Test calling close() with pending writes when the socket is already closing.
1320  */
1321 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1322   TestServer server;
1323
1324   // connect()
1325   EventBase evb;
1326   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1327   ConnCallback ccb;
1328   socket->connect(&ccb, server.getAddress(), 30);
1329
1330   // accept the socket on the server side
1331   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1332
1333   // Loop to ensure the connect has completed
1334   evb.loop();
1335
1336   // Make sure we are connected
1337   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1338
1339   // Schedule pending writes, until several write attempts have blocked
1340   char buf[128];
1341   memset(buf, 'a', sizeof(buf));
1342   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1343   WriteCallbackVector writeCallbacks;
1344
1345   writeCallbacks.reserve(5);
1346   while (writeCallbacks.size() < 5) {
1347     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1348
1349     socket->write(wcb.get(), buf, sizeof(buf));
1350     if (wcb->state == STATE_SUCCEEDED) {
1351       // Succeeded immediately.  Keep performing more writes
1352       continue;
1353     }
1354
1355     // This write is blocked.
1356     // Have the write callback call close() when writeError() is invoked
1357     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1358     writeCallbacks.push_back(wcb);
1359   }
1360
1361   // Call closeNow() to immediately fail the pending writes
1362   socket->closeNow();
1363
1364   // Make sure writeError() was invoked on all of the pending write callbacks
1365   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1366        it != writeCallbacks.end();
1367        ++it) {
1368     CHECK_EQ((*it)->state, STATE_FAILED);
1369   }
1370
1371   ASSERT_TRUE(socket->isClosedBySelf());
1372   ASSERT_FALSE(socket->isClosedByPeer());
1373 }
1374
1375 ///////////////////////////////////////////////////////////////////////////
1376 // ImmediateRead related tests
1377 ///////////////////////////////////////////////////////////////////////////
1378
1379 /* AsyncSocket use to verify immediate read works */
1380 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1381  public:
1382   bool immediateReadCalled = false;
1383   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1384  protected:
1385   void checkForImmediateRead() noexcept override {
1386     immediateReadCalled = true;
1387     AsyncSocket::handleRead();
1388   }
1389 };
1390
1391 TEST(AsyncSocket, ConnectReadImmediateRead) {
1392   TestServer server;
1393
1394   const size_t maxBufferSz = 100;
1395   const size_t maxReadsPerEvent = 1;
1396   const size_t expectedDataSz = maxBufferSz * 3;
1397   char expectedData[expectedDataSz];
1398   memset(expectedData, 'j', expectedDataSz);
1399
1400   EventBase evb;
1401   ReadCallback rcb(maxBufferSz);
1402   AsyncSocketImmediateRead socket(&evb);
1403   socket.connect(nullptr, server.getAddress(), 30);
1404
1405   evb.loop(); // loop until the socket is connected
1406
1407   socket.setReadCB(&rcb);
1408   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1409   socket.immediateReadCalled = false;
1410
1411   auto acceptedSocket = server.acceptAsync(&evb);
1412
1413   ReadCallback rcbServer;
1414   WriteCallback wcbServer;
1415   rcbServer.dataAvailableCallback = [&]() {
1416     if (rcbServer.dataRead() == expectedDataSz) {
1417       // write back all data read
1418       rcbServer.verifyData(expectedData, expectedDataSz);
1419       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1420       acceptedSocket->close();
1421     }
1422   };
1423   acceptedSocket->setReadCB(&rcbServer);
1424
1425   // write data
1426   WriteCallback wcb1;
1427   socket.write(&wcb1, expectedData, expectedDataSz);
1428   evb.loop();
1429   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1430   rcb.verifyData(expectedData, expectedDataSz);
1431   CHECK_EQ(socket.immediateReadCalled, true);
1432
1433   ASSERT_FALSE(socket.isClosedBySelf());
1434   ASSERT_FALSE(socket.isClosedByPeer());
1435 }
1436
1437 TEST(AsyncSocket, ConnectReadUninstallRead) {
1438   TestServer server;
1439
1440   const size_t maxBufferSz = 100;
1441   const size_t maxReadsPerEvent = 1;
1442   const size_t expectedDataSz = maxBufferSz * 3;
1443   char expectedData[expectedDataSz];
1444   memset(expectedData, 'k', expectedDataSz);
1445
1446   EventBase evb;
1447   ReadCallback rcb(maxBufferSz);
1448   AsyncSocketImmediateRead socket(&evb);
1449   socket.connect(nullptr, server.getAddress(), 30);
1450
1451   evb.loop(); // loop until the socket is connected
1452
1453   socket.setReadCB(&rcb);
1454   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1455   socket.immediateReadCalled = false;
1456
1457   auto acceptedSocket = server.acceptAsync(&evb);
1458
1459   ReadCallback rcbServer;
1460   WriteCallback wcbServer;
1461   rcbServer.dataAvailableCallback = [&]() {
1462     if (rcbServer.dataRead() == expectedDataSz) {
1463       // write back all data read
1464       rcbServer.verifyData(expectedData, expectedDataSz);
1465       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1466       acceptedSocket->close();
1467     }
1468   };
1469   acceptedSocket->setReadCB(&rcbServer);
1470
1471   rcb.dataAvailableCallback = [&]() {
1472     // we read data and reset readCB
1473     socket.setReadCB(nullptr);
1474   };
1475
1476   // write data
1477   WriteCallback wcb;
1478   socket.write(&wcb, expectedData, expectedDataSz);
1479   evb.loop();
1480   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1481
1482   /* we shoud've only read maxBufferSz data since readCallback_
1483    * was reset in dataAvailableCallback */
1484   CHECK_EQ(rcb.dataRead(), maxBufferSz);
1485   CHECK_EQ(socket.immediateReadCalled, false);
1486
1487   ASSERT_FALSE(socket.isClosedBySelf());
1488   ASSERT_FALSE(socket.isClosedByPeer());
1489 }
1490
1491 // TODO:
1492 // - Test connect() and have the connect callback set the read callback
1493 // - Test connect() and have the connect callback unset the read callback
1494 // - Test reading/writing/closing/destroying the socket in the connect callback
1495 // - Test reading/writing/closing/destroying the socket in the read callback
1496 // - Test reading/writing/closing/destroying the socket in the write callback
1497 // - Test one-way shutdown behavior
1498 // - Test changing the EventBase
1499 //
1500 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1501 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1502
1503
1504 ///////////////////////////////////////////////////////////////////////////
1505 // AsyncServerSocket tests
1506 ///////////////////////////////////////////////////////////////////////////
1507 namespace {
1508 /**
1509  * Helper ConnectionEventCallback class for the test code.
1510  * It maintains counters protected by a spin lock.
1511  */
1512 class TestConnectionEventCallback :
1513   public AsyncServerSocket::ConnectionEventCallback {
1514  public:
1515   virtual void onConnectionAccepted(
1516       const int /* socket */,
1517       const SocketAddress& /* addr */) noexcept override {
1518     folly::RWSpinLock::WriteHolder holder(spinLock_);
1519     connectionAccepted_++;
1520   }
1521
1522   virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1523     folly::RWSpinLock::WriteHolder holder(spinLock_);
1524     connectionAcceptedError_++;
1525   }
1526
1527   virtual void onConnectionDropped(
1528       const int /* socket */,
1529       const SocketAddress& /* addr */) noexcept override {
1530     folly::RWSpinLock::WriteHolder holder(spinLock_);
1531     connectionDropped_++;
1532   }
1533
1534   virtual void onConnectionEnqueuedForAcceptorCallback(
1535       const int /* socket */,
1536       const SocketAddress& /* addr */) noexcept override {
1537     folly::RWSpinLock::WriteHolder holder(spinLock_);
1538     connectionEnqueuedForAcceptCallback_++;
1539   }
1540
1541   virtual void onConnectionDequeuedByAcceptorCallback(
1542       const int /* socket */,
1543       const SocketAddress& /* addr */) noexcept override {
1544     folly::RWSpinLock::WriteHolder holder(spinLock_);
1545     connectionDequeuedByAcceptCallback_++;
1546   }
1547
1548   virtual void onBackoffStarted() noexcept override {
1549     folly::RWSpinLock::WriteHolder holder(spinLock_);
1550     backoffStarted_++;
1551   }
1552
1553   virtual void onBackoffEnded() noexcept override {
1554     folly::RWSpinLock::WriteHolder holder(spinLock_);
1555     backoffEnded_++;
1556   }
1557
1558   virtual void onBackoffError() noexcept override {
1559     folly::RWSpinLock::WriteHolder holder(spinLock_);
1560     backoffError_++;
1561   }
1562
1563   unsigned int getConnectionAccepted() const {
1564     folly::RWSpinLock::ReadHolder holder(spinLock_);
1565     return connectionAccepted_;
1566   }
1567
1568   unsigned int getConnectionAcceptedError() const {
1569     folly::RWSpinLock::ReadHolder holder(spinLock_);
1570     return connectionAcceptedError_;
1571   }
1572
1573   unsigned int getConnectionDropped() const {
1574     folly::RWSpinLock::ReadHolder holder(spinLock_);
1575     return connectionDropped_;
1576   }
1577
1578   unsigned int getConnectionEnqueuedForAcceptCallback() const {
1579     folly::RWSpinLock::ReadHolder holder(spinLock_);
1580     return connectionEnqueuedForAcceptCallback_;
1581   }
1582
1583   unsigned int getConnectionDequeuedByAcceptCallback() const {
1584     folly::RWSpinLock::ReadHolder holder(spinLock_);
1585     return connectionDequeuedByAcceptCallback_;
1586   }
1587
1588   unsigned int getBackoffStarted() const {
1589     folly::RWSpinLock::ReadHolder holder(spinLock_);
1590     return backoffStarted_;
1591   }
1592
1593   unsigned int getBackoffEnded() const {
1594     folly::RWSpinLock::ReadHolder holder(spinLock_);
1595     return backoffEnded_;
1596   }
1597
1598   unsigned int getBackoffError() const {
1599     folly::RWSpinLock::ReadHolder holder(spinLock_);
1600     return backoffError_;
1601   }
1602
1603  private:
1604   mutable folly::RWSpinLock spinLock_;
1605   unsigned int connectionAccepted_{0};
1606   unsigned int connectionAcceptedError_{0};
1607   unsigned int connectionDropped_{0};
1608   unsigned int connectionEnqueuedForAcceptCallback_{0};
1609   unsigned int connectionDequeuedByAcceptCallback_{0};
1610   unsigned int backoffStarted_{0};
1611   unsigned int backoffEnded_{0};
1612   unsigned int backoffError_{0};
1613 };
1614
1615 /**
1616  * Helper AcceptCallback class for the test code
1617  * It records the callbacks that were invoked, and also supports calling
1618  * generic std::function objects in each callback.
1619  */
1620 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1621  public:
1622   enum EventType {
1623     TYPE_START,
1624     TYPE_ACCEPT,
1625     TYPE_ERROR,
1626     TYPE_STOP
1627   };
1628   struct EventInfo {
1629     EventInfo(int fd, const folly::SocketAddress& addr)
1630       : type(TYPE_ACCEPT),
1631         fd(fd),
1632         address(addr),
1633         errorMsg() {}
1634     explicit EventInfo(const std::string& msg)
1635       : type(TYPE_ERROR),
1636         fd(-1),
1637         address(),
1638         errorMsg(msg) {}
1639     explicit EventInfo(EventType et)
1640       : type(et),
1641         fd(-1),
1642         address(),
1643         errorMsg() {}
1644
1645     EventType type;
1646     int fd;  // valid for TYPE_ACCEPT
1647     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1648     string errorMsg;  // valid for TYPE_ERROR
1649   };
1650   typedef std::deque<EventInfo> EventList;
1651
1652   TestAcceptCallback()
1653     : connectionAcceptedFn_(),
1654       acceptErrorFn_(),
1655       acceptStoppedFn_(),
1656       events_() {}
1657
1658   std::deque<EventInfo>* getEvents() {
1659     return &events_;
1660   }
1661
1662   void setConnectionAcceptedFn(
1663       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1664     connectionAcceptedFn_ = fn;
1665   }
1666   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1667     acceptErrorFn_ = fn;
1668   }
1669   void setAcceptStartedFn(const std::function<void()>& fn) {
1670     acceptStartedFn_ = fn;
1671   }
1672   void setAcceptStoppedFn(const std::function<void()>& fn) {
1673     acceptStoppedFn_ = fn;
1674   }
1675
1676   void connectionAccepted(
1677       int fd, const folly::SocketAddress& clientAddr) noexcept override {
1678     events_.emplace_back(fd, clientAddr);
1679
1680     if (connectionAcceptedFn_) {
1681       connectionAcceptedFn_(fd, clientAddr);
1682     }
1683   }
1684   void acceptError(const std::exception& ex) noexcept override {
1685     events_.emplace_back(ex.what());
1686
1687     if (acceptErrorFn_) {
1688       acceptErrorFn_(ex);
1689     }
1690   }
1691   void acceptStarted() noexcept override {
1692     events_.emplace_back(TYPE_START);
1693
1694     if (acceptStartedFn_) {
1695       acceptStartedFn_();
1696     }
1697   }
1698   void acceptStopped() noexcept override {
1699     events_.emplace_back(TYPE_STOP);
1700
1701     if (acceptStoppedFn_) {
1702       acceptStoppedFn_();
1703     }
1704   }
1705
1706  private:
1707   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1708   std::function<void(const std::exception&)> acceptErrorFn_;
1709   std::function<void()> acceptStartedFn_;
1710   std::function<void()> acceptStoppedFn_;
1711
1712   std::deque<EventInfo> events_;
1713 };
1714 }
1715
1716 /**
1717  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1718  */
1719 TEST(AsyncSocketTest, ServerAcceptOptions) {
1720   EventBase eventBase;
1721
1722   // Create a server socket
1723   std::shared_ptr<AsyncServerSocket> serverSocket(
1724       AsyncServerSocket::newSocket(&eventBase));
1725   serverSocket->bind(0);
1726   serverSocket->listen(16);
1727   folly::SocketAddress serverAddress;
1728   serverSocket->getAddress(&serverAddress);
1729
1730   // Add a callback to accept one connection then stop the loop
1731   TestAcceptCallback acceptCallback;
1732   acceptCallback.setConnectionAcceptedFn(
1733       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1734         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1735       });
1736   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1737     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1738   });
1739   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1740   serverSocket->startAccepting();
1741
1742   // Connect to the server socket
1743   std::shared_ptr<AsyncSocket> socket(
1744       AsyncSocket::newSocket(&eventBase, serverAddress));
1745
1746   eventBase.loop();
1747
1748   // Verify that the server accepted a connection
1749   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1750   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1751                     TestAcceptCallback::TYPE_START);
1752   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1753                     TestAcceptCallback::TYPE_ACCEPT);
1754   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1755                     TestAcceptCallback::TYPE_STOP);
1756   int fd = acceptCallback.getEvents()->at(1).fd;
1757
1758   // The accepted connection should already be in non-blocking mode
1759   int flags = fcntl(fd, F_GETFL, 0);
1760   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1761
1762 #ifndef TCP_NOPUSH
1763   // The accepted connection should already have TCP_NODELAY set
1764   int value;
1765   socklen_t valueLength = sizeof(value);
1766   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1767   CHECK_EQ(rc, 0);
1768   CHECK_EQ(value, 1);
1769 #endif
1770 }
1771
1772 /**
1773  * Test AsyncServerSocket::removeAcceptCallback()
1774  */
1775 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1776   // Create a new AsyncServerSocket
1777   EventBase eventBase;
1778   std::shared_ptr<AsyncServerSocket> serverSocket(
1779       AsyncServerSocket::newSocket(&eventBase));
1780   serverSocket->bind(0);
1781   serverSocket->listen(16);
1782   folly::SocketAddress serverAddress;
1783   serverSocket->getAddress(&serverAddress);
1784
1785   // Add several accept callbacks
1786   TestAcceptCallback cb1;
1787   TestAcceptCallback cb2;
1788   TestAcceptCallback cb3;
1789   TestAcceptCallback cb4;
1790   TestAcceptCallback cb5;
1791   TestAcceptCallback cb6;
1792   TestAcceptCallback cb7;
1793
1794   // Test having callbacks remove other callbacks before them on the list,
1795   // after them on the list, or removing themselves.
1796   //
1797   // Have callback 2 remove callback 3 and callback 5 the first time it is
1798   // called.
1799   int cb2Count = 0;
1800   cb1.setConnectionAcceptedFn([&](int /* fd */,
1801                                   const folly::SocketAddress& /* addr */) {
1802     std::shared_ptr<AsyncSocket> sock2(
1803         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1804   });
1805   cb3.setConnectionAcceptedFn(
1806       [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1807   cb4.setConnectionAcceptedFn(
1808       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1809         std::shared_ptr<AsyncSocket> sock3(
1810             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1811       });
1812   cb5.setConnectionAcceptedFn(
1813       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1814         std::shared_ptr<AsyncSocket> sock5(
1815             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1816
1817       });
1818   cb2.setConnectionAcceptedFn(
1819       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1820         if (cb2Count == 0) {
1821           serverSocket->removeAcceptCallback(&cb3, nullptr);
1822           serverSocket->removeAcceptCallback(&cb5, nullptr);
1823         }
1824         ++cb2Count;
1825       });
1826   // Have callback 6 remove callback 4 the first time it is called,
1827   // and destroy the server socket the second time it is called
1828   int cb6Count = 0;
1829   cb6.setConnectionAcceptedFn(
1830       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1831         if (cb6Count == 0) {
1832           serverSocket->removeAcceptCallback(&cb4, nullptr);
1833           std::shared_ptr<AsyncSocket> sock6(
1834               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1835           std::shared_ptr<AsyncSocket> sock7(
1836               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1837           std::shared_ptr<AsyncSocket> sock8(
1838               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1839
1840         } else {
1841           serverSocket.reset();
1842         }
1843         ++cb6Count;
1844       });
1845   // Have callback 7 remove itself
1846   cb7.setConnectionAcceptedFn(
1847       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1848         serverSocket->removeAcceptCallback(&cb7, nullptr);
1849       });
1850
1851   serverSocket->addAcceptCallback(&cb1, nullptr);
1852   serverSocket->addAcceptCallback(&cb2, nullptr);
1853   serverSocket->addAcceptCallback(&cb3, nullptr);
1854   serverSocket->addAcceptCallback(&cb4, nullptr);
1855   serverSocket->addAcceptCallback(&cb5, nullptr);
1856   serverSocket->addAcceptCallback(&cb6, nullptr);
1857   serverSocket->addAcceptCallback(&cb7, nullptr);
1858   serverSocket->startAccepting();
1859
1860   // Make several connections to the socket
1861   std::shared_ptr<AsyncSocket> sock1(
1862       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1863   std::shared_ptr<AsyncSocket> sock4(
1864       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1865
1866   // Loop until we are stopped
1867   eventBase.loop();
1868
1869   // Check to make sure that the expected callbacks were invoked.
1870   //
1871   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1872   // the AcceptCallbacks in round-robin fashion, in the order that they were
1873   // added.  The code is implemented this way right now, but the API doesn't
1874   // explicitly require it be done this way.  If we change the code not to be
1875   // exactly round robin in the future, we can simplify the test checks here.
1876   // (We'll also need to update the termination code, since we expect cb6 to
1877   // get called twice to terminate the loop.)
1878   CHECK_EQ(cb1.getEvents()->size(), 4);
1879   CHECK_EQ(cb1.getEvents()->at(0).type,
1880                     TestAcceptCallback::TYPE_START);
1881   CHECK_EQ(cb1.getEvents()->at(1).type,
1882                     TestAcceptCallback::TYPE_ACCEPT);
1883   CHECK_EQ(cb1.getEvents()->at(2).type,
1884                     TestAcceptCallback::TYPE_ACCEPT);
1885   CHECK_EQ(cb1.getEvents()->at(3).type,
1886                     TestAcceptCallback::TYPE_STOP);
1887
1888   CHECK_EQ(cb2.getEvents()->size(), 4);
1889   CHECK_EQ(cb2.getEvents()->at(0).type,
1890                     TestAcceptCallback::TYPE_START);
1891   CHECK_EQ(cb2.getEvents()->at(1).type,
1892                     TestAcceptCallback::TYPE_ACCEPT);
1893   CHECK_EQ(cb2.getEvents()->at(2).type,
1894                     TestAcceptCallback::TYPE_ACCEPT);
1895   CHECK_EQ(cb2.getEvents()->at(3).type,
1896                     TestAcceptCallback::TYPE_STOP);
1897
1898   CHECK_EQ(cb3.getEvents()->size(), 2);
1899   CHECK_EQ(cb3.getEvents()->at(0).type,
1900                     TestAcceptCallback::TYPE_START);
1901   CHECK_EQ(cb3.getEvents()->at(1).type,
1902                     TestAcceptCallback::TYPE_STOP);
1903
1904   CHECK_EQ(cb4.getEvents()->size(), 3);
1905   CHECK_EQ(cb4.getEvents()->at(0).type,
1906                     TestAcceptCallback::TYPE_START);
1907   CHECK_EQ(cb4.getEvents()->at(1).type,
1908                     TestAcceptCallback::TYPE_ACCEPT);
1909   CHECK_EQ(cb4.getEvents()->at(2).type,
1910                     TestAcceptCallback::TYPE_STOP);
1911
1912   CHECK_EQ(cb5.getEvents()->size(), 2);
1913   CHECK_EQ(cb5.getEvents()->at(0).type,
1914                     TestAcceptCallback::TYPE_START);
1915   CHECK_EQ(cb5.getEvents()->at(1).type,
1916                     TestAcceptCallback::TYPE_STOP);
1917
1918   CHECK_EQ(cb6.getEvents()->size(), 4);
1919   CHECK_EQ(cb6.getEvents()->at(0).type,
1920                     TestAcceptCallback::TYPE_START);
1921   CHECK_EQ(cb6.getEvents()->at(1).type,
1922                     TestAcceptCallback::TYPE_ACCEPT);
1923   CHECK_EQ(cb6.getEvents()->at(2).type,
1924                     TestAcceptCallback::TYPE_ACCEPT);
1925   CHECK_EQ(cb6.getEvents()->at(3).type,
1926                     TestAcceptCallback::TYPE_STOP);
1927
1928   CHECK_EQ(cb7.getEvents()->size(), 3);
1929   CHECK_EQ(cb7.getEvents()->at(0).type,
1930                     TestAcceptCallback::TYPE_START);
1931   CHECK_EQ(cb7.getEvents()->at(1).type,
1932                     TestAcceptCallback::TYPE_ACCEPT);
1933   CHECK_EQ(cb7.getEvents()->at(2).type,
1934                     TestAcceptCallback::TYPE_STOP);
1935 }
1936
1937 /**
1938  * Test AsyncServerSocket::removeAcceptCallback()
1939  */
1940 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1941   // Create a new AsyncServerSocket
1942   EventBase eventBase;
1943   std::shared_ptr<AsyncServerSocket> serverSocket(
1944       AsyncServerSocket::newSocket(&eventBase));
1945   serverSocket->bind(0);
1946   serverSocket->listen(16);
1947   folly::SocketAddress serverAddress;
1948   serverSocket->getAddress(&serverAddress);
1949
1950   // Add several accept callbacks
1951   TestAcceptCallback cb1;
1952   auto thread_id = std::this_thread::get_id();
1953   cb1.setAcceptStartedFn([&](){
1954     CHECK_NE(thread_id, std::this_thread::get_id());
1955     thread_id = std::this_thread::get_id();
1956   });
1957   cb1.setConnectionAcceptedFn(
1958       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1959         CHECK_EQ(thread_id, std::this_thread::get_id());
1960         serverSocket->removeAcceptCallback(&cb1, nullptr);
1961       });
1962   cb1.setAcceptStoppedFn([&](){
1963     CHECK_EQ(thread_id, std::this_thread::get_id());
1964   });
1965
1966   // Test having callbacks remove other callbacks before them on the list,
1967   serverSocket->addAcceptCallback(&cb1, nullptr);
1968   serverSocket->startAccepting();
1969
1970   // Make several connections to the socket
1971   std::shared_ptr<AsyncSocket> sock1(
1972       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1973
1974   // Loop in another thread
1975   auto other = std::thread([&](){
1976     eventBase.loop();
1977   });
1978   other.join();
1979
1980   // Check to make sure that the expected callbacks were invoked.
1981   //
1982   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1983   // the AcceptCallbacks in round-robin fashion, in the order that they were
1984   // added.  The code is implemented this way right now, but the API doesn't
1985   // explicitly require it be done this way.  If we change the code not to be
1986   // exactly round robin in the future, we can simplify the test checks here.
1987   // (We'll also need to update the termination code, since we expect cb6 to
1988   // get called twice to terminate the loop.)
1989   CHECK_EQ(cb1.getEvents()->size(), 3);
1990   CHECK_EQ(cb1.getEvents()->at(0).type,
1991                     TestAcceptCallback::TYPE_START);
1992   CHECK_EQ(cb1.getEvents()->at(1).type,
1993                     TestAcceptCallback::TYPE_ACCEPT);
1994   CHECK_EQ(cb1.getEvents()->at(2).type,
1995                     TestAcceptCallback::TYPE_STOP);
1996
1997 }
1998
1999 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
2000   // Add a callback to accept one connection then stop accepting
2001   TestAcceptCallback acceptCallback;
2002   acceptCallback.setConnectionAcceptedFn(
2003       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2004         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2005       });
2006   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2007     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2008   });
2009   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2010   serverSocket->startAccepting();
2011
2012   // Connect to the server socket
2013   EventBase* eventBase = serverSocket->getEventBase();
2014   folly::SocketAddress serverAddress;
2015   serverSocket->getAddress(&serverAddress);
2016   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
2017
2018   // Loop to process all events
2019   eventBase->loop();
2020
2021   // Verify that the server accepted a connection
2022   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2023   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2024                     TestAcceptCallback::TYPE_START);
2025   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2026                     TestAcceptCallback::TYPE_ACCEPT);
2027   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2028                     TestAcceptCallback::TYPE_STOP);
2029 }
2030
2031 /* Verify that we don't leak sockets if we are destroyed()
2032  * and there are still writes pending
2033  *
2034  * If destroy() only calls close() instead of closeNow(),
2035  * it would shutdown(writes) on the socket, but it would
2036  * never be close()'d, and the socket would leak
2037  */
2038 TEST(AsyncSocketTest, DestroyCloseTest) {
2039   TestServer server;
2040
2041   // connect()
2042   EventBase clientEB;
2043   EventBase serverEB;
2044   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
2045   ConnCallback ccb;
2046   socket->connect(&ccb, server.getAddress(), 30);
2047
2048   // Accept the connection
2049   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2050   ReadCallback rcb;
2051   acceptedSocket->setReadCB(&rcb);
2052
2053   // Write a large buffer to the socket that is larger than kernel buffer
2054   size_t simpleBufLength = 5000000;
2055   char* simpleBuf = new char[simpleBufLength];
2056   memset(simpleBuf, 'a', simpleBufLength);
2057   WriteCallback wcb;
2058
2059   // Let the reads and writes run to completion
2060   int fd = acceptedSocket->getFd();
2061
2062   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2063   socket.reset();
2064   acceptedSocket.reset();
2065
2066   // Test that server socket was closed
2067   ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2068   CHECK_EQ(sz, -1);
2069   CHECK_EQ(errno, 9);
2070   delete[] simpleBuf;
2071 }
2072
2073 /**
2074  * Test AsyncServerSocket::useExistingSocket()
2075  */
2076 TEST(AsyncSocketTest, ServerExistingSocket) {
2077   EventBase eventBase;
2078
2079   // Test creating a socket, and letting AsyncServerSocket bind and listen
2080   {
2081     // Manually create a socket
2082     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2083     ASSERT_GE(fd, 0);
2084
2085     // Create a server socket
2086     AsyncServerSocket::UniquePtr serverSocket(
2087         new AsyncServerSocket(&eventBase));
2088     serverSocket->useExistingSocket(fd);
2089     folly::SocketAddress address;
2090     serverSocket->getAddress(&address);
2091     address.setPort(0);
2092     serverSocket->bind(address);
2093     serverSocket->listen(16);
2094
2095     // Make sure the socket works
2096     serverSocketSanityTest(serverSocket.get());
2097   }
2098
2099   // Test creating a socket and binding manually,
2100   // then letting AsyncServerSocket listen
2101   {
2102     // Manually create a socket
2103     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2104     ASSERT_GE(fd, 0);
2105     // bind
2106     struct sockaddr_in addr;
2107     addr.sin_family = AF_INET;
2108     addr.sin_port = 0;
2109     addr.sin_addr.s_addr = INADDR_ANY;
2110     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2111                              sizeof(addr)), 0);
2112     // Look up the address that we bound to
2113     folly::SocketAddress boundAddress;
2114     boundAddress.setFromLocalAddress(fd);
2115
2116     // Create a server socket
2117     AsyncServerSocket::UniquePtr serverSocket(
2118         new AsyncServerSocket(&eventBase));
2119     serverSocket->useExistingSocket(fd);
2120     serverSocket->listen(16);
2121
2122     // Make sure AsyncServerSocket reports the same address that we bound to
2123     folly::SocketAddress serverSocketAddress;
2124     serverSocket->getAddress(&serverSocketAddress);
2125     CHECK_EQ(boundAddress, serverSocketAddress);
2126
2127     // Make sure the socket works
2128     serverSocketSanityTest(serverSocket.get());
2129   }
2130
2131   // Test creating a socket, binding and listening manually,
2132   // then giving it to AsyncServerSocket
2133   {
2134     // Manually create a socket
2135     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2136     ASSERT_GE(fd, 0);
2137     // bind
2138     struct sockaddr_in addr;
2139     addr.sin_family = AF_INET;
2140     addr.sin_port = 0;
2141     addr.sin_addr.s_addr = INADDR_ANY;
2142     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2143                              sizeof(addr)), 0);
2144     // Look up the address that we bound to
2145     folly::SocketAddress boundAddress;
2146     boundAddress.setFromLocalAddress(fd);
2147     // listen
2148     CHECK_EQ(listen(fd, 16), 0);
2149
2150     // Create a server socket
2151     AsyncServerSocket::UniquePtr serverSocket(
2152         new AsyncServerSocket(&eventBase));
2153     serverSocket->useExistingSocket(fd);
2154
2155     // Make sure AsyncServerSocket reports the same address that we bound to
2156     folly::SocketAddress serverSocketAddress;
2157     serverSocket->getAddress(&serverSocketAddress);
2158     CHECK_EQ(boundAddress, serverSocketAddress);
2159
2160     // Make sure the socket works
2161     serverSocketSanityTest(serverSocket.get());
2162   }
2163 }
2164
2165 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2166   EventBase eventBase;
2167
2168   // Create a server socket
2169   std::shared_ptr<AsyncServerSocket> serverSocket(
2170       AsyncServerSocket::newSocket(&eventBase));
2171   string path(1, 0);
2172   path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2173   folly::SocketAddress serverAddress;
2174   serverAddress.setFromPath(path);
2175   serverSocket->bind(serverAddress);
2176   serverSocket->listen(16);
2177
2178   // Add a callback to accept one connection then stop the loop
2179   TestAcceptCallback acceptCallback;
2180   acceptCallback.setConnectionAcceptedFn(
2181       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2182         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2183       });
2184   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2185     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2186   });
2187   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2188   serverSocket->startAccepting();
2189
2190   // Connect to the server socket
2191   std::shared_ptr<AsyncSocket> socket(
2192       AsyncSocket::newSocket(&eventBase, serverAddress));
2193
2194   eventBase.loop();
2195
2196   // Verify that the server accepted a connection
2197   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2198   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2199                     TestAcceptCallback::TYPE_START);
2200   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2201                     TestAcceptCallback::TYPE_ACCEPT);
2202   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2203                     TestAcceptCallback::TYPE_STOP);
2204   int fd = acceptCallback.getEvents()->at(1).fd;
2205
2206   // The accepted connection should already be in non-blocking mode
2207   int flags = fcntl(fd, F_GETFL, 0);
2208   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2209 }
2210
2211 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2212   EventBase eventBase;
2213   TestConnectionEventCallback connectionEventCallback;
2214
2215   // Create a server socket
2216   std::shared_ptr<AsyncServerSocket> serverSocket(
2217       AsyncServerSocket::newSocket(&eventBase));
2218   serverSocket->setConnectionEventCallback(&connectionEventCallback);
2219   serverSocket->bind(0);
2220   serverSocket->listen(16);
2221   folly::SocketAddress serverAddress;
2222   serverSocket->getAddress(&serverAddress);
2223
2224   // Add a callback to accept one connection then stop the loop
2225   TestAcceptCallback acceptCallback;
2226   acceptCallback.setConnectionAcceptedFn(
2227       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2228         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2229       });
2230   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2231     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2232   });
2233   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2234   serverSocket->startAccepting();
2235
2236   // Connect to the server socket
2237   std::shared_ptr<AsyncSocket> socket(
2238       AsyncSocket::newSocket(&eventBase, serverAddress));
2239
2240   eventBase.loop();
2241
2242   // Validate the connection event counters
2243   ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2244   ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2245   ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2246   ASSERT_EQ(
2247       connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2248   ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2249   ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2250   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2251   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2252 }
2253
2254 /**
2255  * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2256  */
2257 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2258   EventBase eventBase;
2259
2260   // Counter of how many connections have been accepted
2261   int count = 0;
2262
2263   // Create a server socket
2264   auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2265   serverSocket->bind(0);
2266   serverSocket->listen(16);
2267   folly::SocketAddress serverAddress;
2268   serverSocket->getAddress(&serverAddress);
2269
2270   // Add a callback to accept connections
2271   TestAcceptCallback acceptCallback;
2272   acceptCallback.setConnectionAcceptedFn(
2273       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2274         count++;
2275         CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2276
2277         if (count == 4) {
2278           // all messages are processed, remove accept callback
2279           serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2280         }
2281       });
2282   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2283     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2284   });
2285   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2286   serverSocket->startAccepting();
2287
2288   // Connect to the server socket, 4 clients, there are 4 connections
2289   auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2290   auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2291   auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2292   auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2293
2294   eventBase.loop();
2295 }
2296
2297 /**
2298  * Test AsyncTransport::BufferCallback
2299  */
2300 TEST(AsyncSocketTest, BufferTest) {
2301   TestServer server;
2302
2303   EventBase evb;
2304   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2305   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2306   ConnCallback ccb;
2307   socket->connect(&ccb, server.getAddress(), 30, option);
2308
2309   char buf[100 * 1024];
2310   memset(buf, 'c', sizeof(buf));
2311   WriteCallback wcb;
2312   BufferCallback bcb;
2313   socket->setBufferCallback(&bcb);
2314   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2315
2316   evb.loop();
2317   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2318   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
2319
2320   ASSERT_TRUE(bcb.hasBuffered());
2321   ASSERT_TRUE(bcb.hasBufferCleared());
2322
2323   socket->close();
2324   server.verifyConnection(buf, sizeof(buf));
2325
2326   ASSERT_TRUE(socket->isClosedBySelf());
2327   ASSERT_FALSE(socket->isClosedByPeer());
2328 }
2329
2330 TEST(AsyncSocketTest, BufferCallbackKill) {
2331   TestServer server;
2332   EventBase evb;
2333   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2334   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2335   ConnCallback ccb;
2336   socket->connect(&ccb, server.getAddress(), 30, option);
2337   evb.loopOnce();
2338
2339   char buf[100 * 1024];
2340   memset(buf, 'c', sizeof(buf));
2341   BufferCallback bcb;
2342   socket->setBufferCallback(&bcb);
2343   WriteCallback wcb;
2344   wcb.successCallback = [&] {
2345     ASSERT_TRUE(socket.unique());
2346     socket.reset();
2347   };
2348
2349   // This will trigger AsyncSocket::handleWrite,
2350   // which calls WriteCallback::writeSuccess,
2351   // which calls wcb.successCallback above,
2352   // which tries to delete socket
2353   // Then, the socket will also try to use this BufferCallback
2354   // And that should crash us, if there is no DestructorGuard on the stack
2355   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2356
2357   evb.loop();
2358   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2359 }
2360
2361 #if FOLLY_ALLOW_TFO
2362 TEST(AsyncSocketTest, ConnectTFO) {
2363   // Start listening on a local port
2364   TestServer server(true);
2365
2366   // Connect using a AsyncSocket
2367   EventBase evb;
2368   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2369   socket->enableTFO();
2370   ConnCallback cb;
2371   socket->connect(&cb, server.getAddress(), 30);
2372
2373   std::array<uint8_t, 128> buf;
2374   memset(buf.data(), 'a', buf.size());
2375
2376   std::array<uint8_t, 3> readBuf;
2377   auto sendBuf = IOBuf::copyBuffer("hey");
2378
2379   std::thread t([&] {
2380     auto acceptedSocket = server.accept();
2381     acceptedSocket->write(buf.data(), buf.size());
2382     acceptedSocket->flush();
2383     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2384     acceptedSocket->close();
2385   });
2386
2387   evb.loop();
2388
2389   CHECK_EQ(cb.state, STATE_SUCCEEDED);
2390   EXPECT_LE(0, socket->getConnectTime().count());
2391   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2392   EXPECT_TRUE(socket->getTFOAttempted());
2393
2394   // Should trigger the connect
2395   WriteCallback write;
2396   ReadCallback rcb;
2397   socket->writeChain(&write, sendBuf->clone());
2398   socket->setReadCB(&rcb);
2399   evb.loop();
2400
2401   t.join();
2402
2403   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2404   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2405   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2406   ASSERT_EQ(1, rcb.buffers.size());
2407   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2408   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2409   EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
2410 }
2411
2412 /**
2413  * Test connecting to a server that isn't listening
2414  */
2415 TEST(AsyncSocketTest, ConnectRefusedTFO) {
2416   EventBase evb;
2417
2418   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2419
2420   socket->enableTFO();
2421
2422   // Hopefully nothing is actually listening on this address
2423   folly::SocketAddress addr("::1", 65535);
2424   ConnCallback cb;
2425   socket->connect(&cb, addr, 30);
2426
2427   evb.loop();
2428
2429   WriteCallback write1;
2430   // Trigger the connect if TFO attempt is supported.
2431   socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2432   evb.loop();
2433   WriteCallback write2;
2434   socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2435   evb.loop();
2436
2437   if (!socket->getTFOFinished()) {
2438     EXPECT_EQ(STATE_FAILED, write1.state);
2439     EXPECT_FALSE(socket->getTFOFinished());
2440   } else {
2441     EXPECT_EQ(STATE_SUCCEEDED, write1.state);
2442     EXPECT_TRUE(socket->getTFOFinished());
2443   }
2444
2445   EXPECT_EQ(STATE_FAILED, write2.state);
2446
2447   EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2448   EXPECT_LE(0, socket->getConnectTime().count());
2449   EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2450   EXPECT_TRUE(socket->getTFOAttempted());
2451 }
2452
2453 /**
2454  * Test calling closeNow() immediately after connecting.
2455  */
2456 TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
2457   TestServer server(true);
2458
2459   // connect()
2460   EventBase evb;
2461   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2462   socket->enableTFO();
2463
2464   ConnCallback ccb;
2465   socket->connect(&ccb, server.getAddress(), 30);
2466
2467   // write()
2468   std::array<char, 128> buf;
2469   memset(buf.data(), 'a', buf.size());
2470
2471   // close()
2472   socket->closeNow();
2473
2474   // Loop, although there shouldn't be anything to do.
2475   evb.loop();
2476
2477   EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
2478   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2479
2480   ASSERT_TRUE(socket->isClosedBySelf());
2481   ASSERT_FALSE(socket->isClosedByPeer());
2482 }
2483
2484 /**
2485  * Test calling close() immediately after connect()
2486  */
2487 TEST(AsyncSocketTest, ConnectAndCloseTFO) {
2488   TestServer server(true);
2489
2490   // Connect using a AsyncSocket
2491   EventBase evb;
2492   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2493   socket->enableTFO();
2494
2495   ConnCallback ccb;
2496   socket->connect(&ccb, server.getAddress(), 30);
2497
2498   socket->close();
2499
2500   // Loop, although there shouldn't be anything to do.
2501   evb.loop();
2502
2503   // Make sure the connection was aborted
2504   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2505   EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
2506
2507   ASSERT_TRUE(socket->isClosedBySelf());
2508   ASSERT_FALSE(socket->isClosedByPeer());
2509 }
2510
2511 class MockAsyncTFOSocket : public AsyncSocket {
2512  public:
2513   using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
2514
2515   explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
2516
2517   MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
2518 };
2519
2520 TEST(AsyncSocketTest, TestTFOUnsupported) {
2521   TestServer server(true);
2522
2523   // Connect using a AsyncSocket
2524   EventBase evb;
2525   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2526   socket->enableTFO();
2527
2528   ConnCallback ccb;
2529   socket->connect(&ccb, server.getAddress(), 30);
2530   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2531
2532   ReadCallback rcb;
2533   socket->setReadCB(&rcb);
2534
2535   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2536       .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2537   WriteCallback write;
2538   auto sendBuf = IOBuf::copyBuffer("hey");
2539   socket->writeChain(&write, sendBuf->clone());
2540   EXPECT_EQ(STATE_WAITING, write.state);
2541
2542   std::array<uint8_t, 128> buf;
2543   memset(buf.data(), 'a', buf.size());
2544
2545   std::array<uint8_t, 3> readBuf;
2546
2547   std::thread t([&] {
2548     std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2549     acceptedSocket->write(buf.data(), buf.size());
2550     acceptedSocket->flush();
2551     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2552     acceptedSocket->close();
2553   });
2554
2555   evb.loop();
2556
2557   t.join();
2558   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2559   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2560
2561   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2562   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2563   ASSERT_EQ(1, rcb.buffers.size());
2564   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2565   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2566 }
2567
2568 TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
2569   // Try connecting to server that won't respond.
2570   //
2571   // This depends somewhat on the network where this test is run.
2572   // Hopefully this IP will be routable but unresponsive.
2573   // (Alternatively, we could try listening on a local raw socket, but that
2574   // normally requires root privileges.)
2575   auto host = SocketAddressTestHelper::isIPv6Enabled()
2576       ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2577       : SocketAddressTestHelper::isIPv4Enabled()
2578           ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2579           : nullptr;
2580   SocketAddress addr(host, 65535);
2581
2582   // Connect using a AsyncSocket
2583   EventBase evb;
2584   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2585   socket->enableTFO();
2586
2587   ConnCallback ccb;
2588   // Set a very small timeout
2589   socket->connect(&ccb, addr, 1);
2590   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2591
2592   ReadCallback rcb;
2593   socket->setReadCB(&rcb);
2594
2595   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2596       .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2597   WriteCallback write;
2598   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2599
2600   evb.loop();
2601
2602   EXPECT_EQ(STATE_FAILED, write.state);
2603 }
2604
2605 TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
2606   TestServer server(true);
2607
2608   // Connect using a AsyncSocket
2609   EventBase evb;
2610   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2611   socket->enableTFO();
2612
2613   ConnCallback ccb;
2614   socket->connect(&ccb, server.getAddress(), 30);
2615   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2616
2617   ReadCallback rcb;
2618   socket->setReadCB(&rcb);
2619
2620   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2621       .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2622         sockaddr_storage addr;
2623         auto len = server.getAddress().getAddress(&addr);
2624         return connect(fd, (const struct sockaddr*)&addr, len);
2625       }));
2626   WriteCallback write;
2627   auto sendBuf = IOBuf::copyBuffer("hey");
2628   socket->writeChain(&write, sendBuf->clone());
2629   EXPECT_EQ(STATE_WAITING, write.state);
2630
2631   std::array<uint8_t, 128> buf;
2632   memset(buf.data(), 'a', buf.size());
2633
2634   std::array<uint8_t, 3> readBuf;
2635
2636   std::thread t([&] {
2637     std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2638     acceptedSocket->write(buf.data(), buf.size());
2639     acceptedSocket->flush();
2640     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2641     acceptedSocket->close();
2642   });
2643
2644   evb.loop();
2645
2646   t.join();
2647   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2648
2649   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2650   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2651
2652   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2653   ASSERT_EQ(1, rcb.buffers.size());
2654   ASSERT_EQ(buf.size(), rcb.buffers[0].length);
2655   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2656 }
2657
2658 TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
2659   // Try connecting to server that won't respond.
2660   //
2661   // This depends somewhat on the network where this test is run.
2662   // Hopefully this IP will be routable but unresponsive.
2663   // (Alternatively, we could try listening on a local raw socket, but that
2664   // normally requires root privileges.)
2665   auto host = SocketAddressTestHelper::isIPv6Enabled()
2666       ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2667       : SocketAddressTestHelper::isIPv4Enabled()
2668           ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2669           : nullptr;
2670   SocketAddress addr(host, 65535);
2671
2672   // Connect using a AsyncSocket
2673   EventBase evb;
2674   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2675   socket->enableTFO();
2676
2677   ConnCallback ccb;
2678   // Set a very small timeout
2679   socket->connect(&ccb, addr, 1);
2680   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2681
2682   ReadCallback rcb;
2683   socket->setReadCB(&rcb);
2684
2685   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2686       .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2687         sockaddr_storage addr2;
2688         auto len = addr.getAddress(&addr2);
2689         return connect(fd, (const struct sockaddr*)&addr2, len);
2690       }));
2691   WriteCallback write;
2692   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2693
2694   evb.loop();
2695
2696   EXPECT_EQ(STATE_FAILED, write.state);
2697 }
2698
2699 TEST(AsyncSocketTest, TestTFOEagain) {
2700   TestServer server(true);
2701
2702   // Connect using a AsyncSocket
2703   EventBase evb;
2704   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2705   socket->enableTFO();
2706
2707   ConnCallback ccb;
2708   socket->connect(&ccb, server.getAddress(), 30);
2709
2710   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2711       .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
2712   WriteCallback write;
2713   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2714
2715   evb.loop();
2716
2717   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2718   EXPECT_EQ(STATE_FAILED, write.state);
2719 }
2720
2721 // Sending a large amount of data in the first write which will
2722 // definitely not fit into MSS.
2723 TEST(AsyncSocketTest, ConnectTFOWithBigData) {
2724   // Start listening on a local port
2725   TestServer server(true);
2726
2727   // Connect using a AsyncSocket
2728   EventBase evb;
2729   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2730   socket->enableTFO();
2731   ConnCallback cb;
2732   socket->connect(&cb, server.getAddress(), 30);
2733
2734   std::array<uint8_t, 128> buf;
2735   memset(buf.data(), 'a', buf.size());
2736
2737   constexpr size_t len = 10 * 1024;
2738   auto sendBuf = IOBuf::create(len);
2739   sendBuf->append(len);
2740   std::array<uint8_t, len> readBuf;
2741
2742   std::thread t([&] {
2743     auto acceptedSocket = server.accept();
2744     acceptedSocket->write(buf.data(), buf.size());
2745     acceptedSocket->flush();
2746     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2747     acceptedSocket->close();
2748   });
2749
2750   evb.loop();
2751
2752   CHECK_EQ(cb.state, STATE_SUCCEEDED);
2753   EXPECT_LE(0, socket->getConnectTime().count());
2754   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2755   EXPECT_TRUE(socket->getTFOAttempted());
2756
2757   // Should trigger the connect
2758   WriteCallback write;
2759   ReadCallback rcb;
2760   socket->writeChain(&write, sendBuf->clone());
2761   socket->setReadCB(&rcb);
2762   evb.loop();
2763
2764   t.join();
2765
2766   EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
2767   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2768   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2769   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2770   ASSERT_EQ(1, rcb.buffers.size());
2771   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2772   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2773 }
2774
2775 #endif