13f23b75d81a6be5fab18944100bb2202f5d3ed4
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/ExceptionWrapper.h>
17 #include <folly/RWSpinLock.h>
18 #include <folly/Random.h>
19 #include <folly/SocketAddress.h>
20 #include <folly/io/async/AsyncServerSocket.h>
21 #include <folly/io/async/AsyncSocket.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/EventBase.h>
24
25 #include <folly/experimental/TestUtil.h>
26 #include <folly/io/IOBuf.h>
27 #include <folly/io/async/test/AsyncSocketTest.h>
28 #include <folly/io/async/test/Util.h>
29 #include <folly/portability/GMock.h>
30 #include <folly/portability/GTest.h>
31 #include <folly/portability/Sockets.h>
32 #include <folly/portability/Unistd.h>
33 #include <folly/test/SocketAddressTestHelper.h>
34
35 #include <boost/scoped_array.hpp>
36 #include <fcntl.h>
37 #include <sys/types.h>
38 #include <iostream>
39 #include <thread>
40
41 using namespace boost;
42
43 using std::string;
44 using std::vector;
45 using std::min;
46 using std::cerr;
47 using std::endl;
48 using std::unique_ptr;
49 using std::chrono::milliseconds;
50 using boost::scoped_array;
51
52 using namespace folly;
53 using namespace testing;
54
55 namespace fsp = folly::portability::sockets;
56
57 class DelayedWrite: public AsyncTimeout {
58  public:
59   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
60       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
61       bool cork, bool lastWrite = false):
62     AsyncTimeout(socket->getEventBase()),
63     socket_(socket),
64     bufs_(std::move(bufs)),
65     wcb_(wcb),
66     cork_(cork),
67     lastWrite_(lastWrite) {}
68
69  private:
70   void timeoutExpired() noexcept override {
71     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
72     socket_->writeChain(wcb_, std::move(bufs_), flags);
73     if (lastWrite_) {
74       socket_->shutdownWrite();
75     }
76   }
77
78   std::shared_ptr<AsyncSocket> socket_;
79   unique_ptr<IOBuf> bufs_;
80   AsyncTransportWrapper::WriteCallback* wcb_;
81   bool cork_;
82   bool lastWrite_;
83 };
84
85 ///////////////////////////////////////////////////////////////////////////
86 // connect() tests
87 ///////////////////////////////////////////////////////////////////////////
88
89 /**
90  * Test connecting to a server
91  */
92 TEST(AsyncSocketTest, Connect) {
93   // Start listening on a local port
94   TestServer server;
95
96   // Connect using a AsyncSocket
97   EventBase evb;
98   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
99   ConnCallback cb;
100   socket->connect(&cb, server.getAddress(), 30);
101
102   evb.loop();
103
104   ASSERT_EQ(cb.state, STATE_SUCCEEDED);
105   EXPECT_LE(0, socket->getConnectTime().count());
106   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
107 }
108
109 enum class TFOState {
110   DISABLED,
111   ENABLED,
112 };
113
114 class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
115
116 std::vector<TFOState> getTestingValues() {
117   std::vector<TFOState> vals;
118   vals.emplace_back(TFOState::DISABLED);
119
120 #if FOLLY_ALLOW_TFO
121   vals.emplace_back(TFOState::ENABLED);
122 #endif
123   return vals;
124 }
125
126 INSTANTIATE_TEST_CASE_P(
127     ConnectTests,
128     AsyncSocketConnectTest,
129     ::testing::ValuesIn(getTestingValues()));
130
131 /**
132  * Test connecting to a server that isn't listening
133  */
134 TEST(AsyncSocketTest, ConnectRefused) {
135   EventBase evb;
136
137   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
138
139   // Hopefully nothing is actually listening on this address
140   folly::SocketAddress addr("127.0.0.1", 65535);
141   ConnCallback cb;
142   socket->connect(&cb, addr, 30);
143
144   evb.loop();
145
146   EXPECT_EQ(STATE_FAILED, cb.state);
147   EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
148   EXPECT_LE(0, socket->getConnectTime().count());
149   EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
150 }
151
152 /**
153  * Test connection timeout
154  */
155 TEST(AsyncSocketTest, ConnectTimeout) {
156   EventBase evb;
157
158   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
159
160   // Try connecting to server that won't respond.
161   //
162   // This depends somewhat on the network where this test is run.
163   // Hopefully this IP will be routable but unresponsive.
164   // (Alternatively, we could try listening on a local raw socket, but that
165   // normally requires root privileges.)
166   auto host =
167       SocketAddressTestHelper::isIPv6Enabled() ?
168       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
169       SocketAddressTestHelper::isIPv4Enabled() ?
170       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
171       nullptr;
172   SocketAddress addr(host, 65535);
173   ConnCallback cb;
174   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
175
176   evb.loop();
177
178   ASSERT_EQ(cb.state, STATE_FAILED);
179   ASSERT_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
180
181   // Verify that we can still get the peer address after a timeout.
182   // Use case is if the client was created from a client pool, and we want
183   // to log which peer failed.
184   folly::SocketAddress peer;
185   socket->getPeerAddress(&peer);
186   ASSERT_EQ(peer, addr);
187   EXPECT_LE(0, socket->getConnectTime().count());
188   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
189 }
190
191 /**
192  * Test writing immediately after connecting, without waiting for connect
193  * to finish.
194  */
195 TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
196   TestServer server;
197
198   // connect()
199   EventBase evb;
200   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
201
202   if (GetParam() == TFOState::ENABLED) {
203     socket->enableTFO();
204   }
205
206   ConnCallback ccb;
207   socket->connect(&ccb, server.getAddress(), 30);
208
209   // write()
210   char buf[128];
211   memset(buf, 'a', sizeof(buf));
212   WriteCallback wcb;
213   socket->write(&wcb, buf, sizeof(buf));
214
215   // Loop.  We don't bother accepting on the server socket yet.
216   // The kernel should be able to buffer the write request so it can succeed.
217   evb.loop();
218
219   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
220   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
221
222   // Make sure the server got a connection and received the data
223   socket->close();
224   server.verifyConnection(buf, sizeof(buf));
225
226   ASSERT_TRUE(socket->isClosedBySelf());
227   ASSERT_FALSE(socket->isClosedByPeer());
228   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
229 }
230
231 /**
232  * Test connecting using a nullptr connect callback.
233  */
234 TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
235   TestServer server;
236
237   // connect()
238   EventBase evb;
239   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
240   if (GetParam() == TFOState::ENABLED) {
241     socket->enableTFO();
242   }
243
244   socket->connect(nullptr, server.getAddress(), 30);
245
246   // write some data, just so we have some way of verifing
247   // that the socket works correctly after connecting
248   char buf[128];
249   memset(buf, 'a', sizeof(buf));
250   WriteCallback wcb;
251   socket->write(&wcb, buf, sizeof(buf));
252
253   evb.loop();
254
255   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
256
257   // Make sure the server got a connection and received the data
258   socket->close();
259   server.verifyConnection(buf, sizeof(buf));
260
261   ASSERT_TRUE(socket->isClosedBySelf());
262   ASSERT_FALSE(socket->isClosedByPeer());
263 }
264
265 /**
266  * Test calling both write() and close() immediately after connecting, without
267  * waiting for connect to finish.
268  *
269  * This exercises the STATE_CONNECTING_CLOSING code.
270  */
271 TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
272   TestServer server;
273
274   // connect()
275   EventBase evb;
276   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
277   if (GetParam() == TFOState::ENABLED) {
278     socket->enableTFO();
279   }
280   ConnCallback ccb;
281   socket->connect(&ccb, server.getAddress(), 30);
282
283   // write()
284   char buf[128];
285   memset(buf, 'a', sizeof(buf));
286   WriteCallback wcb;
287   socket->write(&wcb, buf, sizeof(buf));
288
289   // close()
290   socket->close();
291
292   // Loop.  We don't bother accepting on the server socket yet.
293   // The kernel should be able to buffer the write request so it can succeed.
294   evb.loop();
295
296   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
297   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
298
299   // Make sure the server got a connection and received the data
300   server.verifyConnection(buf, sizeof(buf));
301
302   ASSERT_TRUE(socket->isClosedBySelf());
303   ASSERT_FALSE(socket->isClosedByPeer());
304 }
305
306 /**
307  * Test calling close() immediately after connect()
308  */
309 TEST(AsyncSocketTest, ConnectAndClose) {
310   TestServer server;
311
312   // Connect using a AsyncSocket
313   EventBase evb;
314   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
315   ConnCallback ccb;
316   socket->connect(&ccb, server.getAddress(), 30);
317
318   // Hopefully the connect didn't succeed immediately.
319   // If it did, we can't exercise the close-while-connecting code path.
320   if (ccb.state == STATE_SUCCEEDED) {
321     LOG(INFO) << "connect() succeeded immediately; aborting test "
322                        "of close-during-connect behavior";
323     return;
324   }
325
326   socket->close();
327
328   // Loop, although there shouldn't be anything to do.
329   evb.loop();
330
331   // Make sure the connection was aborted
332   ASSERT_EQ(ccb.state, STATE_FAILED);
333
334   ASSERT_TRUE(socket->isClosedBySelf());
335   ASSERT_FALSE(socket->isClosedByPeer());
336 }
337
338 /**
339  * Test calling closeNow() immediately after connect()
340  *
341  * This should be identical to the normal close behavior.
342  */
343 TEST(AsyncSocketTest, ConnectAndCloseNow) {
344   TestServer server;
345
346   // Connect using a AsyncSocket
347   EventBase evb;
348   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
349   ConnCallback ccb;
350   socket->connect(&ccb, server.getAddress(), 30);
351
352   // Hopefully the connect didn't succeed immediately.
353   // If it did, we can't exercise the close-while-connecting code path.
354   if (ccb.state == STATE_SUCCEEDED) {
355     LOG(INFO) << "connect() succeeded immediately; aborting test "
356                        "of closeNow()-during-connect behavior";
357     return;
358   }
359
360   socket->closeNow();
361
362   // Loop, although there shouldn't be anything to do.
363   evb.loop();
364
365   // Make sure the connection was aborted
366   ASSERT_EQ(ccb.state, STATE_FAILED);
367
368   ASSERT_TRUE(socket->isClosedBySelf());
369   ASSERT_FALSE(socket->isClosedByPeer());
370 }
371
372 /**
373  * Test calling both write() and closeNow() immediately after connecting,
374  * without waiting for connect to finish.
375  *
376  * This should abort the pending write.
377  */
378 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
379   TestServer server;
380
381   // connect()
382   EventBase evb;
383   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
384   ConnCallback ccb;
385   socket->connect(&ccb, server.getAddress(), 30);
386
387   // Hopefully the connect didn't succeed immediately.
388   // If it did, we can't exercise the close-while-connecting code path.
389   if (ccb.state == STATE_SUCCEEDED) {
390     LOG(INFO) << "connect() succeeded immediately; aborting test "
391                        "of write-during-connect behavior";
392     return;
393   }
394
395   // write()
396   char buf[128];
397   memset(buf, 'a', sizeof(buf));
398   WriteCallback wcb;
399   socket->write(&wcb, buf, sizeof(buf));
400
401   // close()
402   socket->closeNow();
403
404   // Loop, although there shouldn't be anything to do.
405   evb.loop();
406
407   ASSERT_EQ(ccb.state, STATE_FAILED);
408   ASSERT_EQ(wcb.state, STATE_FAILED);
409
410   ASSERT_TRUE(socket->isClosedBySelf());
411   ASSERT_FALSE(socket->isClosedByPeer());
412 }
413
414 /**
415  * Test installing a read callback immediately, before connect() finishes.
416  */
417 TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
418   TestServer server;
419
420   // connect()
421   EventBase evb;
422   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
423   if (GetParam() == TFOState::ENABLED) {
424     socket->enableTFO();
425   }
426
427   ConnCallback ccb;
428   socket->connect(&ccb, server.getAddress(), 30);
429
430   ReadCallback rcb;
431   socket->setReadCB(&rcb);
432
433   if (GetParam() == TFOState::ENABLED) {
434     // Trigger a connection
435     socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
436   }
437
438   // Even though we haven't looped yet, we should be able to accept
439   // the connection and send data to it.
440   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
441   uint8_t buf[128];
442   memset(buf, 'a', sizeof(buf));
443   acceptedSocket->write(buf, sizeof(buf));
444   acceptedSocket->flush();
445   acceptedSocket->close();
446
447   // Loop, although there shouldn't be anything to do.
448   evb.loop();
449
450   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
451   ASSERT_EQ(rcb.buffers.size(), 1);
452   ASSERT_EQ(rcb.buffers[0].length, sizeof(buf));
453   ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
454
455   ASSERT_FALSE(socket->isClosedBySelf());
456   ASSERT_FALSE(socket->isClosedByPeer());
457 }
458
459 /**
460  * Test installing a read callback and then closing immediately before the
461  * connect attempt finishes.
462  */
463 TEST(AsyncSocketTest, ConnectReadAndClose) {
464   TestServer server;
465
466   // connect()
467   EventBase evb;
468   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
469   ConnCallback ccb;
470   socket->connect(&ccb, server.getAddress(), 30);
471
472   // Hopefully the connect didn't succeed immediately.
473   // If it did, we can't exercise the close-while-connecting code path.
474   if (ccb.state == STATE_SUCCEEDED) {
475     LOG(INFO) << "connect() succeeded immediately; aborting test "
476                        "of read-during-connect behavior";
477     return;
478   }
479
480   ReadCallback rcb;
481   socket->setReadCB(&rcb);
482
483   // close()
484   socket->close();
485
486   // Loop, although there shouldn't be anything to do.
487   evb.loop();
488
489   ASSERT_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
490   ASSERT_EQ(rcb.buffers.size(), 0);
491   ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
492
493   ASSERT_TRUE(socket->isClosedBySelf());
494   ASSERT_FALSE(socket->isClosedByPeer());
495 }
496
497 /**
498  * Test both writing and installing a read callback immediately,
499  * before connect() finishes.
500  */
501 TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
502   TestServer server;
503
504   // connect()
505   EventBase evb;
506   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
507   if (GetParam() == TFOState::ENABLED) {
508     socket->enableTFO();
509   }
510   ConnCallback ccb;
511   socket->connect(&ccb, server.getAddress(), 30);
512
513   // write()
514   char buf1[128];
515   memset(buf1, 'a', sizeof(buf1));
516   WriteCallback wcb;
517   socket->write(&wcb, buf1, sizeof(buf1));
518
519   // set a read callback
520   ReadCallback rcb;
521   socket->setReadCB(&rcb);
522
523   // Even though we haven't looped yet, we should be able to accept
524   // the connection and send data to it.
525   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
526   uint8_t buf2[128];
527   memset(buf2, 'b', sizeof(buf2));
528   acceptedSocket->write(buf2, sizeof(buf2));
529   acceptedSocket->flush();
530
531   // shut down the write half of acceptedSocket, so that the AsyncSocket
532   // will stop reading and we can break out of the event loop.
533   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
534
535   // Loop
536   evb.loop();
537
538   // Make sure the connect succeeded
539   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
540
541   // Make sure the AsyncSocket read the data written by the accepted socket
542   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
543   ASSERT_EQ(rcb.buffers.size(), 1);
544   ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2));
545   ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
546
547   // Close the AsyncSocket so we'll see EOF on acceptedSocket
548   socket->close();
549
550   // Make sure the accepted socket saw the data written by the AsyncSocket
551   uint8_t readbuf[sizeof(buf1)];
552   acceptedSocket->readAll(readbuf, sizeof(readbuf));
553   ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
554   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
555   ASSERT_EQ(bytesRead, 0);
556
557   ASSERT_FALSE(socket->isClosedBySelf());
558   ASSERT_TRUE(socket->isClosedByPeer());
559 }
560
561 /**
562  * Test writing to the socket then shutting down writes before the connect
563  * attempt finishes.
564  */
565 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
566   TestServer server;
567
568   // connect()
569   EventBase evb;
570   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
571   ConnCallback ccb;
572   socket->connect(&ccb, server.getAddress(), 30);
573
574   // Hopefully the connect didn't succeed immediately.
575   // If it did, we can't exercise the write-while-connecting code path.
576   if (ccb.state == STATE_SUCCEEDED) {
577     LOG(INFO) << "connect() succeeded immediately; skipping test";
578     return;
579   }
580
581   // Ask to write some data
582   char wbuf[128];
583   memset(wbuf, 'a', sizeof(wbuf));
584   WriteCallback wcb;
585   socket->write(&wcb, wbuf, sizeof(wbuf));
586   socket->shutdownWrite();
587
588   // Shutdown writes
589   socket->shutdownWrite();
590
591   // Even though we haven't looped yet, we should be able to accept
592   // the connection.
593   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
594
595   // Since the connection is still in progress, there should be no data to
596   // read yet.  Verify that the accepted socket is not readable.
597   struct pollfd fds[1];
598   fds[0].fd = acceptedSocket->getSocketFD();
599   fds[0].events = POLLIN;
600   fds[0].revents = 0;
601   int rc = poll(fds, 1, 0);
602   ASSERT_EQ(rc, 0);
603
604   // Write data to the accepted socket
605   uint8_t acceptedWbuf[192];
606   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
607   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
608   acceptedSocket->flush();
609
610   // Loop
611   evb.loop();
612
613   // The loop should have completed the connection, written the queued data,
614   // and shutdown writes on the socket.
615   //
616   // Check that the connection was completed successfully and that the write
617   // callback succeeded.
618   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
619   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
620
621   // Check that we can read the data that was written to the socket, and that
622   // we see an EOF, since its socket was half-shutdown.
623   uint8_t readbuf[sizeof(wbuf)];
624   acceptedSocket->readAll(readbuf, sizeof(readbuf));
625   ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
626   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
627   ASSERT_EQ(bytesRead, 0);
628
629   // Close the accepted socket.  This will cause it to see EOF
630   // and uninstall the read callback when we loop next.
631   acceptedSocket->close();
632
633   // Install a read callback, then loop again.
634   ReadCallback rcb;
635   socket->setReadCB(&rcb);
636   evb.loop();
637
638   // This loop should have read the data and seen the EOF
639   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
640   ASSERT_EQ(rcb.buffers.size(), 1);
641   ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
642   ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
643                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
644
645   ASSERT_FALSE(socket->isClosedBySelf());
646   ASSERT_FALSE(socket->isClosedByPeer());
647 }
648
649 /**
650  * Test reading, writing, and shutting down writes before the connect attempt
651  * finishes.
652  */
653 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
654   TestServer server;
655
656   // connect()
657   EventBase evb;
658   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
659   ConnCallback ccb;
660   socket->connect(&ccb, server.getAddress(), 30);
661
662   // Hopefully the connect didn't succeed immediately.
663   // If it did, we can't exercise the write-while-connecting code path.
664   if (ccb.state == STATE_SUCCEEDED) {
665     LOG(INFO) << "connect() succeeded immediately; skipping test";
666     return;
667   }
668
669   // Install a read callback
670   ReadCallback rcb;
671   socket->setReadCB(&rcb);
672
673   // Ask to write some data
674   char wbuf[128];
675   memset(wbuf, 'a', sizeof(wbuf));
676   WriteCallback wcb;
677   socket->write(&wcb, wbuf, sizeof(wbuf));
678
679   // Shutdown writes
680   socket->shutdownWrite();
681
682   // Even though we haven't looped yet, we should be able to accept
683   // the connection.
684   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
685
686   // Since the connection is still in progress, there should be no data to
687   // read yet.  Verify that the accepted socket is not readable.
688   struct pollfd fds[1];
689   fds[0].fd = acceptedSocket->getSocketFD();
690   fds[0].events = POLLIN;
691   fds[0].revents = 0;
692   int rc = poll(fds, 1, 0);
693   ASSERT_EQ(rc, 0);
694
695   // Write data to the accepted socket
696   uint8_t acceptedWbuf[192];
697   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
698   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
699   acceptedSocket->flush();
700   // Shutdown writes to the accepted socket.  This will cause it to see EOF
701   // and uninstall the read callback.
702   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
703
704   // Loop
705   evb.loop();
706
707   // The loop should have completed the connection, written the queued data,
708   // shutdown writes on the socket, read the data we wrote to it, and see the
709   // EOF.
710   //
711   // Check that the connection was completed successfully and that the read
712   // and write callbacks were invoked as expected.
713   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
714   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
715   ASSERT_EQ(rcb.buffers.size(), 1);
716   ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
717   ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
718                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
719   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
720
721   // Check that we can read the data that was written to the socket, and that
722   // we see an EOF, since its socket was half-shutdown.
723   uint8_t readbuf[sizeof(wbuf)];
724   acceptedSocket->readAll(readbuf, sizeof(readbuf));
725   ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
726   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
727   ASSERT_EQ(bytesRead, 0);
728
729   // Fully close both sockets
730   acceptedSocket->close();
731   socket->close();
732
733   ASSERT_FALSE(socket->isClosedBySelf());
734   ASSERT_TRUE(socket->isClosedByPeer());
735 }
736
737 /**
738  * Test reading, writing, and calling shutdownWriteNow() before the
739  * connect attempt finishes.
740  */
741 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
742   TestServer server;
743
744   // connect()
745   EventBase evb;
746   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
747   ConnCallback ccb;
748   socket->connect(&ccb, server.getAddress(), 30);
749
750   // Hopefully the connect didn't succeed immediately.
751   // If it did, we can't exercise the write-while-connecting code path.
752   if (ccb.state == STATE_SUCCEEDED) {
753     LOG(INFO) << "connect() succeeded immediately; skipping test";
754     return;
755   }
756
757   // Install a read callback
758   ReadCallback rcb;
759   socket->setReadCB(&rcb);
760
761   // Ask to write some data
762   char wbuf[128];
763   memset(wbuf, 'a', sizeof(wbuf));
764   WriteCallback wcb;
765   socket->write(&wcb, wbuf, sizeof(wbuf));
766
767   // Shutdown writes immediately.
768   // This should immediately discard the data that we just tried to write.
769   socket->shutdownWriteNow();
770
771   // Verify that writeError() was invoked on the write callback.
772   ASSERT_EQ(wcb.state, STATE_FAILED);
773   ASSERT_EQ(wcb.bytesWritten, 0);
774
775   // Even though we haven't looped yet, we should be able to accept
776   // the connection.
777   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
778
779   // Since the connection is still in progress, there should be no data to
780   // read yet.  Verify that the accepted socket is not readable.
781   struct pollfd fds[1];
782   fds[0].fd = acceptedSocket->getSocketFD();
783   fds[0].events = POLLIN;
784   fds[0].revents = 0;
785   int rc = poll(fds, 1, 0);
786   ASSERT_EQ(rc, 0);
787
788   // Write data to the accepted socket
789   uint8_t acceptedWbuf[192];
790   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
791   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
792   acceptedSocket->flush();
793   // Shutdown writes to the accepted socket.  This will cause it to see EOF
794   // and uninstall the read callback.
795   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
796
797   // Loop
798   evb.loop();
799
800   // The loop should have completed the connection, written the queued data,
801   // shutdown writes on the socket, read the data we wrote to it, and see the
802   // EOF.
803   //
804   // Check that the connection was completed successfully and that the read
805   // callback was invoked as expected.
806   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
807   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
808   ASSERT_EQ(rcb.buffers.size(), 1);
809   ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
810   ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
811                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
812
813   // Since we used shutdownWriteNow(), it should have discarded all pending
814   // write data.  Verify we see an immediate EOF when reading from the accepted
815   // socket.
816   uint8_t readbuf[sizeof(wbuf)];
817   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
818   ASSERT_EQ(bytesRead, 0);
819
820   // Fully close both sockets
821   acceptedSocket->close();
822   socket->close();
823
824   ASSERT_FALSE(socket->isClosedBySelf());
825   ASSERT_TRUE(socket->isClosedByPeer());
826 }
827
828 // Helper function for use in testConnectOptWrite()
829 // Temporarily disable the read callback
830 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
831   // Uninstall the read callback
832   socket->setReadCB(nullptr);
833   // Schedule the read callback to be reinstalled after 1ms
834   socket->getEventBase()->runInLoop(
835       std::bind(&AsyncSocket::setReadCB, socket, rcb));
836 }
837
838 /**
839  * Test connect+write, then have the connect callback perform another write.
840  *
841  * This tests interaction of the optimistic writing after connect with
842  * additional write attempts that occur in the connect callback.
843  */
844 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
845   TestServer server;
846   EventBase evb;
847   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
848
849   // connect()
850   ConnCallback ccb;
851   socket->connect(&ccb, server.getAddress(), 30);
852
853   // Hopefully the connect didn't succeed immediately.
854   // If it did, we can't exercise the optimistic write code path.
855   if (ccb.state == STATE_SUCCEEDED) {
856     LOG(INFO) << "connect() succeeded immediately; aborting test "
857                        "of optimistic write behavior";
858     return;
859   }
860
861   // Tell the connect callback to perform a write when the connect succeeds
862   WriteCallback wcb2;
863   scoped_array<char> buf2(new char[size2]);
864   memset(buf2.get(), 'b', size2);
865   if (size2 > 0) {
866     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
867     // Tell the second write callback to close the connection when it is done
868     wcb2.successCallback = [&] { socket->closeNow(); };
869   }
870
871   // Schedule one write() immediately, before the connect finishes
872   scoped_array<char> buf1(new char[size1]);
873   memset(buf1.get(), 'a', size1);
874   WriteCallback wcb1;
875   if (size1 > 0) {
876     socket->write(&wcb1, buf1.get(), size1);
877   }
878
879   if (close) {
880     // immediately perform a close, before connect() completes
881     socket->close();
882   }
883
884   // Start reading from the other endpoint after 10ms.
885   // If we're using large buffers, we have to read so that the writes don't
886   // block forever.
887   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
888   ReadCallback rcb;
889   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
890                                         acceptedSocket.get(), &rcb);
891   socket->getEventBase()->tryRunAfterDelay(
892       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
893       10);
894
895   // Loop.  We don't bother accepting on the server socket yet.
896   // The kernel should be able to buffer the write request so it can succeed.
897   evb.loop();
898
899   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
900   if (size1 > 0) {
901     ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
902   }
903   if (size2 > 0) {
904     ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
905   }
906
907   socket->close();
908
909   // Make sure the read callback received all of the data
910   size_t bytesRead = 0;
911   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
912        it != rcb.buffers.end();
913        ++it) {
914     size_t start = bytesRead;
915     bytesRead += it->length;
916     size_t end = bytesRead;
917     if (start < size1) {
918       size_t cmpLen = min(size1, end) - start;
919       ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
920     }
921     if (end > size1 && end <= size1 + size2) {
922       size_t itOffset;
923       size_t buf2Offset;
924       size_t cmpLen;
925       if (start >= size1) {
926         itOffset = 0;
927         buf2Offset = start - size1;
928         cmpLen = end - start;
929       } else {
930         itOffset = size1 - start;
931         buf2Offset = 0;
932         cmpLen = end - size1;
933       }
934       ASSERT_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
935                                cmpLen),
936                         0);
937     }
938   }
939   ASSERT_EQ(bytesRead, size1 + size2);
940 }
941
942 TEST(AsyncSocketTest, ConnectCallbackWrite) {
943   // Test using small writes that should both succeed immediately
944   testConnectOptWrite(100, 200);
945
946   // Test using a large buffer in the connect callback, that should block
947   const size_t largeSize = 32 * 1024 * 1024;
948   testConnectOptWrite(100, largeSize);
949
950   // Test using a large initial write
951   testConnectOptWrite(largeSize, 100);
952
953   // Test using two large buffers
954   testConnectOptWrite(largeSize, largeSize);
955
956   // Test a small write in the connect callback,
957   // but no immediate write before connect completes
958   testConnectOptWrite(0, 64);
959
960   // Test a large write in the connect callback,
961   // but no immediate write before connect completes
962   testConnectOptWrite(0, largeSize);
963
964   // Test connect, a small write, then immediately call close() before connect
965   // completes
966   testConnectOptWrite(211, 0, true);
967
968   // Test connect, a large immediate write (that will block), then immediately
969   // call close() before connect completes
970   testConnectOptWrite(largeSize, 0, true);
971 }
972
973 ///////////////////////////////////////////////////////////////////////////
974 // write() related tests
975 ///////////////////////////////////////////////////////////////////////////
976
977 /**
978  * Test writing using a nullptr callback
979  */
980 TEST(AsyncSocketTest, WriteNullCallback) {
981   TestServer server;
982
983   // connect()
984   EventBase evb;
985   std::shared_ptr<AsyncSocket> socket =
986     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
987   evb.loop(); // loop until the socket is connected
988
989   // write() with a nullptr callback
990   char buf[128];
991   memset(buf, 'a', sizeof(buf));
992   socket->write(nullptr, buf, sizeof(buf));
993
994   evb.loop(); // loop until the data is sent
995
996   // Make sure the server got a connection and received the data
997   socket->close();
998   server.verifyConnection(buf, sizeof(buf));
999
1000   ASSERT_TRUE(socket->isClosedBySelf());
1001   ASSERT_FALSE(socket->isClosedByPeer());
1002 }
1003
1004 /**
1005  * Test writing with a send timeout
1006  */
1007 TEST(AsyncSocketTest, WriteTimeout) {
1008   TestServer server;
1009
1010   // connect()
1011   EventBase evb;
1012   std::shared_ptr<AsyncSocket> socket =
1013     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1014   evb.loop(); // loop until the socket is connected
1015
1016   // write() a large chunk of data, with no-one on the other end reading.
1017   // Tricky: the kernel caches the connection metrics for recently-used
1018   // routes (see tcp_no_metrics_save) so a freshly opened connection can
1019   // have a send buffer size bigger than wmem_default.  This makes the test
1020   // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
1021   size_t writeLength = 32 * 1024 * 1024;
1022   uint32_t timeout = 200;
1023   socket->setSendTimeout(timeout);
1024   scoped_array<char> buf(new char[writeLength]);
1025   memset(buf.get(), 'a', writeLength);
1026   WriteCallback wcb;
1027   socket->write(&wcb, buf.get(), writeLength);
1028
1029   TimePoint start;
1030   evb.loop();
1031   TimePoint end;
1032
1033   // Make sure the write attempt timed out as requested
1034   ASSERT_EQ(wcb.state, STATE_FAILED);
1035   ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1036
1037   // Check that the write timed out within a reasonable period of time.
1038   // We don't check for exactly the specified timeout, since AsyncSocket only
1039   // times out when it hasn't made progress for that period of time.
1040   //
1041   // On linux, the first write sends a few hundred kb of data, then blocks for
1042   // writability, and then unblocks again after 40ms and is able to write
1043   // another smaller of data before blocking permanently.  Therefore it doesn't
1044   // time out until 40ms + timeout.
1045   //
1046   // I haven't fully verified the cause of this, but I believe it probably
1047   // occurs because the receiving end delays sending an ack for up to 40ms.
1048   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
1049   // the ack, it can send some more data.  However, after that point the
1050   // receiver's kernel buffer is full.  This 40ms delay happens even with
1051   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
1052   // kernel may be automatically disabling TCP_QUICKACK after receiving some
1053   // data.
1054   //
1055   // For now, we simply check that the timeout occurred within 160ms of
1056   // the requested value.
1057   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1058 }
1059
1060 /**
1061  * Test writing to a socket that the remote endpoint has closed
1062  */
1063 TEST(AsyncSocketTest, WritePipeError) {
1064   TestServer server;
1065
1066   // connect()
1067   EventBase evb;
1068   std::shared_ptr<AsyncSocket> socket =
1069     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1070   socket->setSendTimeout(1000);
1071   evb.loop(); // loop until the socket is connected
1072
1073   // accept and immediately close the socket
1074   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1075   acceptedSocket->close();
1076
1077   // write() a large chunk of data
1078   size_t writeLength = 32 * 1024 * 1024;
1079   scoped_array<char> buf(new char[writeLength]);
1080   memset(buf.get(), 'a', writeLength);
1081   WriteCallback wcb;
1082   socket->write(&wcb, buf.get(), writeLength);
1083
1084   evb.loop();
1085
1086   // Make sure the write failed.
1087   // It would be nice if AsyncSocketException could convey the errno value,
1088   // so that we could check for EPIPE
1089   ASSERT_EQ(wcb.state, STATE_FAILED);
1090   ASSERT_EQ(wcb.exception.getType(),
1091                     AsyncSocketException::INTERNAL_ERROR);
1092
1093   ASSERT_FALSE(socket->isClosedBySelf());
1094   ASSERT_FALSE(socket->isClosedByPeer());
1095 }
1096
1097 /**
1098  * Test that bytes written is correctly computed in case of write failure
1099  */
1100 TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
1101   // Send and receive buffer sizes for the sockets.
1102   const int sockBufSize = 8 * 1024;
1103
1104   TestServer server(false, sockBufSize);
1105
1106   AsyncSocket::OptionMap options{
1107       {{SOL_SOCKET, SO_SNDBUF}, sockBufSize},
1108       {{SOL_SOCKET, SO_RCVBUF}, sockBufSize},
1109       {{IPPROTO_TCP, TCP_NODELAY}, 1},
1110   };
1111
1112   // The current thread will be used by the receiver - use a separate thread
1113   // for the sender.
1114   EventBase senderEvb;
1115   std::thread senderThread([&]() { senderEvb.loopForever(); });
1116
1117   ConnCallback ccb;
1118   std::shared_ptr<AsyncSocket> socket;
1119
1120   senderEvb.runInEventBaseThreadAndWait([&]() {
1121     socket = AsyncSocket::newSocket(&senderEvb);
1122     socket->connect(&ccb, server.getAddress(), 30, options);
1123   });
1124
1125   // accept the socket on the server side
1126   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1127
1128   // Send a big (45KB) write so that it is partially written. The first write
1129   // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading
1130   // just under 24KB would cause 3-4 writes for the total of 32-40KB in the
1131   // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all
1132   // bytes are written when the socket is reset. Having at least 3 writes
1133   // ensures that the total size (45KB) would be exceeed in case of overcounting
1134   // based on the initial write size of 16KB.
1135   constexpr size_t sendSize = 45 * 1024;
1136   auto const sendBuf = std::vector<char>(sendSize, 'a');
1137
1138   WriteCallback wcb;
1139
1140   senderEvb.runInEventBaseThreadAndWait(
1141       [&]() { socket->write(&wcb, sendBuf.data(), sendSize); });
1142
1143   // Reading 20KB would cause three additional writes of 8KB, but less
1144   // than 45KB total, so the socket is reset before all bytes are written.
1145   constexpr size_t recvSize = 20 * 1024;
1146   uint8_t recvBuf[recvSize];
1147   int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
1148
1149   acceptedSocket->closeWithReset();
1150
1151   senderEvb.terminateLoopSoon();
1152   senderThread.join();
1153
1154   LOG(INFO) << "Bytes written: " << wcb.bytesWritten;
1155
1156   ASSERT_EQ(STATE_FAILED, wcb.state);
1157   ASSERT_GE(wcb.bytesWritten, bytesRead);
1158   ASSERT_LE(wcb.bytesWritten, sendSize);
1159   ASSERT_EQ(recvSize, bytesRead);
1160   ASSERT(32 * 1024 == wcb.bytesWritten || 40 * 1024 == wcb.bytesWritten);
1161 }
1162
1163 /**
1164  * Test writing a mix of simple buffers and IOBufs
1165  */
1166 TEST(AsyncSocketTest, WriteIOBuf) {
1167   TestServer server;
1168
1169   // connect()
1170   EventBase evb;
1171   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1172   ConnCallback ccb;
1173   socket->connect(&ccb, server.getAddress(), 30);
1174
1175   // Accept the connection
1176   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1177   ReadCallback rcb;
1178   acceptedSocket->setReadCB(&rcb);
1179
1180   // Check if EOR tracking flag can be set and reset.
1181   EXPECT_FALSE(socket->isEorTrackingEnabled());
1182   socket->setEorTracking(true);
1183   EXPECT_TRUE(socket->isEorTrackingEnabled());
1184   socket->setEorTracking(false);
1185   EXPECT_FALSE(socket->isEorTrackingEnabled());
1186
1187   // Write a simple buffer to the socket
1188   constexpr size_t simpleBufLength = 5;
1189   char simpleBuf[simpleBufLength];
1190   memset(simpleBuf, 'a', simpleBufLength);
1191   WriteCallback wcb;
1192   socket->write(&wcb, simpleBuf, simpleBufLength);
1193
1194   // Write a single-element IOBuf chain
1195   size_t buf1Length = 7;
1196   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1197   memset(buf1->writableData(), 'b', buf1Length);
1198   buf1->append(buf1Length);
1199   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1200   WriteCallback wcb2;
1201   socket->writeChain(&wcb2, std::move(buf1));
1202
1203   // Write a multiple-element IOBuf chain
1204   size_t buf2Length = 11;
1205   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1206   memset(buf2->writableData(), 'c', buf2Length);
1207   buf2->append(buf2Length);
1208   size_t buf3Length = 13;
1209   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1210   memset(buf3->writableData(), 'd', buf3Length);
1211   buf3->append(buf3Length);
1212   buf2->appendChain(std::move(buf3));
1213   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1214   buf2Copy->coalesce();
1215   WriteCallback wcb3;
1216   socket->writeChain(&wcb3, std::move(buf2));
1217   socket->shutdownWrite();
1218
1219   // Let the reads and writes run to completion
1220   evb.loop();
1221
1222   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1223   ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1224   ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1225
1226   // Make sure the reader got the right data in the right order
1227   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
1228   ASSERT_EQ(rcb.buffers.size(), 1);
1229   ASSERT_EQ(rcb.buffers[0].length,
1230       simpleBufLength + buf1Length + buf2Length + buf3Length);
1231   ASSERT_EQ(
1232       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1233   ASSERT_EQ(
1234       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1235           buf1Copy->data(), buf1Copy->length()), 0);
1236   ASSERT_EQ(
1237       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1238           buf2Copy->data(), buf2Copy->length()), 0);
1239
1240   acceptedSocket->close();
1241   socket->close();
1242
1243   ASSERT_TRUE(socket->isClosedBySelf());
1244   ASSERT_FALSE(socket->isClosedByPeer());
1245 }
1246
1247 TEST(AsyncSocketTest, WriteIOBufCorked) {
1248   TestServer server;
1249
1250   // connect()
1251   EventBase evb;
1252   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1253   ConnCallback ccb;
1254   socket->connect(&ccb, server.getAddress(), 30);
1255
1256   // Accept the connection
1257   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1258   ReadCallback rcb;
1259   acceptedSocket->setReadCB(&rcb);
1260
1261   // Do three writes, 100ms apart, with the "cork" flag set
1262   // on the second write.  The reader should see the first write
1263   // arrive by itself, followed by the second and third writes
1264   // arriving together.
1265   size_t buf1Length = 5;
1266   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1267   memset(buf1->writableData(), 'a', buf1Length);
1268   buf1->append(buf1Length);
1269   size_t buf2Length = 7;
1270   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1271   memset(buf2->writableData(), 'b', buf2Length);
1272   buf2->append(buf2Length);
1273   size_t buf3Length = 11;
1274   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1275   memset(buf3->writableData(), 'c', buf3Length);
1276   buf3->append(buf3Length);
1277   WriteCallback wcb1;
1278   socket->writeChain(&wcb1, std::move(buf1));
1279   WriteCallback wcb2;
1280   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1281   write2.scheduleTimeout(100);
1282   WriteCallback wcb3;
1283   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1284   write3.scheduleTimeout(140);
1285
1286   evb.loop();
1287   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
1288   ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1289   ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1290   if (wcb3.state != STATE_SUCCEEDED) {
1291     throw(wcb3.exception);
1292   }
1293   ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1294
1295   // Make sure the reader got the data with the right grouping
1296   ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
1297   ASSERT_EQ(rcb.buffers.size(), 2);
1298   ASSERT_EQ(rcb.buffers[0].length, buf1Length);
1299   ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1300
1301   acceptedSocket->close();
1302   socket->close();
1303
1304   ASSERT_TRUE(socket->isClosedBySelf());
1305   ASSERT_FALSE(socket->isClosedByPeer());
1306 }
1307
1308 /**
1309  * Test performing a zero-length write
1310  */
1311 TEST(AsyncSocketTest, ZeroLengthWrite) {
1312   TestServer server;
1313
1314   // connect()
1315   EventBase evb;
1316   std::shared_ptr<AsyncSocket> socket =
1317     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1318   evb.loop(); // loop until the socket is connected
1319
1320   auto acceptedSocket = server.acceptAsync(&evb);
1321   ReadCallback rcb;
1322   acceptedSocket->setReadCB(&rcb);
1323
1324   size_t len1 = 1024*1024;
1325   size_t len2 = 1024*1024;
1326   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1327   memset(buf.get(), 'a', len1);
1328   memset(buf.get(), 'b', len2);
1329
1330   WriteCallback wcb1;
1331   WriteCallback wcb2;
1332   WriteCallback wcb3;
1333   WriteCallback wcb4;
1334   socket->write(&wcb1, buf.get(), 0);
1335   socket->write(&wcb2, buf.get(), len1);
1336   socket->write(&wcb3, buf.get() + len1, 0);
1337   socket->write(&wcb4, buf.get() + len1, len2);
1338   socket->close();
1339
1340   evb.loop(); // loop until the data is sent
1341
1342   ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1343   ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1344   ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1345   ASSERT_EQ(wcb4.state, STATE_SUCCEEDED);
1346   rcb.verifyData(buf.get(), len1 + len2);
1347
1348   ASSERT_TRUE(socket->isClosedBySelf());
1349   ASSERT_FALSE(socket->isClosedByPeer());
1350 }
1351
1352 TEST(AsyncSocketTest, ZeroLengthWritev) {
1353   TestServer server;
1354
1355   // connect()
1356   EventBase evb;
1357   std::shared_ptr<AsyncSocket> socket =
1358     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1359   evb.loop(); // loop until the socket is connected
1360
1361   auto acceptedSocket = server.acceptAsync(&evb);
1362   ReadCallback rcb;
1363   acceptedSocket->setReadCB(&rcb);
1364
1365   size_t len1 = 1024*1024;
1366   size_t len2 = 1024*1024;
1367   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1368   memset(buf.get(), 'a', len1);
1369   memset(buf.get(), 'b', len2);
1370
1371   WriteCallback wcb;
1372   constexpr size_t iovCount = 4;
1373   struct iovec iov[iovCount];
1374   iov[0].iov_base = buf.get();
1375   iov[0].iov_len = len1;
1376   iov[1].iov_base = buf.get() + len1;
1377   iov[1].iov_len = 0;
1378   iov[2].iov_base = buf.get() + len1;
1379   iov[2].iov_len = len2;
1380   iov[3].iov_base = buf.get() + len1 + len2;
1381   iov[3].iov_len = 0;
1382
1383   socket->writev(&wcb, iov, iovCount);
1384   socket->close();
1385   evb.loop(); // loop until the data is sent
1386
1387   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1388   rcb.verifyData(buf.get(), len1 + len2);
1389
1390   ASSERT_TRUE(socket->isClosedBySelf());
1391   ASSERT_FALSE(socket->isClosedByPeer());
1392 }
1393
1394 ///////////////////////////////////////////////////////////////////////////
1395 // close() related tests
1396 ///////////////////////////////////////////////////////////////////////////
1397
1398 /**
1399  * Test calling close() with pending writes when the socket is already closing.
1400  */
1401 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1402   TestServer server;
1403
1404   // connect()
1405   EventBase evb;
1406   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1407   ConnCallback ccb;
1408   socket->connect(&ccb, server.getAddress(), 30);
1409
1410   // accept the socket on the server side
1411   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1412
1413   // Loop to ensure the connect has completed
1414   evb.loop();
1415
1416   // Make sure we are connected
1417   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
1418
1419   // Schedule pending writes, until several write attempts have blocked
1420   char buf[128];
1421   memset(buf, 'a', sizeof(buf));
1422   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1423   WriteCallbackVector writeCallbacks;
1424
1425   writeCallbacks.reserve(5);
1426   while (writeCallbacks.size() < 5) {
1427     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1428
1429     socket->write(wcb.get(), buf, sizeof(buf));
1430     if (wcb->state == STATE_SUCCEEDED) {
1431       // Succeeded immediately.  Keep performing more writes
1432       continue;
1433     }
1434
1435     // This write is blocked.
1436     // Have the write callback call close() when writeError() is invoked
1437     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1438     writeCallbacks.push_back(wcb);
1439   }
1440
1441   // Call closeNow() to immediately fail the pending writes
1442   socket->closeNow();
1443
1444   // Make sure writeError() was invoked on all of the pending write callbacks
1445   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1446        it != writeCallbacks.end();
1447        ++it) {
1448     ASSERT_EQ((*it)->state, STATE_FAILED);
1449   }
1450
1451   ASSERT_TRUE(socket->isClosedBySelf());
1452   ASSERT_FALSE(socket->isClosedByPeer());
1453 }
1454
1455 ///////////////////////////////////////////////////////////////////////////
1456 // ImmediateRead related tests
1457 ///////////////////////////////////////////////////////////////////////////
1458
1459 /* AsyncSocket use to verify immediate read works */
1460 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1461  public:
1462   bool immediateReadCalled = false;
1463   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1464  protected:
1465   void checkForImmediateRead() noexcept override {
1466     immediateReadCalled = true;
1467     AsyncSocket::handleRead();
1468   }
1469 };
1470
1471 TEST(AsyncSocket, ConnectReadImmediateRead) {
1472   TestServer server;
1473
1474   const size_t maxBufferSz = 100;
1475   const size_t maxReadsPerEvent = 1;
1476   const size_t expectedDataSz = maxBufferSz * 3;
1477   char expectedData[expectedDataSz];
1478   memset(expectedData, 'j', expectedDataSz);
1479
1480   EventBase evb;
1481   ReadCallback rcb(maxBufferSz);
1482   AsyncSocketImmediateRead socket(&evb);
1483   socket.connect(nullptr, server.getAddress(), 30);
1484
1485   evb.loop(); // loop until the socket is connected
1486
1487   socket.setReadCB(&rcb);
1488   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1489   socket.immediateReadCalled = false;
1490
1491   auto acceptedSocket = server.acceptAsync(&evb);
1492
1493   ReadCallback rcbServer;
1494   WriteCallback wcbServer;
1495   rcbServer.dataAvailableCallback = [&]() {
1496     if (rcbServer.dataRead() == expectedDataSz) {
1497       // write back all data read
1498       rcbServer.verifyData(expectedData, expectedDataSz);
1499       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1500       acceptedSocket->close();
1501     }
1502   };
1503   acceptedSocket->setReadCB(&rcbServer);
1504
1505   // write data
1506   WriteCallback wcb1;
1507   socket.write(&wcb1, expectedData, expectedDataSz);
1508   evb.loop();
1509   ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1510   rcb.verifyData(expectedData, expectedDataSz);
1511   ASSERT_EQ(socket.immediateReadCalled, true);
1512
1513   ASSERT_FALSE(socket.isClosedBySelf());
1514   ASSERT_FALSE(socket.isClosedByPeer());
1515 }
1516
1517 TEST(AsyncSocket, ConnectReadUninstallRead) {
1518   TestServer server;
1519
1520   const size_t maxBufferSz = 100;
1521   const size_t maxReadsPerEvent = 1;
1522   const size_t expectedDataSz = maxBufferSz * 3;
1523   char expectedData[expectedDataSz];
1524   memset(expectedData, 'k', expectedDataSz);
1525
1526   EventBase evb;
1527   ReadCallback rcb(maxBufferSz);
1528   AsyncSocketImmediateRead socket(&evb);
1529   socket.connect(nullptr, server.getAddress(), 30);
1530
1531   evb.loop(); // loop until the socket is connected
1532
1533   socket.setReadCB(&rcb);
1534   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1535   socket.immediateReadCalled = false;
1536
1537   auto acceptedSocket = server.acceptAsync(&evb);
1538
1539   ReadCallback rcbServer;
1540   WriteCallback wcbServer;
1541   rcbServer.dataAvailableCallback = [&]() {
1542     if (rcbServer.dataRead() == expectedDataSz) {
1543       // write back all data read
1544       rcbServer.verifyData(expectedData, expectedDataSz);
1545       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1546       acceptedSocket->close();
1547     }
1548   };
1549   acceptedSocket->setReadCB(&rcbServer);
1550
1551   rcb.dataAvailableCallback = [&]() {
1552     // we read data and reset readCB
1553     socket.setReadCB(nullptr);
1554   };
1555
1556   // write data
1557   WriteCallback wcb;
1558   socket.write(&wcb, expectedData, expectedDataSz);
1559   evb.loop();
1560   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1561
1562   /* we shoud've only read maxBufferSz data since readCallback_
1563    * was reset in dataAvailableCallback */
1564   ASSERT_EQ(rcb.dataRead(), maxBufferSz);
1565   ASSERT_EQ(socket.immediateReadCalled, false);
1566
1567   ASSERT_FALSE(socket.isClosedBySelf());
1568   ASSERT_FALSE(socket.isClosedByPeer());
1569 }
1570
1571 // TODO:
1572 // - Test connect() and have the connect callback set the read callback
1573 // - Test connect() and have the connect callback unset the read callback
1574 // - Test reading/writing/closing/destroying the socket in the connect callback
1575 // - Test reading/writing/closing/destroying the socket in the read callback
1576 // - Test reading/writing/closing/destroying the socket in the write callback
1577 // - Test one-way shutdown behavior
1578 // - Test changing the EventBase
1579 //
1580 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1581 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1582
1583
1584 ///////////////////////////////////////////////////////////////////////////
1585 // AsyncServerSocket tests
1586 ///////////////////////////////////////////////////////////////////////////
1587 namespace {
1588 /**
1589  * Helper ConnectionEventCallback class for the test code.
1590  * It maintains counters protected by a spin lock.
1591  */
1592 class TestConnectionEventCallback :
1593   public AsyncServerSocket::ConnectionEventCallback {
1594  public:
1595   virtual void onConnectionAccepted(
1596       const int /* socket */,
1597       const SocketAddress& /* addr */) noexcept override {
1598     folly::RWSpinLock::WriteHolder holder(spinLock_);
1599     connectionAccepted_++;
1600   }
1601
1602   virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1603     folly::RWSpinLock::WriteHolder holder(spinLock_);
1604     connectionAcceptedError_++;
1605   }
1606
1607   virtual void onConnectionDropped(
1608       const int /* socket */,
1609       const SocketAddress& /* addr */) noexcept override {
1610     folly::RWSpinLock::WriteHolder holder(spinLock_);
1611     connectionDropped_++;
1612   }
1613
1614   virtual void onConnectionEnqueuedForAcceptorCallback(
1615       const int /* socket */,
1616       const SocketAddress& /* addr */) noexcept override {
1617     folly::RWSpinLock::WriteHolder holder(spinLock_);
1618     connectionEnqueuedForAcceptCallback_++;
1619   }
1620
1621   virtual void onConnectionDequeuedByAcceptorCallback(
1622       const int /* socket */,
1623       const SocketAddress& /* addr */) noexcept override {
1624     folly::RWSpinLock::WriteHolder holder(spinLock_);
1625     connectionDequeuedByAcceptCallback_++;
1626   }
1627
1628   virtual void onBackoffStarted() noexcept override {
1629     folly::RWSpinLock::WriteHolder holder(spinLock_);
1630     backoffStarted_++;
1631   }
1632
1633   virtual void onBackoffEnded() noexcept override {
1634     folly::RWSpinLock::WriteHolder holder(spinLock_);
1635     backoffEnded_++;
1636   }
1637
1638   virtual void onBackoffError() noexcept override {
1639     folly::RWSpinLock::WriteHolder holder(spinLock_);
1640     backoffError_++;
1641   }
1642
1643   unsigned int getConnectionAccepted() const {
1644     folly::RWSpinLock::ReadHolder holder(spinLock_);
1645     return connectionAccepted_;
1646   }
1647
1648   unsigned int getConnectionAcceptedError() const {
1649     folly::RWSpinLock::ReadHolder holder(spinLock_);
1650     return connectionAcceptedError_;
1651   }
1652
1653   unsigned int getConnectionDropped() const {
1654     folly::RWSpinLock::ReadHolder holder(spinLock_);
1655     return connectionDropped_;
1656   }
1657
1658   unsigned int getConnectionEnqueuedForAcceptCallback() const {
1659     folly::RWSpinLock::ReadHolder holder(spinLock_);
1660     return connectionEnqueuedForAcceptCallback_;
1661   }
1662
1663   unsigned int getConnectionDequeuedByAcceptCallback() const {
1664     folly::RWSpinLock::ReadHolder holder(spinLock_);
1665     return connectionDequeuedByAcceptCallback_;
1666   }
1667
1668   unsigned int getBackoffStarted() const {
1669     folly::RWSpinLock::ReadHolder holder(spinLock_);
1670     return backoffStarted_;
1671   }
1672
1673   unsigned int getBackoffEnded() const {
1674     folly::RWSpinLock::ReadHolder holder(spinLock_);
1675     return backoffEnded_;
1676   }
1677
1678   unsigned int getBackoffError() const {
1679     folly::RWSpinLock::ReadHolder holder(spinLock_);
1680     return backoffError_;
1681   }
1682
1683  private:
1684   mutable folly::RWSpinLock spinLock_;
1685   unsigned int connectionAccepted_{0};
1686   unsigned int connectionAcceptedError_{0};
1687   unsigned int connectionDropped_{0};
1688   unsigned int connectionEnqueuedForAcceptCallback_{0};
1689   unsigned int connectionDequeuedByAcceptCallback_{0};
1690   unsigned int backoffStarted_{0};
1691   unsigned int backoffEnded_{0};
1692   unsigned int backoffError_{0};
1693 };
1694
1695 /**
1696  * Helper AcceptCallback class for the test code
1697  * It records the callbacks that were invoked, and also supports calling
1698  * generic std::function objects in each callback.
1699  */
1700 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1701  public:
1702   enum EventType {
1703     TYPE_START,
1704     TYPE_ACCEPT,
1705     TYPE_ERROR,
1706     TYPE_STOP
1707   };
1708   struct EventInfo {
1709     EventInfo(int fd, const folly::SocketAddress& addr)
1710       : type(TYPE_ACCEPT),
1711         fd(fd),
1712         address(addr),
1713         errorMsg() {}
1714     explicit EventInfo(const std::string& msg)
1715       : type(TYPE_ERROR),
1716         fd(-1),
1717         address(),
1718         errorMsg(msg) {}
1719     explicit EventInfo(EventType et)
1720       : type(et),
1721         fd(-1),
1722         address(),
1723         errorMsg() {}
1724
1725     EventType type;
1726     int fd;  // valid for TYPE_ACCEPT
1727     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1728     string errorMsg;  // valid for TYPE_ERROR
1729   };
1730   typedef std::deque<EventInfo> EventList;
1731
1732   TestAcceptCallback()
1733     : connectionAcceptedFn_(),
1734       acceptErrorFn_(),
1735       acceptStoppedFn_(),
1736       events_() {}
1737
1738   std::deque<EventInfo>* getEvents() {
1739     return &events_;
1740   }
1741
1742   void setConnectionAcceptedFn(
1743       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1744     connectionAcceptedFn_ = fn;
1745   }
1746   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1747     acceptErrorFn_ = fn;
1748   }
1749   void setAcceptStartedFn(const std::function<void()>& fn) {
1750     acceptStartedFn_ = fn;
1751   }
1752   void setAcceptStoppedFn(const std::function<void()>& fn) {
1753     acceptStoppedFn_ = fn;
1754   }
1755
1756   void connectionAccepted(
1757       int fd, const folly::SocketAddress& clientAddr) noexcept override {
1758     events_.emplace_back(fd, clientAddr);
1759
1760     if (connectionAcceptedFn_) {
1761       connectionAcceptedFn_(fd, clientAddr);
1762     }
1763   }
1764   void acceptError(const std::exception& ex) noexcept override {
1765     events_.emplace_back(ex.what());
1766
1767     if (acceptErrorFn_) {
1768       acceptErrorFn_(ex);
1769     }
1770   }
1771   void acceptStarted() noexcept override {
1772     events_.emplace_back(TYPE_START);
1773
1774     if (acceptStartedFn_) {
1775       acceptStartedFn_();
1776     }
1777   }
1778   void acceptStopped() noexcept override {
1779     events_.emplace_back(TYPE_STOP);
1780
1781     if (acceptStoppedFn_) {
1782       acceptStoppedFn_();
1783     }
1784   }
1785
1786  private:
1787   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1788   std::function<void(const std::exception&)> acceptErrorFn_;
1789   std::function<void()> acceptStartedFn_;
1790   std::function<void()> acceptStoppedFn_;
1791
1792   std::deque<EventInfo> events_;
1793 };
1794 }
1795
1796 /**
1797  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1798  */
1799 TEST(AsyncSocketTest, ServerAcceptOptions) {
1800   EventBase eventBase;
1801
1802   // Create a server socket
1803   std::shared_ptr<AsyncServerSocket> serverSocket(
1804       AsyncServerSocket::newSocket(&eventBase));
1805   serverSocket->bind(0);
1806   serverSocket->listen(16);
1807   folly::SocketAddress serverAddress;
1808   serverSocket->getAddress(&serverAddress);
1809
1810   // Add a callback to accept one connection then stop the loop
1811   TestAcceptCallback acceptCallback;
1812   acceptCallback.setConnectionAcceptedFn(
1813       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1814         serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1815       });
1816   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1817     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1818   });
1819   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
1820   serverSocket->startAccepting();
1821
1822   // Connect to the server socket
1823   std::shared_ptr<AsyncSocket> socket(
1824       AsyncSocket::newSocket(&eventBase, serverAddress));
1825
1826   eventBase.loop();
1827
1828   // Verify that the server accepted a connection
1829   ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
1830   ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
1831                     TestAcceptCallback::TYPE_START);
1832   ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
1833                     TestAcceptCallback::TYPE_ACCEPT);
1834   ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
1835                     TestAcceptCallback::TYPE_STOP);
1836   int fd = acceptCallback.getEvents()->at(1).fd;
1837
1838   // The accepted connection should already be in non-blocking mode
1839   int flags = fcntl(fd, F_GETFL, 0);
1840   ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1841
1842 #ifndef TCP_NOPUSH
1843   // The accepted connection should already have TCP_NODELAY set
1844   int value;
1845   socklen_t valueLength = sizeof(value);
1846   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1847   ASSERT_EQ(rc, 0);
1848   ASSERT_EQ(value, 1);
1849 #endif
1850 }
1851
1852 /**
1853  * Test AsyncServerSocket::removeAcceptCallback()
1854  */
1855 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1856   // Create a new AsyncServerSocket
1857   EventBase eventBase;
1858   std::shared_ptr<AsyncServerSocket> serverSocket(
1859       AsyncServerSocket::newSocket(&eventBase));
1860   serverSocket->bind(0);
1861   serverSocket->listen(16);
1862   folly::SocketAddress serverAddress;
1863   serverSocket->getAddress(&serverAddress);
1864
1865   // Add several accept callbacks
1866   TestAcceptCallback cb1;
1867   TestAcceptCallback cb2;
1868   TestAcceptCallback cb3;
1869   TestAcceptCallback cb4;
1870   TestAcceptCallback cb5;
1871   TestAcceptCallback cb6;
1872   TestAcceptCallback cb7;
1873
1874   // Test having callbacks remove other callbacks before them on the list,
1875   // after them on the list, or removing themselves.
1876   //
1877   // Have callback 2 remove callback 3 and callback 5 the first time it is
1878   // called.
1879   int cb2Count = 0;
1880   cb1.setConnectionAcceptedFn([&](int /* fd */,
1881                                   const folly::SocketAddress& /* addr */) {
1882     std::shared_ptr<AsyncSocket> sock2(
1883         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1884   });
1885   cb3.setConnectionAcceptedFn(
1886       [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1887   cb4.setConnectionAcceptedFn(
1888       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1889         std::shared_ptr<AsyncSocket> sock3(
1890             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1891       });
1892   cb5.setConnectionAcceptedFn(
1893       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1894         std::shared_ptr<AsyncSocket> sock5(
1895             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1896
1897       });
1898   cb2.setConnectionAcceptedFn(
1899       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1900         if (cb2Count == 0) {
1901           serverSocket->removeAcceptCallback(&cb3, nullptr);
1902           serverSocket->removeAcceptCallback(&cb5, nullptr);
1903         }
1904         ++cb2Count;
1905       });
1906   // Have callback 6 remove callback 4 the first time it is called,
1907   // and destroy the server socket the second time it is called
1908   int cb6Count = 0;
1909   cb6.setConnectionAcceptedFn(
1910       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1911         if (cb6Count == 0) {
1912           serverSocket->removeAcceptCallback(&cb4, nullptr);
1913           std::shared_ptr<AsyncSocket> sock6(
1914               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1915           std::shared_ptr<AsyncSocket> sock7(
1916               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1917           std::shared_ptr<AsyncSocket> sock8(
1918               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1919
1920         } else {
1921           serverSocket.reset();
1922         }
1923         ++cb6Count;
1924       });
1925   // Have callback 7 remove itself
1926   cb7.setConnectionAcceptedFn(
1927       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1928         serverSocket->removeAcceptCallback(&cb7, nullptr);
1929       });
1930
1931   serverSocket->addAcceptCallback(&cb1, &eventBase);
1932   serverSocket->addAcceptCallback(&cb2, &eventBase);
1933   serverSocket->addAcceptCallback(&cb3, &eventBase);
1934   serverSocket->addAcceptCallback(&cb4, &eventBase);
1935   serverSocket->addAcceptCallback(&cb5, &eventBase);
1936   serverSocket->addAcceptCallback(&cb6, &eventBase);
1937   serverSocket->addAcceptCallback(&cb7, &eventBase);
1938   serverSocket->startAccepting();
1939
1940   // Make several connections to the socket
1941   std::shared_ptr<AsyncSocket> sock1(
1942       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1943   std::shared_ptr<AsyncSocket> sock4(
1944       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1945
1946   // Loop until we are stopped
1947   eventBase.loop();
1948
1949   // Check to make sure that the expected callbacks were invoked.
1950   //
1951   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1952   // the AcceptCallbacks in round-robin fashion, in the order that they were
1953   // added.  The code is implemented this way right now, but the API doesn't
1954   // explicitly require it be done this way.  If we change the code not to be
1955   // exactly round robin in the future, we can simplify the test checks here.
1956   // (We'll also need to update the termination code, since we expect cb6 to
1957   // get called twice to terminate the loop.)
1958   ASSERT_EQ(cb1.getEvents()->size(), 4);
1959   ASSERT_EQ(cb1.getEvents()->at(0).type,
1960                     TestAcceptCallback::TYPE_START);
1961   ASSERT_EQ(cb1.getEvents()->at(1).type,
1962                     TestAcceptCallback::TYPE_ACCEPT);
1963   ASSERT_EQ(cb1.getEvents()->at(2).type,
1964                     TestAcceptCallback::TYPE_ACCEPT);
1965   ASSERT_EQ(cb1.getEvents()->at(3).type,
1966                     TestAcceptCallback::TYPE_STOP);
1967
1968   ASSERT_EQ(cb2.getEvents()->size(), 4);
1969   ASSERT_EQ(cb2.getEvents()->at(0).type,
1970                     TestAcceptCallback::TYPE_START);
1971   ASSERT_EQ(cb2.getEvents()->at(1).type,
1972                     TestAcceptCallback::TYPE_ACCEPT);
1973   ASSERT_EQ(cb2.getEvents()->at(2).type,
1974                     TestAcceptCallback::TYPE_ACCEPT);
1975   ASSERT_EQ(cb2.getEvents()->at(3).type,
1976                     TestAcceptCallback::TYPE_STOP);
1977
1978   ASSERT_EQ(cb3.getEvents()->size(), 2);
1979   ASSERT_EQ(cb3.getEvents()->at(0).type,
1980                     TestAcceptCallback::TYPE_START);
1981   ASSERT_EQ(cb3.getEvents()->at(1).type,
1982                     TestAcceptCallback::TYPE_STOP);
1983
1984   ASSERT_EQ(cb4.getEvents()->size(), 3);
1985   ASSERT_EQ(cb4.getEvents()->at(0).type,
1986                     TestAcceptCallback::TYPE_START);
1987   ASSERT_EQ(cb4.getEvents()->at(1).type,
1988                     TestAcceptCallback::TYPE_ACCEPT);
1989   ASSERT_EQ(cb4.getEvents()->at(2).type,
1990                     TestAcceptCallback::TYPE_STOP);
1991
1992   ASSERT_EQ(cb5.getEvents()->size(), 2);
1993   ASSERT_EQ(cb5.getEvents()->at(0).type,
1994                     TestAcceptCallback::TYPE_START);
1995   ASSERT_EQ(cb5.getEvents()->at(1).type,
1996                     TestAcceptCallback::TYPE_STOP);
1997
1998   ASSERT_EQ(cb6.getEvents()->size(), 4);
1999   ASSERT_EQ(cb6.getEvents()->at(0).type,
2000                     TestAcceptCallback::TYPE_START);
2001   ASSERT_EQ(cb6.getEvents()->at(1).type,
2002                     TestAcceptCallback::TYPE_ACCEPT);
2003   ASSERT_EQ(cb6.getEvents()->at(2).type,
2004                     TestAcceptCallback::TYPE_ACCEPT);
2005   ASSERT_EQ(cb6.getEvents()->at(3).type,
2006                     TestAcceptCallback::TYPE_STOP);
2007
2008   ASSERT_EQ(cb7.getEvents()->size(), 3);
2009   ASSERT_EQ(cb7.getEvents()->at(0).type,
2010                     TestAcceptCallback::TYPE_START);
2011   ASSERT_EQ(cb7.getEvents()->at(1).type,
2012                     TestAcceptCallback::TYPE_ACCEPT);
2013   ASSERT_EQ(cb7.getEvents()->at(2).type,
2014                     TestAcceptCallback::TYPE_STOP);
2015 }
2016
2017 /**
2018  * Test AsyncServerSocket::removeAcceptCallback()
2019  */
2020 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
2021   // Create a new AsyncServerSocket
2022   EventBase eventBase;
2023   std::shared_ptr<AsyncServerSocket> serverSocket(
2024       AsyncServerSocket::newSocket(&eventBase));
2025   serverSocket->bind(0);
2026   serverSocket->listen(16);
2027   folly::SocketAddress serverAddress;
2028   serverSocket->getAddress(&serverAddress);
2029
2030   // Add several accept callbacks
2031   TestAcceptCallback cb1;
2032   auto thread_id = std::this_thread::get_id();
2033   cb1.setAcceptStartedFn([&](){
2034     CHECK_NE(thread_id, std::this_thread::get_id());
2035     thread_id = std::this_thread::get_id();
2036   });
2037   cb1.setConnectionAcceptedFn(
2038       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2039         ASSERT_EQ(thread_id, std::this_thread::get_id());
2040         serverSocket->removeAcceptCallback(&cb1, &eventBase);
2041       });
2042   cb1.setAcceptStoppedFn([&](){
2043     ASSERT_EQ(thread_id, std::this_thread::get_id());
2044   });
2045
2046   // Test having callbacks remove other callbacks before them on the list,
2047   serverSocket->addAcceptCallback(&cb1, &eventBase);
2048   serverSocket->startAccepting();
2049
2050   // Make several connections to the socket
2051   std::shared_ptr<AsyncSocket> sock1(
2052       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
2053
2054   // Loop in another thread
2055   auto other = std::thread([&](){
2056     eventBase.loop();
2057   });
2058   other.join();
2059
2060   // Check to make sure that the expected callbacks were invoked.
2061   //
2062   // NOTE: This code depends on the AsyncServerSocket operating calling all of
2063   // the AcceptCallbacks in round-robin fashion, in the order that they were
2064   // added.  The code is implemented this way right now, but the API doesn't
2065   // explicitly require it be done this way.  If we change the code not to be
2066   // exactly round robin in the future, we can simplify the test checks here.
2067   // (We'll also need to update the termination code, since we expect cb6 to
2068   // get called twice to terminate the loop.)
2069   ASSERT_EQ(cb1.getEvents()->size(), 3);
2070   ASSERT_EQ(cb1.getEvents()->at(0).type,
2071                     TestAcceptCallback::TYPE_START);
2072   ASSERT_EQ(cb1.getEvents()->at(1).type,
2073                     TestAcceptCallback::TYPE_ACCEPT);
2074   ASSERT_EQ(cb1.getEvents()->at(2).type,
2075                     TestAcceptCallback::TYPE_STOP);
2076
2077 }
2078
2079 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
2080   EventBase* eventBase = serverSocket->getEventBase();
2081   CHECK(eventBase);
2082
2083   // Add a callback to accept one connection then stop accepting
2084   TestAcceptCallback acceptCallback;
2085   acceptCallback.setConnectionAcceptedFn(
2086       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2087         serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
2088       });
2089   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2090     serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
2091   });
2092   serverSocket->addAcceptCallback(&acceptCallback, eventBase);
2093   serverSocket->startAccepting();
2094
2095   // Connect to the server socket
2096   folly::SocketAddress serverAddress;
2097   serverSocket->getAddress(&serverAddress);
2098   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
2099
2100   // Loop to process all events
2101   eventBase->loop();
2102
2103   // Verify that the server accepted a connection
2104   ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
2105   ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
2106                     TestAcceptCallback::TYPE_START);
2107   ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
2108                     TestAcceptCallback::TYPE_ACCEPT);
2109   ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
2110                     TestAcceptCallback::TYPE_STOP);
2111 }
2112
2113 /* Verify that we don't leak sockets if we are destroyed()
2114  * and there are still writes pending
2115  *
2116  * If destroy() only calls close() instead of closeNow(),
2117  * it would shutdown(writes) on the socket, but it would
2118  * never be close()'d, and the socket would leak
2119  */
2120 TEST(AsyncSocketTest, DestroyCloseTest) {
2121   TestServer server;
2122
2123   // connect()
2124   EventBase clientEB;
2125   EventBase serverEB;
2126   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
2127   ConnCallback ccb;
2128   socket->connect(&ccb, server.getAddress(), 30);
2129
2130   // Accept the connection
2131   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2132   ReadCallback rcb;
2133   acceptedSocket->setReadCB(&rcb);
2134
2135   // Write a large buffer to the socket that is larger than kernel buffer
2136   size_t simpleBufLength = 5000000;
2137   char* simpleBuf = new char[simpleBufLength];
2138   memset(simpleBuf, 'a', simpleBufLength);
2139   WriteCallback wcb;
2140
2141   // Let the reads and writes run to completion
2142   int fd = acceptedSocket->getFd();
2143
2144   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2145   socket.reset();
2146   acceptedSocket.reset();
2147
2148   // Test that server socket was closed
2149   folly::test::msvcSuppressAbortOnInvalidParams([&] {
2150     ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2151     ASSERT_EQ(sz, -1);
2152     ASSERT_EQ(errno, EBADF);
2153   });
2154   delete[] simpleBuf;
2155 }
2156
2157 /**
2158  * Test AsyncServerSocket::useExistingSocket()
2159  */
2160 TEST(AsyncSocketTest, ServerExistingSocket) {
2161   EventBase eventBase;
2162
2163   // Test creating a socket, and letting AsyncServerSocket bind and listen
2164   {
2165     // Manually create a socket
2166     int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2167     ASSERT_GE(fd, 0);
2168
2169     // Create a server socket
2170     AsyncServerSocket::UniquePtr serverSocket(
2171         new AsyncServerSocket(&eventBase));
2172     serverSocket->useExistingSocket(fd);
2173     folly::SocketAddress address;
2174     serverSocket->getAddress(&address);
2175     address.setPort(0);
2176     serverSocket->bind(address);
2177     serverSocket->listen(16);
2178
2179     // Make sure the socket works
2180     serverSocketSanityTest(serverSocket.get());
2181   }
2182
2183   // Test creating a socket and binding manually,
2184   // then letting AsyncServerSocket listen
2185   {
2186     // Manually create a socket
2187     int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2188     ASSERT_GE(fd, 0);
2189     // bind
2190     struct sockaddr_in addr;
2191     addr.sin_family = AF_INET;
2192     addr.sin_port = 0;
2193     addr.sin_addr.s_addr = INADDR_ANY;
2194     ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2195                              sizeof(addr)), 0);
2196     // Look up the address that we bound to
2197     folly::SocketAddress boundAddress;
2198     boundAddress.setFromLocalAddress(fd);
2199
2200     // Create a server socket
2201     AsyncServerSocket::UniquePtr serverSocket(
2202         new AsyncServerSocket(&eventBase));
2203     serverSocket->useExistingSocket(fd);
2204     serverSocket->listen(16);
2205
2206     // Make sure AsyncServerSocket reports the same address that we bound to
2207     folly::SocketAddress serverSocketAddress;
2208     serverSocket->getAddress(&serverSocketAddress);
2209     ASSERT_EQ(boundAddress, serverSocketAddress);
2210
2211     // Make sure the socket works
2212     serverSocketSanityTest(serverSocket.get());
2213   }
2214
2215   // Test creating a socket, binding and listening manually,
2216   // then giving it to AsyncServerSocket
2217   {
2218     // Manually create a socket
2219     int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2220     ASSERT_GE(fd, 0);
2221     // bind
2222     struct sockaddr_in addr;
2223     addr.sin_family = AF_INET;
2224     addr.sin_port = 0;
2225     addr.sin_addr.s_addr = INADDR_ANY;
2226     ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2227                              sizeof(addr)), 0);
2228     // Look up the address that we bound to
2229     folly::SocketAddress boundAddress;
2230     boundAddress.setFromLocalAddress(fd);
2231     // listen
2232     ASSERT_EQ(listen(fd, 16), 0);
2233
2234     // Create a server socket
2235     AsyncServerSocket::UniquePtr serverSocket(
2236         new AsyncServerSocket(&eventBase));
2237     serverSocket->useExistingSocket(fd);
2238
2239     // Make sure AsyncServerSocket reports the same address that we bound to
2240     folly::SocketAddress serverSocketAddress;
2241     serverSocket->getAddress(&serverSocketAddress);
2242     ASSERT_EQ(boundAddress, serverSocketAddress);
2243
2244     // Make sure the socket works
2245     serverSocketSanityTest(serverSocket.get());
2246   }
2247 }
2248
2249 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2250   EventBase eventBase;
2251
2252   // Create a server socket
2253   std::shared_ptr<AsyncServerSocket> serverSocket(
2254       AsyncServerSocket::newSocket(&eventBase));
2255   string path(1, 0);
2256   path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2257   folly::SocketAddress serverAddress;
2258   serverAddress.setFromPath(path);
2259   serverSocket->bind(serverAddress);
2260   serverSocket->listen(16);
2261
2262   // Add a callback to accept one connection then stop the loop
2263   TestAcceptCallback acceptCallback;
2264   acceptCallback.setConnectionAcceptedFn(
2265       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2266         serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2267       });
2268   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2269     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2270   });
2271   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2272   serverSocket->startAccepting();
2273
2274   // Connect to the server socket
2275   std::shared_ptr<AsyncSocket> socket(
2276       AsyncSocket::newSocket(&eventBase, serverAddress));
2277
2278   eventBase.loop();
2279
2280   // Verify that the server accepted a connection
2281   ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
2282   ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
2283                     TestAcceptCallback::TYPE_START);
2284   ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
2285                     TestAcceptCallback::TYPE_ACCEPT);
2286   ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
2287                     TestAcceptCallback::TYPE_STOP);
2288   int fd = acceptCallback.getEvents()->at(1).fd;
2289
2290   // The accepted connection should already be in non-blocking mode
2291   int flags = fcntl(fd, F_GETFL, 0);
2292   ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2293 }
2294
2295 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2296   EventBase eventBase;
2297   TestConnectionEventCallback connectionEventCallback;
2298
2299   // Create a server socket
2300   std::shared_ptr<AsyncServerSocket> serverSocket(
2301       AsyncServerSocket::newSocket(&eventBase));
2302   serverSocket->setConnectionEventCallback(&connectionEventCallback);
2303   serverSocket->bind(0);
2304   serverSocket->listen(16);
2305   folly::SocketAddress serverAddress;
2306   serverSocket->getAddress(&serverAddress);
2307
2308   // Add a callback to accept one connection then stop the loop
2309   TestAcceptCallback acceptCallback;
2310   acceptCallback.setConnectionAcceptedFn(
2311       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2312         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2313       });
2314   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2315     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2316   });
2317   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2318   serverSocket->startAccepting();
2319
2320   // Connect to the server socket
2321   std::shared_ptr<AsyncSocket> socket(
2322       AsyncSocket::newSocket(&eventBase, serverAddress));
2323
2324   eventBase.loop();
2325
2326   // Validate the connection event counters
2327   ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2328   ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2329   ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2330   ASSERT_EQ(
2331       connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2332   ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2333   ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2334   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2335   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2336 }
2337
2338 TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
2339   EventBase eventBase;
2340   TestConnectionEventCallback connectionEventCallback;
2341
2342   // Create a server socket
2343   std::shared_ptr<AsyncServerSocket> serverSocket(
2344       AsyncServerSocket::newSocket(&eventBase));
2345   serverSocket->setConnectionEventCallback(&connectionEventCallback);
2346   serverSocket->bind(0);
2347   serverSocket->listen(16);
2348   folly::SocketAddress serverAddress;
2349   serverSocket->getAddress(&serverAddress);
2350
2351   // Add a callback to accept one connection then stop the loop
2352   TestAcceptCallback acceptCallback;
2353   acceptCallback.setConnectionAcceptedFn(
2354       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2355         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2356       });
2357   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2358     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2359   });
2360   bool acceptStartedFlag{false};
2361   acceptCallback.setAcceptStartedFn([&acceptStartedFlag](){
2362     acceptStartedFlag = true;
2363   });
2364   bool acceptStoppedFlag{false};
2365   acceptCallback.setAcceptStoppedFn([&acceptStoppedFlag](){
2366     acceptStoppedFlag = true;
2367   });
2368   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2369   serverSocket->startAccepting();
2370
2371   // Connect to the server socket
2372   std::shared_ptr<AsyncSocket> socket(
2373       AsyncSocket::newSocket(&eventBase, serverAddress));
2374
2375   eventBase.loop();
2376
2377   ASSERT_TRUE(acceptStartedFlag);
2378   ASSERT_TRUE(acceptStoppedFlag);
2379   // Validate the connection event counters
2380   ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2381   ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2382   ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2383   ASSERT_EQ(
2384       connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
2385   ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
2386   ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2387   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2388   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2389 }
2390
2391
2392
2393 /**
2394  * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2395  */
2396 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2397   EventBase eventBase;
2398
2399   // Counter of how many connections have been accepted
2400   int count = 0;
2401
2402   // Create a server socket
2403   auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2404   serverSocket->bind(0);
2405   serverSocket->listen(16);
2406   folly::SocketAddress serverAddress;
2407   serverSocket->getAddress(&serverAddress);
2408
2409   // Add a callback to accept connections
2410   TestAcceptCallback acceptCallback;
2411   acceptCallback.setConnectionAcceptedFn(
2412       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2413         count++;
2414         ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2415
2416         if (count == 4) {
2417           // all messages are processed, remove accept callback
2418           serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2419         }
2420       });
2421   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2422     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2423   });
2424   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2425   serverSocket->startAccepting();
2426
2427   // Connect to the server socket, 4 clients, there are 4 connections
2428   auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2429   auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2430   auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2431   auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2432
2433   eventBase.loop();
2434 }
2435
2436 /**
2437  * Test AsyncTransport::BufferCallback
2438  */
2439 TEST(AsyncSocketTest, BufferTest) {
2440   TestServer server;
2441
2442   EventBase evb;
2443   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2444   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2445   ConnCallback ccb;
2446   socket->connect(&ccb, server.getAddress(), 30, option);
2447
2448   char buf[100 * 1024];
2449   memset(buf, 'c', sizeof(buf));
2450   WriteCallback wcb;
2451   BufferCallback bcb;
2452   socket->setBufferCallback(&bcb);
2453   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2454
2455   evb.loop();
2456   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2457   ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
2458
2459   ASSERT_TRUE(bcb.hasBuffered());
2460   ASSERT_TRUE(bcb.hasBufferCleared());
2461
2462   socket->close();
2463   server.verifyConnection(buf, sizeof(buf));
2464
2465   ASSERT_TRUE(socket->isClosedBySelf());
2466   ASSERT_FALSE(socket->isClosedByPeer());
2467 }
2468
2469 TEST(AsyncSocketTest, BufferCallbackKill) {
2470   TestServer server;
2471   EventBase evb;
2472   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2473   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2474   ConnCallback ccb;
2475   socket->connect(&ccb, server.getAddress(), 30, option);
2476   evb.loopOnce();
2477
2478   char buf[100 * 1024];
2479   memset(buf, 'c', sizeof(buf));
2480   BufferCallback bcb;
2481   socket->setBufferCallback(&bcb);
2482   WriteCallback wcb;
2483   wcb.successCallback = [&] {
2484     ASSERT_TRUE(socket.unique());
2485     socket.reset();
2486   };
2487
2488   // This will trigger AsyncSocket::handleWrite,
2489   // which calls WriteCallback::writeSuccess,
2490   // which calls wcb.successCallback above,
2491   // which tries to delete socket
2492   // Then, the socket will also try to use this BufferCallback
2493   // And that should crash us, if there is no DestructorGuard on the stack
2494   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2495
2496   evb.loop();
2497   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2498 }
2499
2500 #if FOLLY_ALLOW_TFO
2501 TEST(AsyncSocketTest, ConnectTFO) {
2502   // Start listening on a local port
2503   TestServer server(true);
2504
2505   // Connect using a AsyncSocket
2506   EventBase evb;
2507   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2508   socket->enableTFO();
2509   ConnCallback cb;
2510   socket->connect(&cb, server.getAddress(), 30);
2511
2512   std::array<uint8_t, 128> buf;
2513   memset(buf.data(), 'a', buf.size());
2514
2515   std::array<uint8_t, 3> readBuf;
2516   auto sendBuf = IOBuf::copyBuffer("hey");
2517
2518   std::thread t([&] {
2519     auto acceptedSocket = server.accept();
2520     acceptedSocket->write(buf.data(), buf.size());
2521     acceptedSocket->flush();
2522     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2523     acceptedSocket->close();
2524   });
2525
2526   evb.loop();
2527
2528   ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2529   EXPECT_LE(0, socket->getConnectTime().count());
2530   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2531   EXPECT_TRUE(socket->getTFOAttempted());
2532
2533   // Should trigger the connect
2534   WriteCallback write;
2535   ReadCallback rcb;
2536   socket->writeChain(&write, sendBuf->clone());
2537   socket->setReadCB(&rcb);
2538   evb.loop();
2539
2540   t.join();
2541
2542   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2543   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2544   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2545   ASSERT_EQ(1, rcb.buffers.size());
2546   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2547   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2548   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2549 }
2550
2551 TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
2552   // Start listening on a local port
2553   TestServer server(true);
2554
2555   // Connect using a AsyncSocket
2556   EventBase evb;
2557   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2558   socket->enableTFO();
2559   ConnCallback cb;
2560   socket->connect(&cb, server.getAddress(), 30);
2561   ReadCallback rcb;
2562   socket->setReadCB(&rcb);
2563
2564   std::array<uint8_t, 128> buf;
2565   memset(buf.data(), 'a', buf.size());
2566
2567   std::array<uint8_t, 3> readBuf;
2568   auto sendBuf = IOBuf::copyBuffer("hey");
2569
2570   std::thread t([&] {
2571     auto acceptedSocket = server.accept();
2572     acceptedSocket->write(buf.data(), buf.size());
2573     acceptedSocket->flush();
2574     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2575     acceptedSocket->close();
2576   });
2577
2578   evb.loop();
2579
2580   ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2581   EXPECT_LE(0, socket->getConnectTime().count());
2582   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2583   EXPECT_TRUE(socket->getTFOAttempted());
2584
2585   // Should trigger the connect
2586   WriteCallback write;
2587   socket->writeChain(&write, sendBuf->clone());
2588   evb.loop();
2589
2590   t.join();
2591
2592   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2593   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2594   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2595   ASSERT_EQ(1, rcb.buffers.size());
2596   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2597   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2598   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2599 }
2600
2601 /**
2602  * Test connecting to a server that isn't listening
2603  */
2604 TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) {
2605   EventBase evb;
2606
2607   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2608
2609   socket->enableTFO();
2610
2611   // Hopefully nothing is actually listening on this address
2612   folly::SocketAddress addr("::1", 65535);
2613   ConnCallback cb;
2614   socket->connect(&cb, addr, 30);
2615
2616   evb.loop();
2617
2618   WriteCallback write1;
2619   // Trigger the connect if TFO attempt is supported.
2620   socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2621   WriteCallback write2;
2622   socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2623   evb.loop();
2624
2625   if (!socket->getTFOFinished()) {
2626     EXPECT_EQ(STATE_FAILED, write1.state);
2627   } else {
2628     EXPECT_EQ(STATE_SUCCEEDED, write1.state);
2629     EXPECT_FALSE(socket->getTFOSucceded());
2630   }
2631
2632   EXPECT_EQ(STATE_FAILED, write2.state);
2633
2634   EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2635   EXPECT_LE(0, socket->getConnectTime().count());
2636   EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2637   EXPECT_TRUE(socket->getTFOAttempted());
2638 }
2639
2640 /**
2641  * Test calling closeNow() immediately after connecting.
2642  */
2643 TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
2644   TestServer server(true);
2645
2646   // connect()
2647   EventBase evb;
2648   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2649   socket->enableTFO();
2650
2651   ConnCallback ccb;
2652   socket->connect(&ccb, server.getAddress(), 30);
2653
2654   // write()
2655   std::array<char, 128> buf;
2656   memset(buf.data(), 'a', buf.size());
2657
2658   // close()
2659   socket->closeNow();
2660
2661   // Loop, although there shouldn't be anything to do.
2662   evb.loop();
2663
2664   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2665
2666   ASSERT_TRUE(socket->isClosedBySelf());
2667   ASSERT_FALSE(socket->isClosedByPeer());
2668 }
2669
2670 /**
2671  * Test calling close() immediately after connect()
2672  */
2673 TEST(AsyncSocketTest, ConnectAndCloseTFO) {
2674   TestServer server(true);
2675
2676   // Connect using a AsyncSocket
2677   EventBase evb;
2678   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2679   socket->enableTFO();
2680
2681   ConnCallback ccb;
2682   socket->connect(&ccb, server.getAddress(), 30);
2683
2684   socket->close();
2685
2686   // Loop, although there shouldn't be anything to do.
2687   evb.loop();
2688
2689   // Make sure the connection was aborted
2690   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2691
2692   ASSERT_TRUE(socket->isClosedBySelf());
2693   ASSERT_FALSE(socket->isClosedByPeer());
2694 }
2695
2696 class MockAsyncTFOSocket : public AsyncSocket {
2697  public:
2698   using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
2699
2700   explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
2701
2702   MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
2703 };
2704
2705 TEST(AsyncSocketTest, TestTFOUnsupported) {
2706   TestServer server(true);
2707
2708   // Connect using a AsyncSocket
2709   EventBase evb;
2710   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2711   socket->enableTFO();
2712
2713   ConnCallback ccb;
2714   socket->connect(&ccb, server.getAddress(), 30);
2715   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2716
2717   ReadCallback rcb;
2718   socket->setReadCB(&rcb);
2719
2720   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2721       .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2722   WriteCallback write;
2723   auto sendBuf = IOBuf::copyBuffer("hey");
2724   socket->writeChain(&write, sendBuf->clone());
2725   EXPECT_EQ(STATE_WAITING, write.state);
2726
2727   std::array<uint8_t, 128> buf;
2728   memset(buf.data(), 'a', buf.size());
2729
2730   std::array<uint8_t, 3> readBuf;
2731
2732   std::thread t([&] {
2733     std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2734     acceptedSocket->write(buf.data(), buf.size());
2735     acceptedSocket->flush();
2736     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2737     acceptedSocket->close();
2738   });
2739
2740   evb.loop();
2741
2742   t.join();
2743   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2744   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2745
2746   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2747   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2748   ASSERT_EQ(1, rcb.buffers.size());
2749   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2750   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2751   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2752 }
2753
2754 TEST(AsyncSocketTest, ConnectRefusedDelayedTFO) {
2755   EventBase evb;
2756
2757   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2758   socket->enableTFO();
2759
2760   // Hopefully this fails
2761   folly::SocketAddress fakeAddr("127.0.0.1", 65535);
2762   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2763       .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2764         sockaddr_storage addr;
2765         auto len = fakeAddr.getAddress(&addr);
2766         int ret = connect(fd, (const struct sockaddr*)&addr, len);
2767         LOG(INFO) << "connecting the socket " << fd << " : " << ret << " : "
2768                   << errno;
2769         return ret;
2770       }));
2771
2772   // Hopefully nothing is actually listening on this address
2773   ConnCallback cb;
2774   socket->connect(&cb, fakeAddr, 30);
2775
2776   WriteCallback write1;
2777   // Trigger the connect if TFO attempt is supported.
2778   socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2779
2780   if (socket->getTFOFinished()) {
2781     // This test is useless now.
2782     return;
2783   }
2784   WriteCallback write2;
2785   // Trigger the connect if TFO attempt is supported.
2786   socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2787   evb.loop();
2788
2789   EXPECT_EQ(STATE_FAILED, write1.state);
2790   EXPECT_EQ(STATE_FAILED, write2.state);
2791   EXPECT_FALSE(socket->getTFOSucceded());
2792
2793   EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2794   EXPECT_LE(0, socket->getConnectTime().count());
2795   EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2796   EXPECT_TRUE(socket->getTFOAttempted());
2797 }
2798
2799 TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
2800   // Try connecting to server that won't respond.
2801   //
2802   // This depends somewhat on the network where this test is run.
2803   // Hopefully this IP will be routable but unresponsive.
2804   // (Alternatively, we could try listening on a local raw socket, but that
2805   // normally requires root privileges.)
2806   auto host = SocketAddressTestHelper::isIPv6Enabled()
2807       ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2808       : SocketAddressTestHelper::isIPv4Enabled()
2809           ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2810           : nullptr;
2811   SocketAddress addr(host, 65535);
2812
2813   // Connect using a AsyncSocket
2814   EventBase evb;
2815   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2816   socket->enableTFO();
2817
2818   ConnCallback ccb;
2819   // Set a very small timeout
2820   socket->connect(&ccb, addr, 1);
2821   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2822
2823   ReadCallback rcb;
2824   socket->setReadCB(&rcb);
2825
2826   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2827       .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2828   WriteCallback write;
2829   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2830
2831   evb.loop();
2832
2833   EXPECT_EQ(STATE_FAILED, write.state);
2834 }
2835
2836 TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
2837   TestServer server(true);
2838
2839   // Connect using a AsyncSocket
2840   EventBase evb;
2841   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2842   socket->enableTFO();
2843
2844   ConnCallback ccb;
2845   socket->connect(&ccb, server.getAddress(), 30);
2846   ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2847
2848   ReadCallback rcb;
2849   socket->setReadCB(&rcb);
2850
2851   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2852       .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2853         sockaddr_storage addr;
2854         auto len = server.getAddress().getAddress(&addr);
2855         return connect(fd, (const struct sockaddr*)&addr, len);
2856       }));
2857   WriteCallback write;
2858   auto sendBuf = IOBuf::copyBuffer("hey");
2859   socket->writeChain(&write, sendBuf->clone());
2860   EXPECT_EQ(STATE_WAITING, write.state);
2861
2862   std::array<uint8_t, 128> buf;
2863   memset(buf.data(), 'a', buf.size());
2864
2865   std::array<uint8_t, 3> readBuf;
2866
2867   std::thread t([&] {
2868     std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2869     acceptedSocket->write(buf.data(), buf.size());
2870     acceptedSocket->flush();
2871     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2872     acceptedSocket->close();
2873   });
2874
2875   evb.loop();
2876
2877   t.join();
2878   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2879
2880   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2881   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2882
2883   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2884   ASSERT_EQ(1, rcb.buffers.size());
2885   ASSERT_EQ(buf.size(), rcb.buffers[0].length);
2886   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2887 }
2888
2889 TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
2890   // Try connecting to server that won't respond.
2891   //
2892   // This depends somewhat on the network where this test is run.
2893   // Hopefully this IP will be routable but unresponsive.
2894   // (Alternatively, we could try listening on a local raw socket, but that
2895   // normally requires root privileges.)
2896   auto host = SocketAddressTestHelper::isIPv6Enabled()
2897       ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2898       : SocketAddressTestHelper::isIPv4Enabled()
2899           ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2900           : nullptr;
2901   SocketAddress addr(host, 65535);
2902
2903   // Connect using a AsyncSocket
2904   EventBase evb;
2905   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2906   socket->enableTFO();
2907
2908   ConnCallback ccb;
2909   // Set a very small timeout
2910   socket->connect(&ccb, addr, 1);
2911   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2912
2913   ReadCallback rcb;
2914   socket->setReadCB(&rcb);
2915
2916   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2917       .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2918         sockaddr_storage addr2;
2919         auto len = addr.getAddress(&addr2);
2920         return connect(fd, (const struct sockaddr*)&addr2, len);
2921       }));
2922   WriteCallback write;
2923   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2924
2925   evb.loop();
2926
2927   EXPECT_EQ(STATE_FAILED, write.state);
2928 }
2929
2930 TEST(AsyncSocketTest, TestTFOEagain) {
2931   TestServer server(true);
2932
2933   // Connect using a AsyncSocket
2934   EventBase evb;
2935   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2936   socket->enableTFO();
2937
2938   ConnCallback ccb;
2939   socket->connect(&ccb, server.getAddress(), 30);
2940
2941   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2942       .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
2943   WriteCallback write;
2944   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2945
2946   evb.loop();
2947
2948   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2949   EXPECT_EQ(STATE_FAILED, write.state);
2950 }
2951
2952 // Sending a large amount of data in the first write which will
2953 // definitely not fit into MSS.
2954 TEST(AsyncSocketTest, ConnectTFOWithBigData) {
2955   // Start listening on a local port
2956   TestServer server(true);
2957
2958   // Connect using a AsyncSocket
2959   EventBase evb;
2960   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2961   socket->enableTFO();
2962   ConnCallback cb;
2963   socket->connect(&cb, server.getAddress(), 30);
2964
2965   std::array<uint8_t, 128> buf;
2966   memset(buf.data(), 'a', buf.size());
2967
2968   constexpr size_t len = 10 * 1024;
2969   auto sendBuf = IOBuf::create(len);
2970   sendBuf->append(len);
2971   std::array<uint8_t, len> readBuf;
2972
2973   std::thread t([&] {
2974     auto acceptedSocket = server.accept();
2975     acceptedSocket->write(buf.data(), buf.size());
2976     acceptedSocket->flush();
2977     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2978     acceptedSocket->close();
2979   });
2980
2981   evb.loop();
2982
2983   ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2984   EXPECT_LE(0, socket->getConnectTime().count());
2985   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2986   EXPECT_TRUE(socket->getTFOAttempted());
2987
2988   // Should trigger the connect
2989   WriteCallback write;
2990   ReadCallback rcb;
2991   socket->writeChain(&write, sendBuf->clone());
2992   socket->setReadCB(&rcb);
2993   evb.loop();
2994
2995   t.join();
2996
2997   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2998   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2999   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
3000   ASSERT_EQ(1, rcb.buffers.size());
3001   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
3002   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
3003   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
3004 }
3005
3006 class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
3007  public:
3008   MOCK_METHOD1(evbAttached, void(AsyncSocket*));
3009   MOCK_METHOD1(evbDetached, void(AsyncSocket*));
3010 };
3011
3012 TEST(AsyncSocketTest, EvbCallbacks) {
3013   auto cb = folly::make_unique<MockEvbChangeCallback>();
3014   EventBase evb;
3015   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
3016
3017   InSequence seq;
3018   EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
3019   EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
3020
3021   socket->setEvbChangedCallback(std::move(cb));
3022   socket->detachEventBase();
3023   socket->attachEventBase(&evb);
3024 }
3025
3026 #endif