b1f69db5d790aba182f00493e5d8c3ef9868816f
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/ExceptionWrapper.h>
17 #include <folly/RWSpinLock.h>
18 #include <folly/Random.h>
19 #include <folly/SocketAddress.h>
20 #include <folly/io/async/AsyncServerSocket.h>
21 #include <folly/io/async/AsyncSocket.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/EventBase.h>
24
25 #include <folly/experimental/TestUtil.h>
26 #include <folly/io/IOBuf.h>
27 #include <folly/io/async/test/AsyncSocketTest.h>
28 #include <folly/io/async/test/Util.h>
29 #include <folly/portability/GMock.h>
30 #include <folly/portability/GTest.h>
31 #include <folly/portability/Sockets.h>
32 #include <folly/portability/Unistd.h>
33 #include <folly/test/SocketAddressTestHelper.h>
34
35 #include <boost/scoped_array.hpp>
36 #include <fcntl.h>
37 #include <sys/types.h>
38 #include <iostream>
39 #include <thread>
40
41 using namespace boost;
42
43 using std::string;
44 using std::vector;
45 using std::min;
46 using std::cerr;
47 using std::endl;
48 using std::unique_ptr;
49 using std::chrono::milliseconds;
50 using boost::scoped_array;
51
52 using namespace folly;
53 using namespace testing;
54
55 namespace fsp = folly::portability::sockets;
56
57 class DelayedWrite: public AsyncTimeout {
58  public:
59   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
60       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
61       bool cork, bool lastWrite = false):
62     AsyncTimeout(socket->getEventBase()),
63     socket_(socket),
64     bufs_(std::move(bufs)),
65     wcb_(wcb),
66     cork_(cork),
67     lastWrite_(lastWrite) {}
68
69  private:
70   void timeoutExpired() noexcept override {
71     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
72     socket_->writeChain(wcb_, std::move(bufs_), flags);
73     if (lastWrite_) {
74       socket_->shutdownWrite();
75     }
76   }
77
78   std::shared_ptr<AsyncSocket> socket_;
79   unique_ptr<IOBuf> bufs_;
80   AsyncTransportWrapper::WriteCallback* wcb_;
81   bool cork_;
82   bool lastWrite_;
83 };
84
85 ///////////////////////////////////////////////////////////////////////////
86 // connect() tests
87 ///////////////////////////////////////////////////////////////////////////
88
89 /**
90  * Test connecting to a server
91  */
92 TEST(AsyncSocketTest, Connect) {
93   // Start listening on a local port
94   TestServer server;
95
96   // Connect using a AsyncSocket
97   EventBase evb;
98   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
99   ConnCallback cb;
100   socket->connect(&cb, server.getAddress(), 30);
101
102   evb.loop();
103
104   ASSERT_EQ(cb.state, STATE_SUCCEEDED);
105   EXPECT_LE(0, socket->getConnectTime().count());
106   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
107 }
108
109 enum class TFOState {
110   DISABLED,
111   ENABLED,
112 };
113
114 class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
115
116 std::vector<TFOState> getTestingValues() {
117   std::vector<TFOState> vals;
118   vals.emplace_back(TFOState::DISABLED);
119
120 #if FOLLY_ALLOW_TFO
121   vals.emplace_back(TFOState::ENABLED);
122 #endif
123   return vals;
124 }
125
126 INSTANTIATE_TEST_CASE_P(
127     ConnectTests,
128     AsyncSocketConnectTest,
129     ::testing::ValuesIn(getTestingValues()));
130
131 /**
132  * Test connecting to a server that isn't listening
133  */
134 TEST(AsyncSocketTest, ConnectRefused) {
135   EventBase evb;
136
137   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
138
139   // Hopefully nothing is actually listening on this address
140   folly::SocketAddress addr("127.0.0.1", 65535);
141   ConnCallback cb;
142   socket->connect(&cb, addr, 30);
143
144   evb.loop();
145
146   EXPECT_EQ(STATE_FAILED, cb.state);
147   EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
148   EXPECT_LE(0, socket->getConnectTime().count());
149   EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
150 }
151
152 /**
153  * Test connection timeout
154  */
155 TEST(AsyncSocketTest, ConnectTimeout) {
156   EventBase evb;
157
158   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
159
160   // Try connecting to server that won't respond.
161   //
162   // This depends somewhat on the network where this test is run.
163   // Hopefully this IP will be routable but unresponsive.
164   // (Alternatively, we could try listening on a local raw socket, but that
165   // normally requires root privileges.)
166   auto host =
167       SocketAddressTestHelper::isIPv6Enabled() ?
168       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
169       SocketAddressTestHelper::isIPv4Enabled() ?
170       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
171       nullptr;
172   SocketAddress addr(host, 65535);
173   ConnCallback cb;
174   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
175
176   evb.loop();
177
178   ASSERT_EQ(cb.state, STATE_FAILED);
179   ASSERT_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
180
181   // Verify that we can still get the peer address after a timeout.
182   // Use case is if the client was created from a client pool, and we want
183   // to log which peer failed.
184   folly::SocketAddress peer;
185   socket->getPeerAddress(&peer);
186   ASSERT_EQ(peer, addr);
187   EXPECT_LE(0, socket->getConnectTime().count());
188   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
189 }
190
191 /**
192  * Test writing immediately after connecting, without waiting for connect
193  * to finish.
194  */
195 TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
196   TestServer server;
197
198   // connect()
199   EventBase evb;
200   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
201
202   if (GetParam() == TFOState::ENABLED) {
203     socket->enableTFO();
204   }
205
206   ConnCallback ccb;
207   socket->connect(&ccb, server.getAddress(), 30);
208
209   // write()
210   char buf[128];
211   memset(buf, 'a', sizeof(buf));
212   WriteCallback wcb;
213   socket->write(&wcb, buf, sizeof(buf));
214
215   // Loop.  We don't bother accepting on the server socket yet.
216   // The kernel should be able to buffer the write request so it can succeed.
217   evb.loop();
218
219   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
220   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
221
222   // Make sure the server got a connection and received the data
223   socket->close();
224   server.verifyConnection(buf, sizeof(buf));
225
226   ASSERT_TRUE(socket->isClosedBySelf());
227   ASSERT_FALSE(socket->isClosedByPeer());
228   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
229 }
230
231 /**
232  * Test connecting using a nullptr connect callback.
233  */
234 TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
235   TestServer server;
236
237   // connect()
238   EventBase evb;
239   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
240   if (GetParam() == TFOState::ENABLED) {
241     socket->enableTFO();
242   }
243
244   socket->connect(nullptr, server.getAddress(), 30);
245
246   // write some data, just so we have some way of verifing
247   // that the socket works correctly after connecting
248   char buf[128];
249   memset(buf, 'a', sizeof(buf));
250   WriteCallback wcb;
251   socket->write(&wcb, buf, sizeof(buf));
252
253   evb.loop();
254
255   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
256
257   // Make sure the server got a connection and received the data
258   socket->close();
259   server.verifyConnection(buf, sizeof(buf));
260
261   ASSERT_TRUE(socket->isClosedBySelf());
262   ASSERT_FALSE(socket->isClosedByPeer());
263 }
264
265 /**
266  * Test calling both write() and close() immediately after connecting, without
267  * waiting for connect to finish.
268  *
269  * This exercises the STATE_CONNECTING_CLOSING code.
270  */
271 TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
272   TestServer server;
273
274   // connect()
275   EventBase evb;
276   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
277   if (GetParam() == TFOState::ENABLED) {
278     socket->enableTFO();
279   }
280   ConnCallback ccb;
281   socket->connect(&ccb, server.getAddress(), 30);
282
283   // write()
284   char buf[128];
285   memset(buf, 'a', sizeof(buf));
286   WriteCallback wcb;
287   socket->write(&wcb, buf, sizeof(buf));
288
289   // close()
290   socket->close();
291
292   // Loop.  We don't bother accepting on the server socket yet.
293   // The kernel should be able to buffer the write request so it can succeed.
294   evb.loop();
295
296   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
297   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
298
299   // Make sure the server got a connection and received the data
300   server.verifyConnection(buf, sizeof(buf));
301
302   ASSERT_TRUE(socket->isClosedBySelf());
303   ASSERT_FALSE(socket->isClosedByPeer());
304 }
305
306 /**
307  * Test calling close() immediately after connect()
308  */
309 TEST(AsyncSocketTest, ConnectAndClose) {
310   TestServer server;
311
312   // Connect using a AsyncSocket
313   EventBase evb;
314   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
315   ConnCallback ccb;
316   socket->connect(&ccb, server.getAddress(), 30);
317
318   // Hopefully the connect didn't succeed immediately.
319   // If it did, we can't exercise the close-while-connecting code path.
320   if (ccb.state == STATE_SUCCEEDED) {
321     LOG(INFO) << "connect() succeeded immediately; aborting test "
322                        "of close-during-connect behavior";
323     return;
324   }
325
326   socket->close();
327
328   // Loop, although there shouldn't be anything to do.
329   evb.loop();
330
331   // Make sure the connection was aborted
332   ASSERT_EQ(ccb.state, STATE_FAILED);
333
334   ASSERT_TRUE(socket->isClosedBySelf());
335   ASSERT_FALSE(socket->isClosedByPeer());
336 }
337
338 /**
339  * Test calling closeNow() immediately after connect()
340  *
341  * This should be identical to the normal close behavior.
342  */
343 TEST(AsyncSocketTest, ConnectAndCloseNow) {
344   TestServer server;
345
346   // Connect using a AsyncSocket
347   EventBase evb;
348   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
349   ConnCallback ccb;
350   socket->connect(&ccb, server.getAddress(), 30);
351
352   // Hopefully the connect didn't succeed immediately.
353   // If it did, we can't exercise the close-while-connecting code path.
354   if (ccb.state == STATE_SUCCEEDED) {
355     LOG(INFO) << "connect() succeeded immediately; aborting test "
356                        "of closeNow()-during-connect behavior";
357     return;
358   }
359
360   socket->closeNow();
361
362   // Loop, although there shouldn't be anything to do.
363   evb.loop();
364
365   // Make sure the connection was aborted
366   ASSERT_EQ(ccb.state, STATE_FAILED);
367
368   ASSERT_TRUE(socket->isClosedBySelf());
369   ASSERT_FALSE(socket->isClosedByPeer());
370 }
371
372 /**
373  * Test calling both write() and closeNow() immediately after connecting,
374  * without waiting for connect to finish.
375  *
376  * This should abort the pending write.
377  */
378 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
379   TestServer server;
380
381   // connect()
382   EventBase evb;
383   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
384   ConnCallback ccb;
385   socket->connect(&ccb, server.getAddress(), 30);
386
387   // Hopefully the connect didn't succeed immediately.
388   // If it did, we can't exercise the close-while-connecting code path.
389   if (ccb.state == STATE_SUCCEEDED) {
390     LOG(INFO) << "connect() succeeded immediately; aborting test "
391                        "of write-during-connect behavior";
392     return;
393   }
394
395   // write()
396   char buf[128];
397   memset(buf, 'a', sizeof(buf));
398   WriteCallback wcb;
399   socket->write(&wcb, buf, sizeof(buf));
400
401   // close()
402   socket->closeNow();
403
404   // Loop, although there shouldn't be anything to do.
405   evb.loop();
406
407   ASSERT_EQ(ccb.state, STATE_FAILED);
408   ASSERT_EQ(wcb.state, STATE_FAILED);
409
410   ASSERT_TRUE(socket->isClosedBySelf());
411   ASSERT_FALSE(socket->isClosedByPeer());
412 }
413
414 /**
415  * Test installing a read callback immediately, before connect() finishes.
416  */
417 TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
418   TestServer server;
419
420   // connect()
421   EventBase evb;
422   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
423   if (GetParam() == TFOState::ENABLED) {
424     socket->enableTFO();
425   }
426
427   ConnCallback ccb;
428   socket->connect(&ccb, server.getAddress(), 30);
429
430   ReadCallback rcb;
431   socket->setReadCB(&rcb);
432
433   if (GetParam() == TFOState::ENABLED) {
434     // Trigger a connection
435     socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
436   }
437
438   // Even though we haven't looped yet, we should be able to accept
439   // the connection and send data to it.
440   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
441   uint8_t buf[128];
442   memset(buf, 'a', sizeof(buf));
443   acceptedSocket->write(buf, sizeof(buf));
444   acceptedSocket->flush();
445   acceptedSocket->close();
446
447   // Loop, although there shouldn't be anything to do.
448   evb.loop();
449
450   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
451   ASSERT_EQ(rcb.buffers.size(), 1);
452   ASSERT_EQ(rcb.buffers[0].length, sizeof(buf));
453   ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
454
455   ASSERT_FALSE(socket->isClosedBySelf());
456   ASSERT_FALSE(socket->isClosedByPeer());
457 }
458
459 /**
460  * Test installing a read callback and then closing immediately before the
461  * connect attempt finishes.
462  */
463 TEST(AsyncSocketTest, ConnectReadAndClose) {
464   TestServer server;
465
466   // connect()
467   EventBase evb;
468   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
469   ConnCallback ccb;
470   socket->connect(&ccb, server.getAddress(), 30);
471
472   // Hopefully the connect didn't succeed immediately.
473   // If it did, we can't exercise the close-while-connecting code path.
474   if (ccb.state == STATE_SUCCEEDED) {
475     LOG(INFO) << "connect() succeeded immediately; aborting test "
476                        "of read-during-connect behavior";
477     return;
478   }
479
480   ReadCallback rcb;
481   socket->setReadCB(&rcb);
482
483   // close()
484   socket->close();
485
486   // Loop, although there shouldn't be anything to do.
487   evb.loop();
488
489   ASSERT_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
490   ASSERT_EQ(rcb.buffers.size(), 0);
491   ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
492
493   ASSERT_TRUE(socket->isClosedBySelf());
494   ASSERT_FALSE(socket->isClosedByPeer());
495 }
496
497 /**
498  * Test both writing and installing a read callback immediately,
499  * before connect() finishes.
500  */
501 TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
502   TestServer server;
503
504   // connect()
505   EventBase evb;
506   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
507   if (GetParam() == TFOState::ENABLED) {
508     socket->enableTFO();
509   }
510   ConnCallback ccb;
511   socket->connect(&ccb, server.getAddress(), 30);
512
513   // write()
514   char buf1[128];
515   memset(buf1, 'a', sizeof(buf1));
516   WriteCallback wcb;
517   socket->write(&wcb, buf1, sizeof(buf1));
518
519   // set a read callback
520   ReadCallback rcb;
521   socket->setReadCB(&rcb);
522
523   // Even though we haven't looped yet, we should be able to accept
524   // the connection and send data to it.
525   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
526   uint8_t buf2[128];
527   memset(buf2, 'b', sizeof(buf2));
528   acceptedSocket->write(buf2, sizeof(buf2));
529   acceptedSocket->flush();
530
531   // shut down the write half of acceptedSocket, so that the AsyncSocket
532   // will stop reading and we can break out of the event loop.
533   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
534
535   // Loop
536   evb.loop();
537
538   // Make sure the connect succeeded
539   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
540
541   // Make sure the AsyncSocket read the data written by the accepted socket
542   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
543   ASSERT_EQ(rcb.buffers.size(), 1);
544   ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2));
545   ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
546
547   // Close the AsyncSocket so we'll see EOF on acceptedSocket
548   socket->close();
549
550   // Make sure the accepted socket saw the data written by the AsyncSocket
551   uint8_t readbuf[sizeof(buf1)];
552   acceptedSocket->readAll(readbuf, sizeof(readbuf));
553   ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
554   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
555   ASSERT_EQ(bytesRead, 0);
556
557   ASSERT_FALSE(socket->isClosedBySelf());
558   ASSERT_TRUE(socket->isClosedByPeer());
559 }
560
561 /**
562  * Test writing to the socket then shutting down writes before the connect
563  * attempt finishes.
564  */
565 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
566   TestServer server;
567
568   // connect()
569   EventBase evb;
570   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
571   ConnCallback ccb;
572   socket->connect(&ccb, server.getAddress(), 30);
573
574   // Hopefully the connect didn't succeed immediately.
575   // If it did, we can't exercise the write-while-connecting code path.
576   if (ccb.state == STATE_SUCCEEDED) {
577     LOG(INFO) << "connect() succeeded immediately; skipping test";
578     return;
579   }
580
581   // Ask to write some data
582   char wbuf[128];
583   memset(wbuf, 'a', sizeof(wbuf));
584   WriteCallback wcb;
585   socket->write(&wcb, wbuf, sizeof(wbuf));
586   socket->shutdownWrite();
587
588   // Shutdown writes
589   socket->shutdownWrite();
590
591   // Even though we haven't looped yet, we should be able to accept
592   // the connection.
593   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
594
595   // Since the connection is still in progress, there should be no data to
596   // read yet.  Verify that the accepted socket is not readable.
597   struct pollfd fds[1];
598   fds[0].fd = acceptedSocket->getSocketFD();
599   fds[0].events = POLLIN;
600   fds[0].revents = 0;
601   int rc = poll(fds, 1, 0);
602   ASSERT_EQ(rc, 0);
603
604   // Write data to the accepted socket
605   uint8_t acceptedWbuf[192];
606   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
607   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
608   acceptedSocket->flush();
609
610   // Loop
611   evb.loop();
612
613   // The loop should have completed the connection, written the queued data,
614   // and shutdown writes on the socket.
615   //
616   // Check that the connection was completed successfully and that the write
617   // callback succeeded.
618   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
619   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
620
621   // Check that we can read the data that was written to the socket, and that
622   // we see an EOF, since its socket was half-shutdown.
623   uint8_t readbuf[sizeof(wbuf)];
624   acceptedSocket->readAll(readbuf, sizeof(readbuf));
625   ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
626   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
627   ASSERT_EQ(bytesRead, 0);
628
629   // Close the accepted socket.  This will cause it to see EOF
630   // and uninstall the read callback when we loop next.
631   acceptedSocket->close();
632
633   // Install a read callback, then loop again.
634   ReadCallback rcb;
635   socket->setReadCB(&rcb);
636   evb.loop();
637
638   // This loop should have read the data and seen the EOF
639   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
640   ASSERT_EQ(rcb.buffers.size(), 1);
641   ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
642   ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
643                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
644
645   ASSERT_FALSE(socket->isClosedBySelf());
646   ASSERT_FALSE(socket->isClosedByPeer());
647 }
648
649 /**
650  * Test reading, writing, and shutting down writes before the connect attempt
651  * finishes.
652  */
653 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
654   TestServer server;
655
656   // connect()
657   EventBase evb;
658   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
659   ConnCallback ccb;
660   socket->connect(&ccb, server.getAddress(), 30);
661
662   // Hopefully the connect didn't succeed immediately.
663   // If it did, we can't exercise the write-while-connecting code path.
664   if (ccb.state == STATE_SUCCEEDED) {
665     LOG(INFO) << "connect() succeeded immediately; skipping test";
666     return;
667   }
668
669   // Install a read callback
670   ReadCallback rcb;
671   socket->setReadCB(&rcb);
672
673   // Ask to write some data
674   char wbuf[128];
675   memset(wbuf, 'a', sizeof(wbuf));
676   WriteCallback wcb;
677   socket->write(&wcb, wbuf, sizeof(wbuf));
678
679   // Shutdown writes
680   socket->shutdownWrite();
681
682   // Even though we haven't looped yet, we should be able to accept
683   // the connection.
684   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
685
686   // Since the connection is still in progress, there should be no data to
687   // read yet.  Verify that the accepted socket is not readable.
688   struct pollfd fds[1];
689   fds[0].fd = acceptedSocket->getSocketFD();
690   fds[0].events = POLLIN;
691   fds[0].revents = 0;
692   int rc = poll(fds, 1, 0);
693   ASSERT_EQ(rc, 0);
694
695   // Write data to the accepted socket
696   uint8_t acceptedWbuf[192];
697   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
698   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
699   acceptedSocket->flush();
700   // Shutdown writes to the accepted socket.  This will cause it to see EOF
701   // and uninstall the read callback.
702   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
703
704   // Loop
705   evb.loop();
706
707   // The loop should have completed the connection, written the queued data,
708   // shutdown writes on the socket, read the data we wrote to it, and see the
709   // EOF.
710   //
711   // Check that the connection was completed successfully and that the read
712   // and write callbacks were invoked as expected.
713   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
714   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
715   ASSERT_EQ(rcb.buffers.size(), 1);
716   ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
717   ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
718                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
719   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
720
721   // Check that we can read the data that was written to the socket, and that
722   // we see an EOF, since its socket was half-shutdown.
723   uint8_t readbuf[sizeof(wbuf)];
724   acceptedSocket->readAll(readbuf, sizeof(readbuf));
725   ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
726   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
727   ASSERT_EQ(bytesRead, 0);
728
729   // Fully close both sockets
730   acceptedSocket->close();
731   socket->close();
732
733   ASSERT_FALSE(socket->isClosedBySelf());
734   ASSERT_TRUE(socket->isClosedByPeer());
735 }
736
737 /**
738  * Test reading, writing, and calling shutdownWriteNow() before the
739  * connect attempt finishes.
740  */
741 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
742   TestServer server;
743
744   // connect()
745   EventBase evb;
746   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
747   ConnCallback ccb;
748   socket->connect(&ccb, server.getAddress(), 30);
749
750   // Hopefully the connect didn't succeed immediately.
751   // If it did, we can't exercise the write-while-connecting code path.
752   if (ccb.state == STATE_SUCCEEDED) {
753     LOG(INFO) << "connect() succeeded immediately; skipping test";
754     return;
755   }
756
757   // Install a read callback
758   ReadCallback rcb;
759   socket->setReadCB(&rcb);
760
761   // Ask to write some data
762   char wbuf[128];
763   memset(wbuf, 'a', sizeof(wbuf));
764   WriteCallback wcb;
765   socket->write(&wcb, wbuf, sizeof(wbuf));
766
767   // Shutdown writes immediately.
768   // This should immediately discard the data that we just tried to write.
769   socket->shutdownWriteNow();
770
771   // Verify that writeError() was invoked on the write callback.
772   ASSERT_EQ(wcb.state, STATE_FAILED);
773   ASSERT_EQ(wcb.bytesWritten, 0);
774
775   // Even though we haven't looped yet, we should be able to accept
776   // the connection.
777   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
778
779   // Since the connection is still in progress, there should be no data to
780   // read yet.  Verify that the accepted socket is not readable.
781   struct pollfd fds[1];
782   fds[0].fd = acceptedSocket->getSocketFD();
783   fds[0].events = POLLIN;
784   fds[0].revents = 0;
785   int rc = poll(fds, 1, 0);
786   ASSERT_EQ(rc, 0);
787
788   // Write data to the accepted socket
789   uint8_t acceptedWbuf[192];
790   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
791   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
792   acceptedSocket->flush();
793   // Shutdown writes to the accepted socket.  This will cause it to see EOF
794   // and uninstall the read callback.
795   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
796
797   // Loop
798   evb.loop();
799
800   // The loop should have completed the connection, written the queued data,
801   // shutdown writes on the socket, read the data we wrote to it, and see the
802   // EOF.
803   //
804   // Check that the connection was completed successfully and that the read
805   // callback was invoked as expected.
806   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
807   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
808   ASSERT_EQ(rcb.buffers.size(), 1);
809   ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
810   ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
811                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
812
813   // Since we used shutdownWriteNow(), it should have discarded all pending
814   // write data.  Verify we see an immediate EOF when reading from the accepted
815   // socket.
816   uint8_t readbuf[sizeof(wbuf)];
817   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
818   ASSERT_EQ(bytesRead, 0);
819
820   // Fully close both sockets
821   acceptedSocket->close();
822   socket->close();
823
824   ASSERT_FALSE(socket->isClosedBySelf());
825   ASSERT_TRUE(socket->isClosedByPeer());
826 }
827
828 // Helper function for use in testConnectOptWrite()
829 // Temporarily disable the read callback
830 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
831   // Uninstall the read callback
832   socket->setReadCB(nullptr);
833   // Schedule the read callback to be reinstalled after 1ms
834   socket->getEventBase()->runInLoop(
835       std::bind(&AsyncSocket::setReadCB, socket, rcb));
836 }
837
838 /**
839  * Test connect+write, then have the connect callback perform another write.
840  *
841  * This tests interaction of the optimistic writing after connect with
842  * additional write attempts that occur in the connect callback.
843  */
844 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
845   TestServer server;
846   EventBase evb;
847   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
848
849   // connect()
850   ConnCallback ccb;
851   socket->connect(&ccb, server.getAddress(), 30);
852
853   // Hopefully the connect didn't succeed immediately.
854   // If it did, we can't exercise the optimistic write code path.
855   if (ccb.state == STATE_SUCCEEDED) {
856     LOG(INFO) << "connect() succeeded immediately; aborting test "
857                        "of optimistic write behavior";
858     return;
859   }
860
861   // Tell the connect callback to perform a write when the connect succeeds
862   WriteCallback wcb2;
863   scoped_array<char> buf2(new char[size2]);
864   memset(buf2.get(), 'b', size2);
865   if (size2 > 0) {
866     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
867     // Tell the second write callback to close the connection when it is done
868     wcb2.successCallback = [&] { socket->closeNow(); };
869   }
870
871   // Schedule one write() immediately, before the connect finishes
872   scoped_array<char> buf1(new char[size1]);
873   memset(buf1.get(), 'a', size1);
874   WriteCallback wcb1;
875   if (size1 > 0) {
876     socket->write(&wcb1, buf1.get(), size1);
877   }
878
879   if (close) {
880     // immediately perform a close, before connect() completes
881     socket->close();
882   }
883
884   // Start reading from the other endpoint after 10ms.
885   // If we're using large buffers, we have to read so that the writes don't
886   // block forever.
887   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
888   ReadCallback rcb;
889   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
890                                         acceptedSocket.get(), &rcb);
891   socket->getEventBase()->tryRunAfterDelay(
892       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
893       10);
894
895   // Loop.  We don't bother accepting on the server socket yet.
896   // The kernel should be able to buffer the write request so it can succeed.
897   evb.loop();
898
899   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
900   if (size1 > 0) {
901     ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
902   }
903   if (size2 > 0) {
904     ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
905   }
906
907   socket->close();
908
909   // Make sure the read callback received all of the data
910   size_t bytesRead = 0;
911   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
912        it != rcb.buffers.end();
913        ++it) {
914     size_t start = bytesRead;
915     bytesRead += it->length;
916     size_t end = bytesRead;
917     if (start < size1) {
918       size_t cmpLen = min(size1, end) - start;
919       ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
920     }
921     if (end > size1 && end <= size1 + size2) {
922       size_t itOffset;
923       size_t buf2Offset;
924       size_t cmpLen;
925       if (start >= size1) {
926         itOffset = 0;
927         buf2Offset = start - size1;
928         cmpLen = end - start;
929       } else {
930         itOffset = size1 - start;
931         buf2Offset = 0;
932         cmpLen = end - size1;
933       }
934       ASSERT_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
935                                cmpLen),
936                         0);
937     }
938   }
939   ASSERT_EQ(bytesRead, size1 + size2);
940 }
941
942 TEST(AsyncSocketTest, ConnectCallbackWrite) {
943   // Test using small writes that should both succeed immediately
944   testConnectOptWrite(100, 200);
945
946   // Test using a large buffer in the connect callback, that should block
947   const size_t largeSize = 32 * 1024 * 1024;
948   testConnectOptWrite(100, largeSize);
949
950   // Test using a large initial write
951   testConnectOptWrite(largeSize, 100);
952
953   // Test using two large buffers
954   testConnectOptWrite(largeSize, largeSize);
955
956   // Test a small write in the connect callback,
957   // but no immediate write before connect completes
958   testConnectOptWrite(0, 64);
959
960   // Test a large write in the connect callback,
961   // but no immediate write before connect completes
962   testConnectOptWrite(0, largeSize);
963
964   // Test connect, a small write, then immediately call close() before connect
965   // completes
966   testConnectOptWrite(211, 0, true);
967
968   // Test connect, a large immediate write (that will block), then immediately
969   // call close() before connect completes
970   testConnectOptWrite(largeSize, 0, true);
971 }
972
973 ///////////////////////////////////////////////////////////////////////////
974 // write() related tests
975 ///////////////////////////////////////////////////////////////////////////
976
977 /**
978  * Test writing using a nullptr callback
979  */
980 TEST(AsyncSocketTest, WriteNullCallback) {
981   TestServer server;
982
983   // connect()
984   EventBase evb;
985   std::shared_ptr<AsyncSocket> socket =
986     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
987   evb.loop(); // loop until the socket is connected
988
989   // write() with a nullptr callback
990   char buf[128];
991   memset(buf, 'a', sizeof(buf));
992   socket->write(nullptr, buf, sizeof(buf));
993
994   evb.loop(); // loop until the data is sent
995
996   // Make sure the server got a connection and received the data
997   socket->close();
998   server.verifyConnection(buf, sizeof(buf));
999
1000   ASSERT_TRUE(socket->isClosedBySelf());
1001   ASSERT_FALSE(socket->isClosedByPeer());
1002 }
1003
1004 /**
1005  * Test writing with a send timeout
1006  */
1007 TEST(AsyncSocketTest, WriteTimeout) {
1008   TestServer server;
1009
1010   // connect()
1011   EventBase evb;
1012   std::shared_ptr<AsyncSocket> socket =
1013     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1014   evb.loop(); // loop until the socket is connected
1015
1016   // write() a large chunk of data, with no-one on the other end reading.
1017   // Tricky: the kernel caches the connection metrics for recently-used
1018   // routes (see tcp_no_metrics_save) so a freshly opened connection can
1019   // have a send buffer size bigger than wmem_default.  This makes the test
1020   // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
1021   size_t writeLength = 32 * 1024 * 1024;
1022   uint32_t timeout = 200;
1023   socket->setSendTimeout(timeout);
1024   scoped_array<char> buf(new char[writeLength]);
1025   memset(buf.get(), 'a', writeLength);
1026   WriteCallback wcb;
1027   socket->write(&wcb, buf.get(), writeLength);
1028
1029   TimePoint start;
1030   evb.loop();
1031   TimePoint end;
1032
1033   // Make sure the write attempt timed out as requested
1034   ASSERT_EQ(wcb.state, STATE_FAILED);
1035   ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1036
1037   // Check that the write timed out within a reasonable period of time.
1038   // We don't check for exactly the specified timeout, since AsyncSocket only
1039   // times out when it hasn't made progress for that period of time.
1040   //
1041   // On linux, the first write sends a few hundred kb of data, then blocks for
1042   // writability, and then unblocks again after 40ms and is able to write
1043   // another smaller of data before blocking permanently.  Therefore it doesn't
1044   // time out until 40ms + timeout.
1045   //
1046   // I haven't fully verified the cause of this, but I believe it probably
1047   // occurs because the receiving end delays sending an ack for up to 40ms.
1048   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
1049   // the ack, it can send some more data.  However, after that point the
1050   // receiver's kernel buffer is full.  This 40ms delay happens even with
1051   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
1052   // kernel may be automatically disabling TCP_QUICKACK after receiving some
1053   // data.
1054   //
1055   // For now, we simply check that the timeout occurred within 160ms of
1056   // the requested value.
1057   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1058 }
1059
1060 /**
1061  * Test writing to a socket that the remote endpoint has closed
1062  */
1063 TEST(AsyncSocketTest, WritePipeError) {
1064   TestServer server;
1065
1066   // connect()
1067   EventBase evb;
1068   std::shared_ptr<AsyncSocket> socket =
1069     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1070   socket->setSendTimeout(1000);
1071   evb.loop(); // loop until the socket is connected
1072
1073   // accept and immediately close the socket
1074   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1075   acceptedSocket->close();
1076
1077   // write() a large chunk of data
1078   size_t writeLength = 32 * 1024 * 1024;
1079   scoped_array<char> buf(new char[writeLength]);
1080   memset(buf.get(), 'a', writeLength);
1081   WriteCallback wcb;
1082   socket->write(&wcb, buf.get(), writeLength);
1083
1084   evb.loop();
1085
1086   // Make sure the write failed.
1087   // It would be nice if AsyncSocketException could convey the errno value,
1088   // so that we could check for EPIPE
1089   ASSERT_EQ(wcb.state, STATE_FAILED);
1090   ASSERT_EQ(wcb.exception.getType(),
1091                     AsyncSocketException::INTERNAL_ERROR);
1092
1093   ASSERT_FALSE(socket->isClosedBySelf());
1094   ASSERT_FALSE(socket->isClosedByPeer());
1095 }
1096
1097 /**
1098  * Test that bytes written is correctly computed in case of write failure
1099  */
1100 TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
1101   // Send and receive buffer sizes for the sockets.
1102   const int sockBufSize = 8 * 1024;
1103
1104   TestServer server(false, sockBufSize);
1105
1106   AsyncSocket::OptionMap options{
1107       {{SOL_SOCKET, SO_SNDBUF}, sockBufSize},
1108       {{SOL_SOCKET, SO_RCVBUF}, sockBufSize},
1109       {{IPPROTO_TCP, TCP_NODELAY}, 1},
1110   };
1111
1112   // The current thread will be used by the receiver - use a separate thread
1113   // for the sender.
1114   EventBase senderEvb;
1115   std::thread senderThread([&]() { senderEvb.loopForever(); });
1116
1117   ConnCallback ccb;
1118   std::shared_ptr<AsyncSocket> socket;
1119
1120   senderEvb.runInEventBaseThreadAndWait([&]() {
1121     socket = AsyncSocket::newSocket(&senderEvb);
1122     socket->connect(&ccb, server.getAddress(), 30, options);
1123   });
1124
1125   // accept the socket on the server side
1126   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1127
1128   // Send a big (45KB) write so that it is partially written. The first write
1129   // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading
1130   // just under 24KB would cause 3-4 writes for the total of 32-40KB in the
1131   // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all
1132   // bytes are written when the socket is reset. Having at least 3 writes
1133   // ensures that the total size (45KB) would be exceeed in case of overcounting
1134   // based on the initial write size of 16KB.
1135   constexpr size_t sendSize = 45 * 1024;
1136   auto const sendBuf = std::vector<char>(sendSize, 'a');
1137
1138   WriteCallback wcb;
1139
1140   senderEvb.runInEventBaseThreadAndWait(
1141       [&]() { socket->write(&wcb, sendBuf.data(), sendSize); });
1142
1143   // Reading 20KB would cause three additional writes of 8KB, but less
1144   // than 45KB total, so the socket is reset before all bytes are written.
1145   constexpr size_t recvSize = 20 * 1024;
1146   uint8_t recvBuf[recvSize];
1147   int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
1148
1149   acceptedSocket->closeWithReset();
1150
1151   senderEvb.terminateLoopSoon();
1152   senderThread.join();
1153
1154   LOG(INFO) << "Bytes written: " << wcb.bytesWritten;
1155
1156   ASSERT_EQ(STATE_FAILED, wcb.state);
1157   ASSERT_GE(wcb.bytesWritten, bytesRead);
1158   ASSERT_LE(wcb.bytesWritten, sendSize);
1159   ASSERT_EQ(recvSize, bytesRead);
1160   ASSERT(32 * 1024 == wcb.bytesWritten || 40 * 1024 == wcb.bytesWritten);
1161 }
1162
1163 /**
1164  * Test writing a mix of simple buffers and IOBufs
1165  */
1166 TEST(AsyncSocketTest, WriteIOBuf) {
1167   TestServer server;
1168
1169   // connect()
1170   EventBase evb;
1171   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1172   ConnCallback ccb;
1173   socket->connect(&ccb, server.getAddress(), 30);
1174
1175   // Accept the connection
1176   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1177   ReadCallback rcb;
1178   acceptedSocket->setReadCB(&rcb);
1179
1180   // Write a simple buffer to the socket
1181   constexpr size_t simpleBufLength = 5;
1182   char simpleBuf[simpleBufLength];
1183   memset(simpleBuf, 'a', simpleBufLength);
1184   WriteCallback wcb;
1185   socket->write(&wcb, simpleBuf, simpleBufLength);
1186
1187   // Write a single-element IOBuf chain
1188   size_t buf1Length = 7;
1189   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1190   memset(buf1->writableData(), 'b', buf1Length);
1191   buf1->append(buf1Length);
1192   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1193   WriteCallback wcb2;
1194   socket->writeChain(&wcb2, std::move(buf1));
1195
1196   // Write a multiple-element IOBuf chain
1197   size_t buf2Length = 11;
1198   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1199   memset(buf2->writableData(), 'c', buf2Length);
1200   buf2->append(buf2Length);
1201   size_t buf3Length = 13;
1202   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1203   memset(buf3->writableData(), 'd', buf3Length);
1204   buf3->append(buf3Length);
1205   buf2->appendChain(std::move(buf3));
1206   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1207   buf2Copy->coalesce();
1208   WriteCallback wcb3;
1209   socket->writeChain(&wcb3, std::move(buf2));
1210   socket->shutdownWrite();
1211
1212   // Let the reads and writes run to completion
1213   evb.loop();
1214
1215   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1216   ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1217   ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1218
1219   // Make sure the reader got the right data in the right order
1220   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
1221   ASSERT_EQ(rcb.buffers.size(), 1);
1222   ASSERT_EQ(rcb.buffers[0].length,
1223       simpleBufLength + buf1Length + buf2Length + buf3Length);
1224   ASSERT_EQ(
1225       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1226   ASSERT_EQ(
1227       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1228           buf1Copy->data(), buf1Copy->length()), 0);
1229   ASSERT_EQ(
1230       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1231           buf2Copy->data(), buf2Copy->length()), 0);
1232
1233   acceptedSocket->close();
1234   socket->close();
1235
1236   ASSERT_TRUE(socket->isClosedBySelf());
1237   ASSERT_FALSE(socket->isClosedByPeer());
1238 }
1239
1240 TEST(AsyncSocketTest, WriteIOBufCorked) {
1241   TestServer server;
1242
1243   // connect()
1244   EventBase evb;
1245   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1246   ConnCallback ccb;
1247   socket->connect(&ccb, server.getAddress(), 30);
1248
1249   // Accept the connection
1250   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1251   ReadCallback rcb;
1252   acceptedSocket->setReadCB(&rcb);
1253
1254   // Do three writes, 100ms apart, with the "cork" flag set
1255   // on the second write.  The reader should see the first write
1256   // arrive by itself, followed by the second and third writes
1257   // arriving together.
1258   size_t buf1Length = 5;
1259   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1260   memset(buf1->writableData(), 'a', buf1Length);
1261   buf1->append(buf1Length);
1262   size_t buf2Length = 7;
1263   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1264   memset(buf2->writableData(), 'b', buf2Length);
1265   buf2->append(buf2Length);
1266   size_t buf3Length = 11;
1267   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1268   memset(buf3->writableData(), 'c', buf3Length);
1269   buf3->append(buf3Length);
1270   WriteCallback wcb1;
1271   socket->writeChain(&wcb1, std::move(buf1));
1272   WriteCallback wcb2;
1273   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1274   write2.scheduleTimeout(100);
1275   WriteCallback wcb3;
1276   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1277   write3.scheduleTimeout(140);
1278
1279   evb.loop();
1280   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
1281   ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1282   ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1283   if (wcb3.state != STATE_SUCCEEDED) {
1284     throw(wcb3.exception);
1285   }
1286   ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1287
1288   // Make sure the reader got the data with the right grouping
1289   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
1290   ASSERT_EQ(rcb.buffers.size(), 2);
1291   ASSERT_EQ(rcb.buffers[0].length, buf1Length);
1292   ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1293
1294   acceptedSocket->close();
1295   socket->close();
1296
1297   ASSERT_TRUE(socket->isClosedBySelf());
1298   ASSERT_FALSE(socket->isClosedByPeer());
1299 }
1300
1301 /**
1302  * Test performing a zero-length write
1303  */
1304 TEST(AsyncSocketTest, ZeroLengthWrite) {
1305   TestServer server;
1306
1307   // connect()
1308   EventBase evb;
1309   std::shared_ptr<AsyncSocket> socket =
1310     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1311   evb.loop(); // loop until the socket is connected
1312
1313   auto acceptedSocket = server.acceptAsync(&evb);
1314   ReadCallback rcb;
1315   acceptedSocket->setReadCB(&rcb);
1316
1317   size_t len1 = 1024*1024;
1318   size_t len2 = 1024*1024;
1319   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1320   memset(buf.get(), 'a', len1);
1321   memset(buf.get(), 'b', len2);
1322
1323   WriteCallback wcb1;
1324   WriteCallback wcb2;
1325   WriteCallback wcb3;
1326   WriteCallback wcb4;
1327   socket->write(&wcb1, buf.get(), 0);
1328   socket->write(&wcb2, buf.get(), len1);
1329   socket->write(&wcb3, buf.get() + len1, 0);
1330   socket->write(&wcb4, buf.get() + len1, len2);
1331   socket->close();
1332
1333   evb.loop(); // loop until the data is sent
1334
1335   ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1336   ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1337   ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1338   ASSERT_EQ(wcb4.state, STATE_SUCCEEDED);
1339   rcb.verifyData(buf.get(), len1 + len2);
1340
1341   ASSERT_TRUE(socket->isClosedBySelf());
1342   ASSERT_FALSE(socket->isClosedByPeer());
1343 }
1344
1345 TEST(AsyncSocketTest, ZeroLengthWritev) {
1346   TestServer server;
1347
1348   // connect()
1349   EventBase evb;
1350   std::shared_ptr<AsyncSocket> socket =
1351     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1352   evb.loop(); // loop until the socket is connected
1353
1354   auto acceptedSocket = server.acceptAsync(&evb);
1355   ReadCallback rcb;
1356   acceptedSocket->setReadCB(&rcb);
1357
1358   size_t len1 = 1024*1024;
1359   size_t len2 = 1024*1024;
1360   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1361   memset(buf.get(), 'a', len1);
1362   memset(buf.get(), 'b', len2);
1363
1364   WriteCallback wcb;
1365   constexpr size_t iovCount = 4;
1366   struct iovec iov[iovCount];
1367   iov[0].iov_base = buf.get();
1368   iov[0].iov_len = len1;
1369   iov[1].iov_base = buf.get() + len1;
1370   iov[1].iov_len = 0;
1371   iov[2].iov_base = buf.get() + len1;
1372   iov[2].iov_len = len2;
1373   iov[3].iov_base = buf.get() + len1 + len2;
1374   iov[3].iov_len = 0;
1375
1376   socket->writev(&wcb, iov, iovCount);
1377   socket->close();
1378   evb.loop(); // loop until the data is sent
1379
1380   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1381   rcb.verifyData(buf.get(), len1 + len2);
1382
1383   ASSERT_TRUE(socket->isClosedBySelf());
1384   ASSERT_FALSE(socket->isClosedByPeer());
1385 }
1386
1387 ///////////////////////////////////////////////////////////////////////////
1388 // close() related tests
1389 ///////////////////////////////////////////////////////////////////////////
1390
1391 /**
1392  * Test calling close() with pending writes when the socket is already closing.
1393  */
1394 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1395   TestServer server;
1396
1397   // connect()
1398   EventBase evb;
1399   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1400   ConnCallback ccb;
1401   socket->connect(&ccb, server.getAddress(), 30);
1402
1403   // accept the socket on the server side
1404   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1405
1406   // Loop to ensure the connect has completed
1407   evb.loop();
1408
1409   // Make sure we are connected
1410   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
1411
1412   // Schedule pending writes, until several write attempts have blocked
1413   char buf[128];
1414   memset(buf, 'a', sizeof(buf));
1415   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1416   WriteCallbackVector writeCallbacks;
1417
1418   writeCallbacks.reserve(5);
1419   while (writeCallbacks.size() < 5) {
1420     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1421
1422     socket->write(wcb.get(), buf, sizeof(buf));
1423     if (wcb->state == STATE_SUCCEEDED) {
1424       // Succeeded immediately.  Keep performing more writes
1425       continue;
1426     }
1427
1428     // This write is blocked.
1429     // Have the write callback call close() when writeError() is invoked
1430     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1431     writeCallbacks.push_back(wcb);
1432   }
1433
1434   // Call closeNow() to immediately fail the pending writes
1435   socket->closeNow();
1436
1437   // Make sure writeError() was invoked on all of the pending write callbacks
1438   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1439        it != writeCallbacks.end();
1440        ++it) {
1441     ASSERT_EQ((*it)->state, STATE_FAILED);
1442   }
1443
1444   ASSERT_TRUE(socket->isClosedBySelf());
1445   ASSERT_FALSE(socket->isClosedByPeer());
1446 }
1447
1448 ///////////////////////////////////////////////////////////////////////////
1449 // ImmediateRead related tests
1450 ///////////////////////////////////////////////////////////////////////////
1451
1452 /* AsyncSocket use to verify immediate read works */
1453 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1454  public:
1455   bool immediateReadCalled = false;
1456   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1457  protected:
1458   void checkForImmediateRead() noexcept override {
1459     immediateReadCalled = true;
1460     AsyncSocket::handleRead();
1461   }
1462 };
1463
1464 TEST(AsyncSocket, ConnectReadImmediateRead) {
1465   TestServer server;
1466
1467   const size_t maxBufferSz = 100;
1468   const size_t maxReadsPerEvent = 1;
1469   const size_t expectedDataSz = maxBufferSz * 3;
1470   char expectedData[expectedDataSz];
1471   memset(expectedData, 'j', expectedDataSz);
1472
1473   EventBase evb;
1474   ReadCallback rcb(maxBufferSz);
1475   AsyncSocketImmediateRead socket(&evb);
1476   socket.connect(nullptr, server.getAddress(), 30);
1477
1478   evb.loop(); // loop until the socket is connected
1479
1480   socket.setReadCB(&rcb);
1481   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1482   socket.immediateReadCalled = false;
1483
1484   auto acceptedSocket = server.acceptAsync(&evb);
1485
1486   ReadCallback rcbServer;
1487   WriteCallback wcbServer;
1488   rcbServer.dataAvailableCallback = [&]() {
1489     if (rcbServer.dataRead() == expectedDataSz) {
1490       // write back all data read
1491       rcbServer.verifyData(expectedData, expectedDataSz);
1492       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1493       acceptedSocket->close();
1494     }
1495   };
1496   acceptedSocket->setReadCB(&rcbServer);
1497
1498   // write data
1499   WriteCallback wcb1;
1500   socket.write(&wcb1, expectedData, expectedDataSz);
1501   evb.loop();
1502   ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1503   rcb.verifyData(expectedData, expectedDataSz);
1504   ASSERT_EQ(socket.immediateReadCalled, true);
1505
1506   ASSERT_FALSE(socket.isClosedBySelf());
1507   ASSERT_FALSE(socket.isClosedByPeer());
1508 }
1509
1510 TEST(AsyncSocket, ConnectReadUninstallRead) {
1511   TestServer server;
1512
1513   const size_t maxBufferSz = 100;
1514   const size_t maxReadsPerEvent = 1;
1515   const size_t expectedDataSz = maxBufferSz * 3;
1516   char expectedData[expectedDataSz];
1517   memset(expectedData, 'k', expectedDataSz);
1518
1519   EventBase evb;
1520   ReadCallback rcb(maxBufferSz);
1521   AsyncSocketImmediateRead socket(&evb);
1522   socket.connect(nullptr, server.getAddress(), 30);
1523
1524   evb.loop(); // loop until the socket is connected
1525
1526   socket.setReadCB(&rcb);
1527   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1528   socket.immediateReadCalled = false;
1529
1530   auto acceptedSocket = server.acceptAsync(&evb);
1531
1532   ReadCallback rcbServer;
1533   WriteCallback wcbServer;
1534   rcbServer.dataAvailableCallback = [&]() {
1535     if (rcbServer.dataRead() == expectedDataSz) {
1536       // write back all data read
1537       rcbServer.verifyData(expectedData, expectedDataSz);
1538       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1539       acceptedSocket->close();
1540     }
1541   };
1542   acceptedSocket->setReadCB(&rcbServer);
1543
1544   rcb.dataAvailableCallback = [&]() {
1545     // we read data and reset readCB
1546     socket.setReadCB(nullptr);
1547   };
1548
1549   // write data
1550   WriteCallback wcb;
1551   socket.write(&wcb, expectedData, expectedDataSz);
1552   evb.loop();
1553   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1554
1555   /* we shoud've only read maxBufferSz data since readCallback_
1556    * was reset in dataAvailableCallback */
1557   ASSERT_EQ(rcb.dataRead(), maxBufferSz);
1558   ASSERT_EQ(socket.immediateReadCalled, false);
1559
1560   ASSERT_FALSE(socket.isClosedBySelf());
1561   ASSERT_FALSE(socket.isClosedByPeer());
1562 }
1563
1564 // TODO:
1565 // - Test connect() and have the connect callback set the read callback
1566 // - Test connect() and have the connect callback unset the read callback
1567 // - Test reading/writing/closing/destroying the socket in the connect callback
1568 // - Test reading/writing/closing/destroying the socket in the read callback
1569 // - Test reading/writing/closing/destroying the socket in the write callback
1570 // - Test one-way shutdown behavior
1571 // - Test changing the EventBase
1572 //
1573 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1574 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1575
1576
1577 ///////////////////////////////////////////////////////////////////////////
1578 // AsyncServerSocket tests
1579 ///////////////////////////////////////////////////////////////////////////
1580 namespace {
1581 /**
1582  * Helper ConnectionEventCallback class for the test code.
1583  * It maintains counters protected by a spin lock.
1584  */
1585 class TestConnectionEventCallback :
1586   public AsyncServerSocket::ConnectionEventCallback {
1587  public:
1588   virtual void onConnectionAccepted(
1589       const int /* socket */,
1590       const SocketAddress& /* addr */) noexcept override {
1591     folly::RWSpinLock::WriteHolder holder(spinLock_);
1592     connectionAccepted_++;
1593   }
1594
1595   virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1596     folly::RWSpinLock::WriteHolder holder(spinLock_);
1597     connectionAcceptedError_++;
1598   }
1599
1600   virtual void onConnectionDropped(
1601       const int /* socket */,
1602       const SocketAddress& /* addr */) noexcept override {
1603     folly::RWSpinLock::WriteHolder holder(spinLock_);
1604     connectionDropped_++;
1605   }
1606
1607   virtual void onConnectionEnqueuedForAcceptorCallback(
1608       const int /* socket */,
1609       const SocketAddress& /* addr */) noexcept override {
1610     folly::RWSpinLock::WriteHolder holder(spinLock_);
1611     connectionEnqueuedForAcceptCallback_++;
1612   }
1613
1614   virtual void onConnectionDequeuedByAcceptorCallback(
1615       const int /* socket */,
1616       const SocketAddress& /* addr */) noexcept override {
1617     folly::RWSpinLock::WriteHolder holder(spinLock_);
1618     connectionDequeuedByAcceptCallback_++;
1619   }
1620
1621   virtual void onBackoffStarted() noexcept override {
1622     folly::RWSpinLock::WriteHolder holder(spinLock_);
1623     backoffStarted_++;
1624   }
1625
1626   virtual void onBackoffEnded() noexcept override {
1627     folly::RWSpinLock::WriteHolder holder(spinLock_);
1628     backoffEnded_++;
1629   }
1630
1631   virtual void onBackoffError() noexcept override {
1632     folly::RWSpinLock::WriteHolder holder(spinLock_);
1633     backoffError_++;
1634   }
1635
1636   unsigned int getConnectionAccepted() const {
1637     folly::RWSpinLock::ReadHolder holder(spinLock_);
1638     return connectionAccepted_;
1639   }
1640
1641   unsigned int getConnectionAcceptedError() const {
1642     folly::RWSpinLock::ReadHolder holder(spinLock_);
1643     return connectionAcceptedError_;
1644   }
1645
1646   unsigned int getConnectionDropped() const {
1647     folly::RWSpinLock::ReadHolder holder(spinLock_);
1648     return connectionDropped_;
1649   }
1650
1651   unsigned int getConnectionEnqueuedForAcceptCallback() const {
1652     folly::RWSpinLock::ReadHolder holder(spinLock_);
1653     return connectionEnqueuedForAcceptCallback_;
1654   }
1655
1656   unsigned int getConnectionDequeuedByAcceptCallback() const {
1657     folly::RWSpinLock::ReadHolder holder(spinLock_);
1658     return connectionDequeuedByAcceptCallback_;
1659   }
1660
1661   unsigned int getBackoffStarted() const {
1662     folly::RWSpinLock::ReadHolder holder(spinLock_);
1663     return backoffStarted_;
1664   }
1665
1666   unsigned int getBackoffEnded() const {
1667     folly::RWSpinLock::ReadHolder holder(spinLock_);
1668     return backoffEnded_;
1669   }
1670
1671   unsigned int getBackoffError() const {
1672     folly::RWSpinLock::ReadHolder holder(spinLock_);
1673     return backoffError_;
1674   }
1675
1676  private:
1677   mutable folly::RWSpinLock spinLock_;
1678   unsigned int connectionAccepted_{0};
1679   unsigned int connectionAcceptedError_{0};
1680   unsigned int connectionDropped_{0};
1681   unsigned int connectionEnqueuedForAcceptCallback_{0};
1682   unsigned int connectionDequeuedByAcceptCallback_{0};
1683   unsigned int backoffStarted_{0};
1684   unsigned int backoffEnded_{0};
1685   unsigned int backoffError_{0};
1686 };
1687
1688 /**
1689  * Helper AcceptCallback class for the test code
1690  * It records the callbacks that were invoked, and also supports calling
1691  * generic std::function objects in each callback.
1692  */
1693 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1694  public:
1695   enum EventType {
1696     TYPE_START,
1697     TYPE_ACCEPT,
1698     TYPE_ERROR,
1699     TYPE_STOP
1700   };
1701   struct EventInfo {
1702     EventInfo(int fd, const folly::SocketAddress& addr)
1703       : type(TYPE_ACCEPT),
1704         fd(fd),
1705         address(addr),
1706         errorMsg() {}
1707     explicit EventInfo(const std::string& msg)
1708       : type(TYPE_ERROR),
1709         fd(-1),
1710         address(),
1711         errorMsg(msg) {}
1712     explicit EventInfo(EventType et)
1713       : type(et),
1714         fd(-1),
1715         address(),
1716         errorMsg() {}
1717
1718     EventType type;
1719     int fd;  // valid for TYPE_ACCEPT
1720     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1721     string errorMsg;  // valid for TYPE_ERROR
1722   };
1723   typedef std::deque<EventInfo> EventList;
1724
1725   TestAcceptCallback()
1726     : connectionAcceptedFn_(),
1727       acceptErrorFn_(),
1728       acceptStoppedFn_(),
1729       events_() {}
1730
1731   std::deque<EventInfo>* getEvents() {
1732     return &events_;
1733   }
1734
1735   void setConnectionAcceptedFn(
1736       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1737     connectionAcceptedFn_ = fn;
1738   }
1739   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1740     acceptErrorFn_ = fn;
1741   }
1742   void setAcceptStartedFn(const std::function<void()>& fn) {
1743     acceptStartedFn_ = fn;
1744   }
1745   void setAcceptStoppedFn(const std::function<void()>& fn) {
1746     acceptStoppedFn_ = fn;
1747   }
1748
1749   void connectionAccepted(
1750       int fd, const folly::SocketAddress& clientAddr) noexcept override {
1751     events_.emplace_back(fd, clientAddr);
1752
1753     if (connectionAcceptedFn_) {
1754       connectionAcceptedFn_(fd, clientAddr);
1755     }
1756   }
1757   void acceptError(const std::exception& ex) noexcept override {
1758     events_.emplace_back(ex.what());
1759
1760     if (acceptErrorFn_) {
1761       acceptErrorFn_(ex);
1762     }
1763   }
1764   void acceptStarted() noexcept override {
1765     events_.emplace_back(TYPE_START);
1766
1767     if (acceptStartedFn_) {
1768       acceptStartedFn_();
1769     }
1770   }
1771   void acceptStopped() noexcept override {
1772     events_.emplace_back(TYPE_STOP);
1773
1774     if (acceptStoppedFn_) {
1775       acceptStoppedFn_();
1776     }
1777   }
1778
1779  private:
1780   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1781   std::function<void(const std::exception&)> acceptErrorFn_;
1782   std::function<void()> acceptStartedFn_;
1783   std::function<void()> acceptStoppedFn_;
1784
1785   std::deque<EventInfo> events_;
1786 };
1787 }
1788
1789 /**
1790  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1791  */
1792 TEST(AsyncSocketTest, ServerAcceptOptions) {
1793   EventBase eventBase;
1794
1795   // Create a server socket
1796   std::shared_ptr<AsyncServerSocket> serverSocket(
1797       AsyncServerSocket::newSocket(&eventBase));
1798   serverSocket->bind(0);
1799   serverSocket->listen(16);
1800   folly::SocketAddress serverAddress;
1801   serverSocket->getAddress(&serverAddress);
1802
1803   // Add a callback to accept one connection then stop the loop
1804   TestAcceptCallback acceptCallback;
1805   acceptCallback.setConnectionAcceptedFn(
1806       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1807         serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1808       });
1809   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1810     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1811   });
1812   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
1813   serverSocket->startAccepting();
1814
1815   // Connect to the server socket
1816   std::shared_ptr<AsyncSocket> socket(
1817       AsyncSocket::newSocket(&eventBase, serverAddress));
1818
1819   eventBase.loop();
1820
1821   // Verify that the server accepted a connection
1822   ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
1823   ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
1824                     TestAcceptCallback::TYPE_START);
1825   ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
1826                     TestAcceptCallback::TYPE_ACCEPT);
1827   ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
1828                     TestAcceptCallback::TYPE_STOP);
1829   int fd = acceptCallback.getEvents()->at(1).fd;
1830
1831   // The accepted connection should already be in non-blocking mode
1832   int flags = fcntl(fd, F_GETFL, 0);
1833   ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1834
1835 #ifndef TCP_NOPUSH
1836   // The accepted connection should already have TCP_NODELAY set
1837   int value;
1838   socklen_t valueLength = sizeof(value);
1839   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1840   ASSERT_EQ(rc, 0);
1841   ASSERT_EQ(value, 1);
1842 #endif
1843 }
1844
1845 /**
1846  * Test AsyncServerSocket::removeAcceptCallback()
1847  */
1848 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1849   // Create a new AsyncServerSocket
1850   EventBase eventBase;
1851   std::shared_ptr<AsyncServerSocket> serverSocket(
1852       AsyncServerSocket::newSocket(&eventBase));
1853   serverSocket->bind(0);
1854   serverSocket->listen(16);
1855   folly::SocketAddress serverAddress;
1856   serverSocket->getAddress(&serverAddress);
1857
1858   // Add several accept callbacks
1859   TestAcceptCallback cb1;
1860   TestAcceptCallback cb2;
1861   TestAcceptCallback cb3;
1862   TestAcceptCallback cb4;
1863   TestAcceptCallback cb5;
1864   TestAcceptCallback cb6;
1865   TestAcceptCallback cb7;
1866
1867   // Test having callbacks remove other callbacks before them on the list,
1868   // after them on the list, or removing themselves.
1869   //
1870   // Have callback 2 remove callback 3 and callback 5 the first time it is
1871   // called.
1872   int cb2Count = 0;
1873   cb1.setConnectionAcceptedFn([&](int /* fd */,
1874                                   const folly::SocketAddress& /* addr */) {
1875     std::shared_ptr<AsyncSocket> sock2(
1876         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1877   });
1878   cb3.setConnectionAcceptedFn(
1879       [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1880   cb4.setConnectionAcceptedFn(
1881       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1882         std::shared_ptr<AsyncSocket> sock3(
1883             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1884       });
1885   cb5.setConnectionAcceptedFn(
1886       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1887         std::shared_ptr<AsyncSocket> sock5(
1888             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1889
1890       });
1891   cb2.setConnectionAcceptedFn(
1892       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1893         if (cb2Count == 0) {
1894           serverSocket->removeAcceptCallback(&cb3, nullptr);
1895           serverSocket->removeAcceptCallback(&cb5, nullptr);
1896         }
1897         ++cb2Count;
1898       });
1899   // Have callback 6 remove callback 4 the first time it is called,
1900   // and destroy the server socket the second time it is called
1901   int cb6Count = 0;
1902   cb6.setConnectionAcceptedFn(
1903       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1904         if (cb6Count == 0) {
1905           serverSocket->removeAcceptCallback(&cb4, nullptr);
1906           std::shared_ptr<AsyncSocket> sock6(
1907               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1908           std::shared_ptr<AsyncSocket> sock7(
1909               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1910           std::shared_ptr<AsyncSocket> sock8(
1911               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1912
1913         } else {
1914           serverSocket.reset();
1915         }
1916         ++cb6Count;
1917       });
1918   // Have callback 7 remove itself
1919   cb7.setConnectionAcceptedFn(
1920       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1921         serverSocket->removeAcceptCallback(&cb7, nullptr);
1922       });
1923
1924   serverSocket->addAcceptCallback(&cb1, &eventBase);
1925   serverSocket->addAcceptCallback(&cb2, &eventBase);
1926   serverSocket->addAcceptCallback(&cb3, &eventBase);
1927   serverSocket->addAcceptCallback(&cb4, &eventBase);
1928   serverSocket->addAcceptCallback(&cb5, &eventBase);
1929   serverSocket->addAcceptCallback(&cb6, &eventBase);
1930   serverSocket->addAcceptCallback(&cb7, &eventBase);
1931   serverSocket->startAccepting();
1932
1933   // Make several connections to the socket
1934   std::shared_ptr<AsyncSocket> sock1(
1935       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1936   std::shared_ptr<AsyncSocket> sock4(
1937       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1938
1939   // Loop until we are stopped
1940   eventBase.loop();
1941
1942   // Check to make sure that the expected callbacks were invoked.
1943   //
1944   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1945   // the AcceptCallbacks in round-robin fashion, in the order that they were
1946   // added.  The code is implemented this way right now, but the API doesn't
1947   // explicitly require it be done this way.  If we change the code not to be
1948   // exactly round robin in the future, we can simplify the test checks here.
1949   // (We'll also need to update the termination code, since we expect cb6 to
1950   // get called twice to terminate the loop.)
1951   ASSERT_EQ(cb1.getEvents()->size(), 4);
1952   ASSERT_EQ(cb1.getEvents()->at(0).type,
1953                     TestAcceptCallback::TYPE_START);
1954   ASSERT_EQ(cb1.getEvents()->at(1).type,
1955                     TestAcceptCallback::TYPE_ACCEPT);
1956   ASSERT_EQ(cb1.getEvents()->at(2).type,
1957                     TestAcceptCallback::TYPE_ACCEPT);
1958   ASSERT_EQ(cb1.getEvents()->at(3).type,
1959                     TestAcceptCallback::TYPE_STOP);
1960
1961   ASSERT_EQ(cb2.getEvents()->size(), 4);
1962   ASSERT_EQ(cb2.getEvents()->at(0).type,
1963                     TestAcceptCallback::TYPE_START);
1964   ASSERT_EQ(cb2.getEvents()->at(1).type,
1965                     TestAcceptCallback::TYPE_ACCEPT);
1966   ASSERT_EQ(cb2.getEvents()->at(2).type,
1967                     TestAcceptCallback::TYPE_ACCEPT);
1968   ASSERT_EQ(cb2.getEvents()->at(3).type,
1969                     TestAcceptCallback::TYPE_STOP);
1970
1971   ASSERT_EQ(cb3.getEvents()->size(), 2);
1972   ASSERT_EQ(cb3.getEvents()->at(0).type,
1973                     TestAcceptCallback::TYPE_START);
1974   ASSERT_EQ(cb3.getEvents()->at(1).type,
1975                     TestAcceptCallback::TYPE_STOP);
1976
1977   ASSERT_EQ(cb4.getEvents()->size(), 3);
1978   ASSERT_EQ(cb4.getEvents()->at(0).type,
1979                     TestAcceptCallback::TYPE_START);
1980   ASSERT_EQ(cb4.getEvents()->at(1).type,
1981                     TestAcceptCallback::TYPE_ACCEPT);
1982   ASSERT_EQ(cb4.getEvents()->at(2).type,
1983                     TestAcceptCallback::TYPE_STOP);
1984
1985   ASSERT_EQ(cb5.getEvents()->size(), 2);
1986   ASSERT_EQ(cb5.getEvents()->at(0).type,
1987                     TestAcceptCallback::TYPE_START);
1988   ASSERT_EQ(cb5.getEvents()->at(1).type,
1989                     TestAcceptCallback::TYPE_STOP);
1990
1991   ASSERT_EQ(cb6.getEvents()->size(), 4);
1992   ASSERT_EQ(cb6.getEvents()->at(0).type,
1993                     TestAcceptCallback::TYPE_START);
1994   ASSERT_EQ(cb6.getEvents()->at(1).type,
1995                     TestAcceptCallback::TYPE_ACCEPT);
1996   ASSERT_EQ(cb6.getEvents()->at(2).type,
1997                     TestAcceptCallback::TYPE_ACCEPT);
1998   ASSERT_EQ(cb6.getEvents()->at(3).type,
1999                     TestAcceptCallback::TYPE_STOP);
2000
2001   ASSERT_EQ(cb7.getEvents()->size(), 3);
2002   ASSERT_EQ(cb7.getEvents()->at(0).type,
2003                     TestAcceptCallback::TYPE_START);
2004   ASSERT_EQ(cb7.getEvents()->at(1).type,
2005                     TestAcceptCallback::TYPE_ACCEPT);
2006   ASSERT_EQ(cb7.getEvents()->at(2).type,
2007                     TestAcceptCallback::TYPE_STOP);
2008 }
2009
2010 /**
2011  * Test AsyncServerSocket::removeAcceptCallback()
2012  */
2013 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
2014   // Create a new AsyncServerSocket
2015   EventBase eventBase;
2016   std::shared_ptr<AsyncServerSocket> serverSocket(
2017       AsyncServerSocket::newSocket(&eventBase));
2018   serverSocket->bind(0);
2019   serverSocket->listen(16);
2020   folly::SocketAddress serverAddress;
2021   serverSocket->getAddress(&serverAddress);
2022
2023   // Add several accept callbacks
2024   TestAcceptCallback cb1;
2025   auto thread_id = std::this_thread::get_id();
2026   cb1.setAcceptStartedFn([&](){
2027     CHECK_NE(thread_id, std::this_thread::get_id());
2028     thread_id = std::this_thread::get_id();
2029   });
2030   cb1.setConnectionAcceptedFn(
2031       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2032         ASSERT_EQ(thread_id, std::this_thread::get_id());
2033         serverSocket->removeAcceptCallback(&cb1, &eventBase);
2034       });
2035   cb1.setAcceptStoppedFn([&](){
2036     ASSERT_EQ(thread_id, std::this_thread::get_id());
2037   });
2038
2039   // Test having callbacks remove other callbacks before them on the list,
2040   serverSocket->addAcceptCallback(&cb1, &eventBase);
2041   serverSocket->startAccepting();
2042
2043   // Make several connections to the socket
2044   std::shared_ptr<AsyncSocket> sock1(
2045       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
2046
2047   // Loop in another thread
2048   auto other = std::thread([&](){
2049     eventBase.loop();
2050   });
2051   other.join();
2052
2053   // Check to make sure that the expected callbacks were invoked.
2054   //
2055   // NOTE: This code depends on the AsyncServerSocket operating calling all of
2056   // the AcceptCallbacks in round-robin fashion, in the order that they were
2057   // added.  The code is implemented this way right now, but the API doesn't
2058   // explicitly require it be done this way.  If we change the code not to be
2059   // exactly round robin in the future, we can simplify the test checks here.
2060   // (We'll also need to update the termination code, since we expect cb6 to
2061   // get called twice to terminate the loop.)
2062   ASSERT_EQ(cb1.getEvents()->size(), 3);
2063   ASSERT_EQ(cb1.getEvents()->at(0).type,
2064                     TestAcceptCallback::TYPE_START);
2065   ASSERT_EQ(cb1.getEvents()->at(1).type,
2066                     TestAcceptCallback::TYPE_ACCEPT);
2067   ASSERT_EQ(cb1.getEvents()->at(2).type,
2068                     TestAcceptCallback::TYPE_STOP);
2069
2070 }
2071
2072 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
2073   EventBase* eventBase = serverSocket->getEventBase();
2074   CHECK(eventBase);
2075
2076   // Add a callback to accept one connection then stop accepting
2077   TestAcceptCallback acceptCallback;
2078   acceptCallback.setConnectionAcceptedFn(
2079       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2080         serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
2081       });
2082   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2083     serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
2084   });
2085   serverSocket->addAcceptCallback(&acceptCallback, eventBase);
2086   serverSocket->startAccepting();
2087
2088   // Connect to the server socket
2089   folly::SocketAddress serverAddress;
2090   serverSocket->getAddress(&serverAddress);
2091   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
2092
2093   // Loop to process all events
2094   eventBase->loop();
2095
2096   // Verify that the server accepted a connection
2097   ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
2098   ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
2099                     TestAcceptCallback::TYPE_START);
2100   ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
2101                     TestAcceptCallback::TYPE_ACCEPT);
2102   ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
2103                     TestAcceptCallback::TYPE_STOP);
2104 }
2105
2106 /* Verify that we don't leak sockets if we are destroyed()
2107  * and there are still writes pending
2108  *
2109  * If destroy() only calls close() instead of closeNow(),
2110  * it would shutdown(writes) on the socket, but it would
2111  * never be close()'d, and the socket would leak
2112  */
2113 TEST(AsyncSocketTest, DestroyCloseTest) {
2114   TestServer server;
2115
2116   // connect()
2117   EventBase clientEB;
2118   EventBase serverEB;
2119   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
2120   ConnCallback ccb;
2121   socket->connect(&ccb, server.getAddress(), 30);
2122
2123   // Accept the connection
2124   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2125   ReadCallback rcb;
2126   acceptedSocket->setReadCB(&rcb);
2127
2128   // Write a large buffer to the socket that is larger than kernel buffer
2129   size_t simpleBufLength = 5000000;
2130   char* simpleBuf = new char[simpleBufLength];
2131   memset(simpleBuf, 'a', simpleBufLength);
2132   WriteCallback wcb;
2133
2134   // Let the reads and writes run to completion
2135   int fd = acceptedSocket->getFd();
2136
2137   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2138   socket.reset();
2139   acceptedSocket.reset();
2140
2141   // Test that server socket was closed
2142   folly::test::msvcSuppressAbortOnInvalidParams([&] {
2143     ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2144     ASSERT_EQ(sz, -1);
2145     ASSERT_EQ(errno, EBADF);
2146   });
2147   delete[] simpleBuf;
2148 }
2149
2150 /**
2151  * Test AsyncServerSocket::useExistingSocket()
2152  */
2153 TEST(AsyncSocketTest, ServerExistingSocket) {
2154   EventBase eventBase;
2155
2156   // Test creating a socket, and letting AsyncServerSocket bind and listen
2157   {
2158     // Manually create a socket
2159     int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2160     ASSERT_GE(fd, 0);
2161
2162     // Create a server socket
2163     AsyncServerSocket::UniquePtr serverSocket(
2164         new AsyncServerSocket(&eventBase));
2165     serverSocket->useExistingSocket(fd);
2166     folly::SocketAddress address;
2167     serverSocket->getAddress(&address);
2168     address.setPort(0);
2169     serverSocket->bind(address);
2170     serverSocket->listen(16);
2171
2172     // Make sure the socket works
2173     serverSocketSanityTest(serverSocket.get());
2174   }
2175
2176   // Test creating a socket and binding manually,
2177   // then letting AsyncServerSocket listen
2178   {
2179     // Manually create a socket
2180     int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2181     ASSERT_GE(fd, 0);
2182     // bind
2183     struct sockaddr_in addr;
2184     addr.sin_family = AF_INET;
2185     addr.sin_port = 0;
2186     addr.sin_addr.s_addr = INADDR_ANY;
2187     ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2188                              sizeof(addr)), 0);
2189     // Look up the address that we bound to
2190     folly::SocketAddress boundAddress;
2191     boundAddress.setFromLocalAddress(fd);
2192
2193     // Create a server socket
2194     AsyncServerSocket::UniquePtr serverSocket(
2195         new AsyncServerSocket(&eventBase));
2196     serverSocket->useExistingSocket(fd);
2197     serverSocket->listen(16);
2198
2199     // Make sure AsyncServerSocket reports the same address that we bound to
2200     folly::SocketAddress serverSocketAddress;
2201     serverSocket->getAddress(&serverSocketAddress);
2202     ASSERT_EQ(boundAddress, serverSocketAddress);
2203
2204     // Make sure the socket works
2205     serverSocketSanityTest(serverSocket.get());
2206   }
2207
2208   // Test creating a socket, binding and listening manually,
2209   // then giving it to AsyncServerSocket
2210   {
2211     // Manually create a socket
2212     int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2213     ASSERT_GE(fd, 0);
2214     // bind
2215     struct sockaddr_in addr;
2216     addr.sin_family = AF_INET;
2217     addr.sin_port = 0;
2218     addr.sin_addr.s_addr = INADDR_ANY;
2219     ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2220                              sizeof(addr)), 0);
2221     // Look up the address that we bound to
2222     folly::SocketAddress boundAddress;
2223     boundAddress.setFromLocalAddress(fd);
2224     // listen
2225     ASSERT_EQ(listen(fd, 16), 0);
2226
2227     // Create a server socket
2228     AsyncServerSocket::UniquePtr serverSocket(
2229         new AsyncServerSocket(&eventBase));
2230     serverSocket->useExistingSocket(fd);
2231
2232     // Make sure AsyncServerSocket reports the same address that we bound to
2233     folly::SocketAddress serverSocketAddress;
2234     serverSocket->getAddress(&serverSocketAddress);
2235     ASSERT_EQ(boundAddress, serverSocketAddress);
2236
2237     // Make sure the socket works
2238     serverSocketSanityTest(serverSocket.get());
2239   }
2240 }
2241
2242 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2243   EventBase eventBase;
2244
2245   // Create a server socket
2246   std::shared_ptr<AsyncServerSocket> serverSocket(
2247       AsyncServerSocket::newSocket(&eventBase));
2248   string path(1, 0);
2249   path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2250   folly::SocketAddress serverAddress;
2251   serverAddress.setFromPath(path);
2252   serverSocket->bind(serverAddress);
2253   serverSocket->listen(16);
2254
2255   // Add a callback to accept one connection then stop the loop
2256   TestAcceptCallback acceptCallback;
2257   acceptCallback.setConnectionAcceptedFn(
2258       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2259         serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2260       });
2261   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2262     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2263   });
2264   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2265   serverSocket->startAccepting();
2266
2267   // Connect to the server socket
2268   std::shared_ptr<AsyncSocket> socket(
2269       AsyncSocket::newSocket(&eventBase, serverAddress));
2270
2271   eventBase.loop();
2272
2273   // Verify that the server accepted a connection
2274   ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
2275   ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
2276                     TestAcceptCallback::TYPE_START);
2277   ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
2278                     TestAcceptCallback::TYPE_ACCEPT);
2279   ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
2280                     TestAcceptCallback::TYPE_STOP);
2281   int fd = acceptCallback.getEvents()->at(1).fd;
2282
2283   // The accepted connection should already be in non-blocking mode
2284   int flags = fcntl(fd, F_GETFL, 0);
2285   ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2286 }
2287
2288 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2289   EventBase eventBase;
2290   TestConnectionEventCallback connectionEventCallback;
2291
2292   // Create a server socket
2293   std::shared_ptr<AsyncServerSocket> serverSocket(
2294       AsyncServerSocket::newSocket(&eventBase));
2295   serverSocket->setConnectionEventCallback(&connectionEventCallback);
2296   serverSocket->bind(0);
2297   serverSocket->listen(16);
2298   folly::SocketAddress serverAddress;
2299   serverSocket->getAddress(&serverAddress);
2300
2301   // Add a callback to accept one connection then stop the loop
2302   TestAcceptCallback acceptCallback;
2303   acceptCallback.setConnectionAcceptedFn(
2304       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2305         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2306       });
2307   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2308     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2309   });
2310   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2311   serverSocket->startAccepting();
2312
2313   // Connect to the server socket
2314   std::shared_ptr<AsyncSocket> socket(
2315       AsyncSocket::newSocket(&eventBase, serverAddress));
2316
2317   eventBase.loop();
2318
2319   // Validate the connection event counters
2320   ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2321   ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2322   ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2323   ASSERT_EQ(
2324       connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2325   ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2326   ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2327   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2328   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2329 }
2330
2331 TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
2332   EventBase eventBase;
2333   TestConnectionEventCallback connectionEventCallback;
2334
2335   // Create a server socket
2336   std::shared_ptr<AsyncServerSocket> serverSocket(
2337       AsyncServerSocket::newSocket(&eventBase));
2338   serverSocket->setConnectionEventCallback(&connectionEventCallback);
2339   serverSocket->bind(0);
2340   serverSocket->listen(16);
2341   folly::SocketAddress serverAddress;
2342   serverSocket->getAddress(&serverAddress);
2343
2344   // Add a callback to accept one connection then stop the loop
2345   TestAcceptCallback acceptCallback;
2346   acceptCallback.setConnectionAcceptedFn(
2347       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2348         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2349       });
2350   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2351     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2352   });
2353   bool acceptStartedFlag{false};
2354   acceptCallback.setAcceptStartedFn([&acceptStartedFlag](){
2355     acceptStartedFlag = true;
2356   });
2357   bool acceptStoppedFlag{false};
2358   acceptCallback.setAcceptStoppedFn([&acceptStoppedFlag](){
2359     acceptStoppedFlag = true;
2360   });
2361   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2362   serverSocket->startAccepting();
2363
2364   // Connect to the server socket
2365   std::shared_ptr<AsyncSocket> socket(
2366       AsyncSocket::newSocket(&eventBase, serverAddress));
2367
2368   eventBase.loop();
2369
2370   ASSERT_TRUE(acceptStartedFlag);
2371   ASSERT_TRUE(acceptStoppedFlag);
2372   // Validate the connection event counters
2373   ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2374   ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2375   ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2376   ASSERT_EQ(
2377       connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
2378   ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
2379   ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2380   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2381   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2382 }
2383
2384
2385
2386 /**
2387  * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2388  */
2389 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2390   EventBase eventBase;
2391
2392   // Counter of how many connections have been accepted
2393   int count = 0;
2394
2395   // Create a server socket
2396   auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2397   serverSocket->bind(0);
2398   serverSocket->listen(16);
2399   folly::SocketAddress serverAddress;
2400   serverSocket->getAddress(&serverAddress);
2401
2402   // Add a callback to accept connections
2403   TestAcceptCallback acceptCallback;
2404   acceptCallback.setConnectionAcceptedFn(
2405       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2406         count++;
2407         ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2408
2409         if (count == 4) {
2410           // all messages are processed, remove accept callback
2411           serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2412         }
2413       });
2414   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2415     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2416   });
2417   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2418   serverSocket->startAccepting();
2419
2420   // Connect to the server socket, 4 clients, there are 4 connections
2421   auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2422   auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2423   auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2424   auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2425
2426   eventBase.loop();
2427 }
2428
2429 /**
2430  * Test AsyncTransport::BufferCallback
2431  */
2432 TEST(AsyncSocketTest, BufferTest) {
2433   TestServer server;
2434
2435   EventBase evb;
2436   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2437   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2438   ConnCallback ccb;
2439   socket->connect(&ccb, server.getAddress(), 30, option);
2440
2441   char buf[100 * 1024];
2442   memset(buf, 'c', sizeof(buf));
2443   WriteCallback wcb;
2444   BufferCallback bcb;
2445   socket->setBufferCallback(&bcb);
2446   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2447
2448   evb.loop();
2449   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2450   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
2451
2452   ASSERT_TRUE(bcb.hasBuffered());
2453   ASSERT_TRUE(bcb.hasBufferCleared());
2454
2455   socket->close();
2456   server.verifyConnection(buf, sizeof(buf));
2457
2458   ASSERT_TRUE(socket->isClosedBySelf());
2459   ASSERT_FALSE(socket->isClosedByPeer());
2460 }
2461
2462 TEST(AsyncSocketTest, BufferCallbackKill) {
2463   TestServer server;
2464   EventBase evb;
2465   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2466   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2467   ConnCallback ccb;
2468   socket->connect(&ccb, server.getAddress(), 30, option);
2469   evb.loopOnce();
2470
2471   char buf[100 * 1024];
2472   memset(buf, 'c', sizeof(buf));
2473   BufferCallback bcb;
2474   socket->setBufferCallback(&bcb);
2475   WriteCallback wcb;
2476   wcb.successCallback = [&] {
2477     ASSERT_TRUE(socket.unique());
2478     socket.reset();
2479   };
2480
2481   // This will trigger AsyncSocket::handleWrite,
2482   // which calls WriteCallback::writeSuccess,
2483   // which calls wcb.successCallback above,
2484   // which tries to delete socket
2485   // Then, the socket will also try to use this BufferCallback
2486   // And that should crash us, if there is no DestructorGuard on the stack
2487   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2488
2489   evb.loop();
2490   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2491 }
2492
2493 #if FOLLY_ALLOW_TFO
2494 TEST(AsyncSocketTest, ConnectTFO) {
2495   // Start listening on a local port
2496   TestServer server(true);
2497
2498   // Connect using a AsyncSocket
2499   EventBase evb;
2500   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2501   socket->enableTFO();
2502   ConnCallback cb;
2503   socket->connect(&cb, server.getAddress(), 30);
2504
2505   std::array<uint8_t, 128> buf;
2506   memset(buf.data(), 'a', buf.size());
2507
2508   std::array<uint8_t, 3> readBuf;
2509   auto sendBuf = IOBuf::copyBuffer("hey");
2510
2511   std::thread t([&] {
2512     auto acceptedSocket = server.accept();
2513     acceptedSocket->write(buf.data(), buf.size());
2514     acceptedSocket->flush();
2515     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2516     acceptedSocket->close();
2517   });
2518
2519   evb.loop();
2520
2521   ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2522   EXPECT_LE(0, socket->getConnectTime().count());
2523   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2524   EXPECT_TRUE(socket->getTFOAttempted());
2525
2526   // Should trigger the connect
2527   WriteCallback write;
2528   ReadCallback rcb;
2529   socket->writeChain(&write, sendBuf->clone());
2530   socket->setReadCB(&rcb);
2531   evb.loop();
2532
2533   t.join();
2534
2535   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2536   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2537   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2538   ASSERT_EQ(1, rcb.buffers.size());
2539   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2540   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2541   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2542 }
2543
2544 TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
2545   // Start listening on a local port
2546   TestServer server(true);
2547
2548   // Connect using a AsyncSocket
2549   EventBase evb;
2550   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2551   socket->enableTFO();
2552   ConnCallback cb;
2553   socket->connect(&cb, server.getAddress(), 30);
2554   ReadCallback rcb;
2555   socket->setReadCB(&rcb);
2556
2557   std::array<uint8_t, 128> buf;
2558   memset(buf.data(), 'a', buf.size());
2559
2560   std::array<uint8_t, 3> readBuf;
2561   auto sendBuf = IOBuf::copyBuffer("hey");
2562
2563   std::thread t([&] {
2564     auto acceptedSocket = server.accept();
2565     acceptedSocket->write(buf.data(), buf.size());
2566     acceptedSocket->flush();
2567     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2568     acceptedSocket->close();
2569   });
2570
2571   evb.loop();
2572
2573   ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2574   EXPECT_LE(0, socket->getConnectTime().count());
2575   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2576   EXPECT_TRUE(socket->getTFOAttempted());
2577
2578   // Should trigger the connect
2579   WriteCallback write;
2580   socket->writeChain(&write, sendBuf->clone());
2581   evb.loop();
2582
2583   t.join();
2584
2585   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2586   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2587   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2588   ASSERT_EQ(1, rcb.buffers.size());
2589   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2590   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2591   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2592 }
2593
2594 /**
2595  * Test connecting to a server that isn't listening
2596  */
2597 TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) {
2598   EventBase evb;
2599
2600   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2601
2602   socket->enableTFO();
2603
2604   // Hopefully nothing is actually listening on this address
2605   folly::SocketAddress addr("::1", 65535);
2606   ConnCallback cb;
2607   socket->connect(&cb, addr, 30);
2608
2609   evb.loop();
2610
2611   WriteCallback write1;
2612   // Trigger the connect if TFO attempt is supported.
2613   socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2614   WriteCallback write2;
2615   socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2616   evb.loop();
2617
2618   if (!socket->getTFOFinished()) {
2619     EXPECT_EQ(STATE_FAILED, write1.state);
2620   } else {
2621     EXPECT_EQ(STATE_SUCCEEDED, write1.state);
2622     EXPECT_FALSE(socket->getTFOSucceded());
2623   }
2624
2625   EXPECT_EQ(STATE_FAILED, write2.state);
2626
2627   EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2628   EXPECT_LE(0, socket->getConnectTime().count());
2629   EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2630   EXPECT_TRUE(socket->getTFOAttempted());
2631 }
2632
2633 /**
2634  * Test calling closeNow() immediately after connecting.
2635  */
2636 TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
2637   TestServer server(true);
2638
2639   // connect()
2640   EventBase evb;
2641   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2642   socket->enableTFO();
2643
2644   ConnCallback ccb;
2645   socket->connect(&ccb, server.getAddress(), 30);
2646
2647   // write()
2648   std::array<char, 128> buf;
2649   memset(buf.data(), 'a', buf.size());
2650
2651   // close()
2652   socket->closeNow();
2653
2654   // Loop, although there shouldn't be anything to do.
2655   evb.loop();
2656
2657   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2658
2659   ASSERT_TRUE(socket->isClosedBySelf());
2660   ASSERT_FALSE(socket->isClosedByPeer());
2661 }
2662
2663 /**
2664  * Test calling close() immediately after connect()
2665  */
2666 TEST(AsyncSocketTest, ConnectAndCloseTFO) {
2667   TestServer server(true);
2668
2669   // Connect using a AsyncSocket
2670   EventBase evb;
2671   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2672   socket->enableTFO();
2673
2674   ConnCallback ccb;
2675   socket->connect(&ccb, server.getAddress(), 30);
2676
2677   socket->close();
2678
2679   // Loop, although there shouldn't be anything to do.
2680   evb.loop();
2681
2682   // Make sure the connection was aborted
2683   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2684
2685   ASSERT_TRUE(socket->isClosedBySelf());
2686   ASSERT_FALSE(socket->isClosedByPeer());
2687 }
2688
2689 class MockAsyncTFOSocket : public AsyncSocket {
2690  public:
2691   using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
2692
2693   explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
2694
2695   MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
2696 };
2697
2698 TEST(AsyncSocketTest, TestTFOUnsupported) {
2699   TestServer server(true);
2700
2701   // Connect using a AsyncSocket
2702   EventBase evb;
2703   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2704   socket->enableTFO();
2705
2706   ConnCallback ccb;
2707   socket->connect(&ccb, server.getAddress(), 30);
2708   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2709
2710   ReadCallback rcb;
2711   socket->setReadCB(&rcb);
2712
2713   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2714       .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2715   WriteCallback write;
2716   auto sendBuf = IOBuf::copyBuffer("hey");
2717   socket->writeChain(&write, sendBuf->clone());
2718   EXPECT_EQ(STATE_WAITING, write.state);
2719
2720   std::array<uint8_t, 128> buf;
2721   memset(buf.data(), 'a', buf.size());
2722
2723   std::array<uint8_t, 3> readBuf;
2724
2725   std::thread t([&] {
2726     std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2727     acceptedSocket->write(buf.data(), buf.size());
2728     acceptedSocket->flush();
2729     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2730     acceptedSocket->close();
2731   });
2732
2733   evb.loop();
2734
2735   t.join();
2736   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2737   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2738
2739   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2740   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2741   ASSERT_EQ(1, rcb.buffers.size());
2742   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2743   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2744   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2745 }
2746
2747 TEST(AsyncSocketTest, ConnectRefusedDelayedTFO) {
2748   EventBase evb;
2749
2750   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2751   socket->enableTFO();
2752
2753   // Hopefully this fails
2754   folly::SocketAddress fakeAddr("127.0.0.1", 65535);
2755   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2756       .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2757         sockaddr_storage addr;
2758         auto len = fakeAddr.getAddress(&addr);
2759         int ret = connect(fd, (const struct sockaddr*)&addr, len);
2760         LOG(INFO) << "connecting the socket " << fd << " : " << ret << " : "
2761                   << errno;
2762         return ret;
2763       }));
2764
2765   // Hopefully nothing is actually listening on this address
2766   ConnCallback cb;
2767   socket->connect(&cb, fakeAddr, 30);
2768
2769   WriteCallback write1;
2770   // Trigger the connect if TFO attempt is supported.
2771   socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2772
2773   if (socket->getTFOFinished()) {
2774     // This test is useless now.
2775     return;
2776   }
2777   WriteCallback write2;
2778   // Trigger the connect if TFO attempt is supported.
2779   socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2780   evb.loop();
2781
2782   EXPECT_EQ(STATE_FAILED, write1.state);
2783   EXPECT_EQ(STATE_FAILED, write2.state);
2784   EXPECT_FALSE(socket->getTFOSucceded());
2785
2786   EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2787   EXPECT_LE(0, socket->getConnectTime().count());
2788   EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2789   EXPECT_TRUE(socket->getTFOAttempted());
2790 }
2791
2792 TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
2793   // Try connecting to server that won't respond.
2794   //
2795   // This depends somewhat on the network where this test is run.
2796   // Hopefully this IP will be routable but unresponsive.
2797   // (Alternatively, we could try listening on a local raw socket, but that
2798   // normally requires root privileges.)
2799   auto host = SocketAddressTestHelper::isIPv6Enabled()
2800       ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2801       : SocketAddressTestHelper::isIPv4Enabled()
2802           ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2803           : nullptr;
2804   SocketAddress addr(host, 65535);
2805
2806   // Connect using a AsyncSocket
2807   EventBase evb;
2808   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2809   socket->enableTFO();
2810
2811   ConnCallback ccb;
2812   // Set a very small timeout
2813   socket->connect(&ccb, addr, 1);
2814   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2815
2816   ReadCallback rcb;
2817   socket->setReadCB(&rcb);
2818
2819   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2820       .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2821   WriteCallback write;
2822   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2823
2824   evb.loop();
2825
2826   EXPECT_EQ(STATE_FAILED, write.state);
2827 }
2828
2829 TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
2830   TestServer server(true);
2831
2832   // Connect using a AsyncSocket
2833   EventBase evb;
2834   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2835   socket->enableTFO();
2836
2837   ConnCallback ccb;
2838   socket->connect(&ccb, server.getAddress(), 30);
2839   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2840
2841   ReadCallback rcb;
2842   socket->setReadCB(&rcb);
2843
2844   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2845       .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2846         sockaddr_storage addr;
2847         auto len = server.getAddress().getAddress(&addr);
2848         return connect(fd, (const struct sockaddr*)&addr, len);
2849       }));
2850   WriteCallback write;
2851   auto sendBuf = IOBuf::copyBuffer("hey");
2852   socket->writeChain(&write, sendBuf->clone());
2853   EXPECT_EQ(STATE_WAITING, write.state);
2854
2855   std::array<uint8_t, 128> buf;
2856   memset(buf.data(), 'a', buf.size());
2857
2858   std::array<uint8_t, 3> readBuf;
2859
2860   std::thread t([&] {
2861     std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2862     acceptedSocket->write(buf.data(), buf.size());
2863     acceptedSocket->flush();
2864     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2865     acceptedSocket->close();
2866   });
2867
2868   evb.loop();
2869
2870   t.join();
2871   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2872
2873   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2874   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2875
2876   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2877   ASSERT_EQ(1, rcb.buffers.size());
2878   ASSERT_EQ(buf.size(), rcb.buffers[0].length);
2879   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2880 }
2881
2882 TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
2883   // Try connecting to server that won't respond.
2884   //
2885   // This depends somewhat on the network where this test is run.
2886   // Hopefully this IP will be routable but unresponsive.
2887   // (Alternatively, we could try listening on a local raw socket, but that
2888   // normally requires root privileges.)
2889   auto host = SocketAddressTestHelper::isIPv6Enabled()
2890       ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2891       : SocketAddressTestHelper::isIPv4Enabled()
2892           ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2893           : nullptr;
2894   SocketAddress addr(host, 65535);
2895
2896   // Connect using a AsyncSocket
2897   EventBase evb;
2898   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2899   socket->enableTFO();
2900
2901   ConnCallback ccb;
2902   // Set a very small timeout
2903   socket->connect(&ccb, addr, 1);
2904   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2905
2906   ReadCallback rcb;
2907   socket->setReadCB(&rcb);
2908
2909   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2910       .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2911         sockaddr_storage addr2;
2912         auto len = addr.getAddress(&addr2);
2913         return connect(fd, (const struct sockaddr*)&addr2, len);
2914       }));
2915   WriteCallback write;
2916   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2917
2918   evb.loop();
2919
2920   EXPECT_EQ(STATE_FAILED, write.state);
2921 }
2922
2923 TEST(AsyncSocketTest, TestTFOEagain) {
2924   TestServer server(true);
2925
2926   // Connect using a AsyncSocket
2927   EventBase evb;
2928   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2929   socket->enableTFO();
2930
2931   ConnCallback ccb;
2932   socket->connect(&ccb, server.getAddress(), 30);
2933
2934   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2935       .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
2936   WriteCallback write;
2937   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2938
2939   evb.loop();
2940
2941   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2942   EXPECT_EQ(STATE_FAILED, write.state);
2943 }
2944
2945 // Sending a large amount of data in the first write which will
2946 // definitely not fit into MSS.
2947 TEST(AsyncSocketTest, ConnectTFOWithBigData) {
2948   // Start listening on a local port
2949   TestServer server(true);
2950
2951   // Connect using a AsyncSocket
2952   EventBase evb;
2953   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2954   socket->enableTFO();
2955   ConnCallback cb;
2956   socket->connect(&cb, server.getAddress(), 30);
2957
2958   std::array<uint8_t, 128> buf;
2959   memset(buf.data(), 'a', buf.size());
2960
2961   constexpr size_t len = 10 * 1024;
2962   auto sendBuf = IOBuf::create(len);
2963   sendBuf->append(len);
2964   std::array<uint8_t, len> readBuf;
2965
2966   std::thread t([&] {
2967     auto acceptedSocket = server.accept();
2968     acceptedSocket->write(buf.data(), buf.size());
2969     acceptedSocket->flush();
2970     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2971     acceptedSocket->close();
2972   });
2973
2974   evb.loop();
2975
2976   ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2977   EXPECT_LE(0, socket->getConnectTime().count());
2978   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2979   EXPECT_TRUE(socket->getTFOAttempted());
2980
2981   // Should trigger the connect
2982   WriteCallback write;
2983   ReadCallback rcb;
2984   socket->writeChain(&write, sendBuf->clone());
2985   socket->setReadCB(&rcb);
2986   evb.loop();
2987
2988   t.join();
2989
2990   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2991   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2992   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2993   ASSERT_EQ(1, rcb.buffers.size());
2994   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2995   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2996   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2997 }
2998
2999 class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
3000  public:
3001   MOCK_METHOD1(evbAttached, void(AsyncSocket*));
3002   MOCK_METHOD1(evbDetached, void(AsyncSocket*));
3003 };
3004
3005 TEST(AsyncSocketTest, EvbCallbacks) {
3006   auto cb = folly::make_unique<MockEvbChangeCallback>();
3007   EventBase evb;
3008   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3009
3010   InSequence seq;
3011   EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
3012   EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
3013
3014   socket->setEvbChangedCallback(std::move(cb));
3015   socket->detachEventBase();
3016   socket->attachEventBase(&evb);
3017 }
3018
3019 #endif