Add logs for TFO succeded
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <folly/ExceptionWrapper.h>
17 #include <folly/RWSpinLock.h>
18 #include <folly/Random.h>
19 #include <folly/SocketAddress.h>
20 #include <folly/io/async/AsyncServerSocket.h>
21 #include <folly/io/async/AsyncSocket.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/EventBase.h>
24
25 #include <folly/io/IOBuf.h>
26 #include <folly/io/async/test/AsyncSocketTest.h>
27 #include <folly/io/async/test/Util.h>
28 #include <folly/portability/GMock.h>
29 #include <folly/portability/GTest.h>
30 #include <folly/portability/Sockets.h>
31 #include <folly/portability/Unistd.h>
32 #include <folly/test/SocketAddressTestHelper.h>
33
34 #include <boost/scoped_array.hpp>
35 #include <fcntl.h>
36 #include <sys/types.h>
37 #include <iostream>
38 #include <thread>
39
40 using namespace boost;
41
42 using std::string;
43 using std::vector;
44 using std::min;
45 using std::cerr;
46 using std::endl;
47 using std::unique_ptr;
48 using std::chrono::milliseconds;
49 using boost::scoped_array;
50
51 using namespace folly;
52 using namespace testing;
53
54 namespace fsp = folly::portability::sockets;
55
56 class DelayedWrite: public AsyncTimeout {
57  public:
58   DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
59       unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
60       bool cork, bool lastWrite = false):
61     AsyncTimeout(socket->getEventBase()),
62     socket_(socket),
63     bufs_(std::move(bufs)),
64     wcb_(wcb),
65     cork_(cork),
66     lastWrite_(lastWrite) {}
67
68  private:
69   void timeoutExpired() noexcept override {
70     WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
71     socket_->writeChain(wcb_, std::move(bufs_), flags);
72     if (lastWrite_) {
73       socket_->shutdownWrite();
74     }
75   }
76
77   std::shared_ptr<AsyncSocket> socket_;
78   unique_ptr<IOBuf> bufs_;
79   AsyncTransportWrapper::WriteCallback* wcb_;
80   bool cork_;
81   bool lastWrite_;
82 };
83
84 ///////////////////////////////////////////////////////////////////////////
85 // connect() tests
86 ///////////////////////////////////////////////////////////////////////////
87
88 /**
89  * Test connecting to a server
90  */
91 TEST(AsyncSocketTest, Connect) {
92   // Start listening on a local port
93   TestServer server;
94
95   // Connect using a AsyncSocket
96   EventBase evb;
97   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
98   ConnCallback cb;
99   socket->connect(&cb, server.getAddress(), 30);
100
101   evb.loop();
102
103   CHECK_EQ(cb.state, STATE_SUCCEEDED);
104   EXPECT_LE(0, socket->getConnectTime().count());
105   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
106 }
107
108 enum class TFOState {
109   DISABLED,
110   ENABLED,
111 };
112
113 class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
114
115 std::vector<TFOState> getTestingValues() {
116   std::vector<TFOState> vals;
117   vals.emplace_back(TFOState::DISABLED);
118
119 #if FOLLY_ALLOW_TFO
120   vals.emplace_back(TFOState::ENABLED);
121 #endif
122   return vals;
123 }
124
125 INSTANTIATE_TEST_CASE_P(
126     ConnectTests,
127     AsyncSocketConnectTest,
128     ::testing::ValuesIn(getTestingValues()));
129
130 /**
131  * Test connecting to a server that isn't listening
132  */
133 TEST(AsyncSocketTest, ConnectRefused) {
134   EventBase evb;
135
136   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
137
138   // Hopefully nothing is actually listening on this address
139   folly::SocketAddress addr("127.0.0.1", 65535);
140   ConnCallback cb;
141   socket->connect(&cb, addr, 30);
142
143   evb.loop();
144
145   EXPECT_EQ(STATE_FAILED, cb.state);
146   EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
147   EXPECT_LE(0, socket->getConnectTime().count());
148   EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
149 }
150
151 /**
152  * Test connection timeout
153  */
154 TEST(AsyncSocketTest, ConnectTimeout) {
155   EventBase evb;
156
157   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
158
159   // Try connecting to server that won't respond.
160   //
161   // This depends somewhat on the network where this test is run.
162   // Hopefully this IP will be routable but unresponsive.
163   // (Alternatively, we could try listening on a local raw socket, but that
164   // normally requires root privileges.)
165   auto host =
166       SocketAddressTestHelper::isIPv6Enabled() ?
167       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
168       SocketAddressTestHelper::isIPv4Enabled() ?
169       SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
170       nullptr;
171   SocketAddress addr(host, 65535);
172   ConnCallback cb;
173   socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
174
175   evb.loop();
176
177   CHECK_EQ(cb.state, STATE_FAILED);
178   CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
179
180   // Verify that we can still get the peer address after a timeout.
181   // Use case is if the client was created from a client pool, and we want
182   // to log which peer failed.
183   folly::SocketAddress peer;
184   socket->getPeerAddress(&peer);
185   CHECK_EQ(peer, addr);
186   EXPECT_LE(0, socket->getConnectTime().count());
187   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
188 }
189
190 /**
191  * Test writing immediately after connecting, without waiting for connect
192  * to finish.
193  */
194 TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
195   TestServer server;
196
197   // connect()
198   EventBase evb;
199   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
200
201   if (GetParam() == TFOState::ENABLED) {
202     socket->enableTFO();
203   }
204
205   ConnCallback ccb;
206   socket->connect(&ccb, server.getAddress(), 30);
207
208   // write()
209   char buf[128];
210   memset(buf, 'a', sizeof(buf));
211   WriteCallback wcb;
212   socket->write(&wcb, buf, sizeof(buf));
213
214   // Loop.  We don't bother accepting on the server socket yet.
215   // The kernel should be able to buffer the write request so it can succeed.
216   evb.loop();
217
218   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
219   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
220
221   // Make sure the server got a connection and received the data
222   socket->close();
223   server.verifyConnection(buf, sizeof(buf));
224
225   ASSERT_TRUE(socket->isClosedBySelf());
226   ASSERT_FALSE(socket->isClosedByPeer());
227   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
228 }
229
230 /**
231  * Test connecting using a nullptr connect callback.
232  */
233 TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
234   TestServer server;
235
236   // connect()
237   EventBase evb;
238   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
239   if (GetParam() == TFOState::ENABLED) {
240     socket->enableTFO();
241   }
242
243   socket->connect(nullptr, server.getAddress(), 30);
244
245   // write some data, just so we have some way of verifing
246   // that the socket works correctly after connecting
247   char buf[128];
248   memset(buf, 'a', sizeof(buf));
249   WriteCallback wcb;
250   socket->write(&wcb, buf, sizeof(buf));
251
252   evb.loop();
253
254   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
255
256   // Make sure the server got a connection and received the data
257   socket->close();
258   server.verifyConnection(buf, sizeof(buf));
259
260   ASSERT_TRUE(socket->isClosedBySelf());
261   ASSERT_FALSE(socket->isClosedByPeer());
262 }
263
264 /**
265  * Test calling both write() and close() immediately after connecting, without
266  * waiting for connect to finish.
267  *
268  * This exercises the STATE_CONNECTING_CLOSING code.
269  */
270 TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
271   TestServer server;
272
273   // connect()
274   EventBase evb;
275   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
276   if (GetParam() == TFOState::ENABLED) {
277     socket->enableTFO();
278   }
279   ConnCallback ccb;
280   socket->connect(&ccb, server.getAddress(), 30);
281
282   // write()
283   char buf[128];
284   memset(buf, 'a', sizeof(buf));
285   WriteCallback wcb;
286   socket->write(&wcb, buf, sizeof(buf));
287
288   // close()
289   socket->close();
290
291   // Loop.  We don't bother accepting on the server socket yet.
292   // The kernel should be able to buffer the write request so it can succeed.
293   evb.loop();
294
295   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
296   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
297
298   // Make sure the server got a connection and received the data
299   server.verifyConnection(buf, sizeof(buf));
300
301   ASSERT_TRUE(socket->isClosedBySelf());
302   ASSERT_FALSE(socket->isClosedByPeer());
303 }
304
305 /**
306  * Test calling close() immediately after connect()
307  */
308 TEST(AsyncSocketTest, ConnectAndClose) {
309   TestServer server;
310
311   // Connect using a AsyncSocket
312   EventBase evb;
313   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
314   ConnCallback ccb;
315   socket->connect(&ccb, server.getAddress(), 30);
316
317   // Hopefully the connect didn't succeed immediately.
318   // If it did, we can't exercise the close-while-connecting code path.
319   if (ccb.state == STATE_SUCCEEDED) {
320     LOG(INFO) << "connect() succeeded immediately; aborting test "
321                        "of close-during-connect behavior";
322     return;
323   }
324
325   socket->close();
326
327   // Loop, although there shouldn't be anything to do.
328   evb.loop();
329
330   // Make sure the connection was aborted
331   CHECK_EQ(ccb.state, STATE_FAILED);
332
333   ASSERT_TRUE(socket->isClosedBySelf());
334   ASSERT_FALSE(socket->isClosedByPeer());
335 }
336
337 /**
338  * Test calling closeNow() immediately after connect()
339  *
340  * This should be identical to the normal close behavior.
341  */
342 TEST(AsyncSocketTest, ConnectAndCloseNow) {
343   TestServer server;
344
345   // Connect using a AsyncSocket
346   EventBase evb;
347   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
348   ConnCallback ccb;
349   socket->connect(&ccb, server.getAddress(), 30);
350
351   // Hopefully the connect didn't succeed immediately.
352   // If it did, we can't exercise the close-while-connecting code path.
353   if (ccb.state == STATE_SUCCEEDED) {
354     LOG(INFO) << "connect() succeeded immediately; aborting test "
355                        "of closeNow()-during-connect behavior";
356     return;
357   }
358
359   socket->closeNow();
360
361   // Loop, although there shouldn't be anything to do.
362   evb.loop();
363
364   // Make sure the connection was aborted
365   CHECK_EQ(ccb.state, STATE_FAILED);
366
367   ASSERT_TRUE(socket->isClosedBySelf());
368   ASSERT_FALSE(socket->isClosedByPeer());
369 }
370
371 /**
372  * Test calling both write() and closeNow() immediately after connecting,
373  * without waiting for connect to finish.
374  *
375  * This should abort the pending write.
376  */
377 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
378   TestServer server;
379
380   // connect()
381   EventBase evb;
382   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
383   ConnCallback ccb;
384   socket->connect(&ccb, server.getAddress(), 30);
385
386   // Hopefully the connect didn't succeed immediately.
387   // If it did, we can't exercise the close-while-connecting code path.
388   if (ccb.state == STATE_SUCCEEDED) {
389     LOG(INFO) << "connect() succeeded immediately; aborting test "
390                        "of write-during-connect behavior";
391     return;
392   }
393
394   // write()
395   char buf[128];
396   memset(buf, 'a', sizeof(buf));
397   WriteCallback wcb;
398   socket->write(&wcb, buf, sizeof(buf));
399
400   // close()
401   socket->closeNow();
402
403   // Loop, although there shouldn't be anything to do.
404   evb.loop();
405
406   CHECK_EQ(ccb.state, STATE_FAILED);
407   CHECK_EQ(wcb.state, STATE_FAILED);
408
409   ASSERT_TRUE(socket->isClosedBySelf());
410   ASSERT_FALSE(socket->isClosedByPeer());
411 }
412
413 /**
414  * Test installing a read callback immediately, before connect() finishes.
415  */
416 TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
417   TestServer server;
418
419   // connect()
420   EventBase evb;
421   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
422   if (GetParam() == TFOState::ENABLED) {
423     socket->enableTFO();
424   }
425
426   ConnCallback ccb;
427   socket->connect(&ccb, server.getAddress(), 30);
428
429   ReadCallback rcb;
430   socket->setReadCB(&rcb);
431
432   if (GetParam() == TFOState::ENABLED) {
433     // Trigger a connection
434     socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
435   }
436
437   // Even though we haven't looped yet, we should be able to accept
438   // the connection and send data to it.
439   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
440   uint8_t buf[128];
441   memset(buf, 'a', sizeof(buf));
442   acceptedSocket->write(buf, sizeof(buf));
443   acceptedSocket->flush();
444   acceptedSocket->close();
445
446   // Loop, although there shouldn't be anything to do.
447   evb.loop();
448
449   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
450   CHECK_EQ(rcb.buffers.size(), 1);
451   CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
452   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
453
454   ASSERT_FALSE(socket->isClosedBySelf());
455   ASSERT_FALSE(socket->isClosedByPeer());
456 }
457
458 /**
459  * Test installing a read callback and then closing immediately before the
460  * connect attempt finishes.
461  */
462 TEST(AsyncSocketTest, ConnectReadAndClose) {
463   TestServer server;
464
465   // connect()
466   EventBase evb;
467   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
468   ConnCallback ccb;
469   socket->connect(&ccb, server.getAddress(), 30);
470
471   // Hopefully the connect didn't succeed immediately.
472   // If it did, we can't exercise the close-while-connecting code path.
473   if (ccb.state == STATE_SUCCEEDED) {
474     LOG(INFO) << "connect() succeeded immediately; aborting test "
475                        "of read-during-connect behavior";
476     return;
477   }
478
479   ReadCallback rcb;
480   socket->setReadCB(&rcb);
481
482   // close()
483   socket->close();
484
485   // Loop, although there shouldn't be anything to do.
486   evb.loop();
487
488   CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
489   CHECK_EQ(rcb.buffers.size(), 0);
490   CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
491
492   ASSERT_TRUE(socket->isClosedBySelf());
493   ASSERT_FALSE(socket->isClosedByPeer());
494 }
495
496 /**
497  * Test both writing and installing a read callback immediately,
498  * before connect() finishes.
499  */
500 TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
501   TestServer server;
502
503   // connect()
504   EventBase evb;
505   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
506   if (GetParam() == TFOState::ENABLED) {
507     socket->enableTFO();
508   }
509   ConnCallback ccb;
510   socket->connect(&ccb, server.getAddress(), 30);
511
512   // write()
513   char buf1[128];
514   memset(buf1, 'a', sizeof(buf1));
515   WriteCallback wcb;
516   socket->write(&wcb, buf1, sizeof(buf1));
517
518   // set a read callback
519   ReadCallback rcb;
520   socket->setReadCB(&rcb);
521
522   // Even though we haven't looped yet, we should be able to accept
523   // the connection and send data to it.
524   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
525   uint8_t buf2[128];
526   memset(buf2, 'b', sizeof(buf2));
527   acceptedSocket->write(buf2, sizeof(buf2));
528   acceptedSocket->flush();
529
530   // shut down the write half of acceptedSocket, so that the AsyncSocket
531   // will stop reading and we can break out of the event loop.
532   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
533
534   // Loop
535   evb.loop();
536
537   // Make sure the connect succeeded
538   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
539
540   // Make sure the AsyncSocket read the data written by the accepted socket
541   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
542   CHECK_EQ(rcb.buffers.size(), 1);
543   CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
544   CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
545
546   // Close the AsyncSocket so we'll see EOF on acceptedSocket
547   socket->close();
548
549   // Make sure the accepted socket saw the data written by the AsyncSocket
550   uint8_t readbuf[sizeof(buf1)];
551   acceptedSocket->readAll(readbuf, sizeof(readbuf));
552   CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
553   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
554   CHECK_EQ(bytesRead, 0);
555
556   ASSERT_FALSE(socket->isClosedBySelf());
557   ASSERT_TRUE(socket->isClosedByPeer());
558 }
559
560 /**
561  * Test writing to the socket then shutting down writes before the connect
562  * attempt finishes.
563  */
564 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
565   TestServer server;
566
567   // connect()
568   EventBase evb;
569   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
570   ConnCallback ccb;
571   socket->connect(&ccb, server.getAddress(), 30);
572
573   // Hopefully the connect didn't succeed immediately.
574   // If it did, we can't exercise the write-while-connecting code path.
575   if (ccb.state == STATE_SUCCEEDED) {
576     LOG(INFO) << "connect() succeeded immediately; skipping test";
577     return;
578   }
579
580   // Ask to write some data
581   char wbuf[128];
582   memset(wbuf, 'a', sizeof(wbuf));
583   WriteCallback wcb;
584   socket->write(&wcb, wbuf, sizeof(wbuf));
585   socket->shutdownWrite();
586
587   // Shutdown writes
588   socket->shutdownWrite();
589
590   // Even though we haven't looped yet, we should be able to accept
591   // the connection.
592   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
593
594   // Since the connection is still in progress, there should be no data to
595   // read yet.  Verify that the accepted socket is not readable.
596   struct pollfd fds[1];
597   fds[0].fd = acceptedSocket->getSocketFD();
598   fds[0].events = POLLIN;
599   fds[0].revents = 0;
600   int rc = poll(fds, 1, 0);
601   CHECK_EQ(rc, 0);
602
603   // Write data to the accepted socket
604   uint8_t acceptedWbuf[192];
605   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
606   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
607   acceptedSocket->flush();
608
609   // Loop
610   evb.loop();
611
612   // The loop should have completed the connection, written the queued data,
613   // and shutdown writes on the socket.
614   //
615   // Check that the connection was completed successfully and that the write
616   // callback succeeded.
617   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
618   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
619
620   // Check that we can read the data that was written to the socket, and that
621   // we see an EOF, since its socket was half-shutdown.
622   uint8_t readbuf[sizeof(wbuf)];
623   acceptedSocket->readAll(readbuf, sizeof(readbuf));
624   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
625   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
626   CHECK_EQ(bytesRead, 0);
627
628   // Close the accepted socket.  This will cause it to see EOF
629   // and uninstall the read callback when we loop next.
630   acceptedSocket->close();
631
632   // Install a read callback, then loop again.
633   ReadCallback rcb;
634   socket->setReadCB(&rcb);
635   evb.loop();
636
637   // This loop should have read the data and seen the EOF
638   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
639   CHECK_EQ(rcb.buffers.size(), 1);
640   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
641   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
642                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
643
644   ASSERT_FALSE(socket->isClosedBySelf());
645   ASSERT_FALSE(socket->isClosedByPeer());
646 }
647
648 /**
649  * Test reading, writing, and shutting down writes before the connect attempt
650  * finishes.
651  */
652 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
653   TestServer server;
654
655   // connect()
656   EventBase evb;
657   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
658   ConnCallback ccb;
659   socket->connect(&ccb, server.getAddress(), 30);
660
661   // Hopefully the connect didn't succeed immediately.
662   // If it did, we can't exercise the write-while-connecting code path.
663   if (ccb.state == STATE_SUCCEEDED) {
664     LOG(INFO) << "connect() succeeded immediately; skipping test";
665     return;
666   }
667
668   // Install a read callback
669   ReadCallback rcb;
670   socket->setReadCB(&rcb);
671
672   // Ask to write some data
673   char wbuf[128];
674   memset(wbuf, 'a', sizeof(wbuf));
675   WriteCallback wcb;
676   socket->write(&wcb, wbuf, sizeof(wbuf));
677
678   // Shutdown writes
679   socket->shutdownWrite();
680
681   // Even though we haven't looped yet, we should be able to accept
682   // the connection.
683   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
684
685   // Since the connection is still in progress, there should be no data to
686   // read yet.  Verify that the accepted socket is not readable.
687   struct pollfd fds[1];
688   fds[0].fd = acceptedSocket->getSocketFD();
689   fds[0].events = POLLIN;
690   fds[0].revents = 0;
691   int rc = poll(fds, 1, 0);
692   CHECK_EQ(rc, 0);
693
694   // Write data to the accepted socket
695   uint8_t acceptedWbuf[192];
696   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
697   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
698   acceptedSocket->flush();
699   // Shutdown writes to the accepted socket.  This will cause it to see EOF
700   // and uninstall the read callback.
701   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
702
703   // Loop
704   evb.loop();
705
706   // The loop should have completed the connection, written the queued data,
707   // shutdown writes on the socket, read the data we wrote to it, and see the
708   // EOF.
709   //
710   // Check that the connection was completed successfully and that the read
711   // and write callbacks were invoked as expected.
712   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
713   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
714   CHECK_EQ(rcb.buffers.size(), 1);
715   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
716   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
717                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
718   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
719
720   // Check that we can read the data that was written to the socket, and that
721   // we see an EOF, since its socket was half-shutdown.
722   uint8_t readbuf[sizeof(wbuf)];
723   acceptedSocket->readAll(readbuf, sizeof(readbuf));
724   CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
725   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
726   CHECK_EQ(bytesRead, 0);
727
728   // Fully close both sockets
729   acceptedSocket->close();
730   socket->close();
731
732   ASSERT_FALSE(socket->isClosedBySelf());
733   ASSERT_TRUE(socket->isClosedByPeer());
734 }
735
736 /**
737  * Test reading, writing, and calling shutdownWriteNow() before the
738  * connect attempt finishes.
739  */
740 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
741   TestServer server;
742
743   // connect()
744   EventBase evb;
745   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
746   ConnCallback ccb;
747   socket->connect(&ccb, server.getAddress(), 30);
748
749   // Hopefully the connect didn't succeed immediately.
750   // If it did, we can't exercise the write-while-connecting code path.
751   if (ccb.state == STATE_SUCCEEDED) {
752     LOG(INFO) << "connect() succeeded immediately; skipping test";
753     return;
754   }
755
756   // Install a read callback
757   ReadCallback rcb;
758   socket->setReadCB(&rcb);
759
760   // Ask to write some data
761   char wbuf[128];
762   memset(wbuf, 'a', sizeof(wbuf));
763   WriteCallback wcb;
764   socket->write(&wcb, wbuf, sizeof(wbuf));
765
766   // Shutdown writes immediately.
767   // This should immediately discard the data that we just tried to write.
768   socket->shutdownWriteNow();
769
770   // Verify that writeError() was invoked on the write callback.
771   CHECK_EQ(wcb.state, STATE_FAILED);
772   CHECK_EQ(wcb.bytesWritten, 0);
773
774   // Even though we haven't looped yet, we should be able to accept
775   // the connection.
776   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
777
778   // Since the connection is still in progress, there should be no data to
779   // read yet.  Verify that the accepted socket is not readable.
780   struct pollfd fds[1];
781   fds[0].fd = acceptedSocket->getSocketFD();
782   fds[0].events = POLLIN;
783   fds[0].revents = 0;
784   int rc = poll(fds, 1, 0);
785   CHECK_EQ(rc, 0);
786
787   // Write data to the accepted socket
788   uint8_t acceptedWbuf[192];
789   memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
790   acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
791   acceptedSocket->flush();
792   // Shutdown writes to the accepted socket.  This will cause it to see EOF
793   // and uninstall the read callback.
794   shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
795
796   // Loop
797   evb.loop();
798
799   // The loop should have completed the connection, written the queued data,
800   // shutdown writes on the socket, read the data we wrote to it, and see the
801   // EOF.
802   //
803   // Check that the connection was completed successfully and that the read
804   // callback was invoked as expected.
805   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
806   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
807   CHECK_EQ(rcb.buffers.size(), 1);
808   CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
809   CHECK_EQ(memcmp(rcb.buffers[0].buffer,
810                            acceptedWbuf, sizeof(acceptedWbuf)), 0);
811
812   // Since we used shutdownWriteNow(), it should have discarded all pending
813   // write data.  Verify we see an immediate EOF when reading from the accepted
814   // socket.
815   uint8_t readbuf[sizeof(wbuf)];
816   uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
817   CHECK_EQ(bytesRead, 0);
818
819   // Fully close both sockets
820   acceptedSocket->close();
821   socket->close();
822
823   ASSERT_FALSE(socket->isClosedBySelf());
824   ASSERT_TRUE(socket->isClosedByPeer());
825 }
826
827 // Helper function for use in testConnectOptWrite()
828 // Temporarily disable the read callback
829 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
830   // Uninstall the read callback
831   socket->setReadCB(nullptr);
832   // Schedule the read callback to be reinstalled after 1ms
833   socket->getEventBase()->runInLoop(
834       std::bind(&AsyncSocket::setReadCB, socket, rcb));
835 }
836
837 /**
838  * Test connect+write, then have the connect callback perform another write.
839  *
840  * This tests interaction of the optimistic writing after connect with
841  * additional write attempts that occur in the connect callback.
842  */
843 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
844   TestServer server;
845   EventBase evb;
846   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
847
848   // connect()
849   ConnCallback ccb;
850   socket->connect(&ccb, server.getAddress(), 30);
851
852   // Hopefully the connect didn't succeed immediately.
853   // If it did, we can't exercise the optimistic write code path.
854   if (ccb.state == STATE_SUCCEEDED) {
855     LOG(INFO) << "connect() succeeded immediately; aborting test "
856                        "of optimistic write behavior";
857     return;
858   }
859
860   // Tell the connect callback to perform a write when the connect succeeds
861   WriteCallback wcb2;
862   scoped_array<char> buf2(new char[size2]);
863   memset(buf2.get(), 'b', size2);
864   if (size2 > 0) {
865     ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
866     // Tell the second write callback to close the connection when it is done
867     wcb2.successCallback = [&] { socket->closeNow(); };
868   }
869
870   // Schedule one write() immediately, before the connect finishes
871   scoped_array<char> buf1(new char[size1]);
872   memset(buf1.get(), 'a', size1);
873   WriteCallback wcb1;
874   if (size1 > 0) {
875     socket->write(&wcb1, buf1.get(), size1);
876   }
877
878   if (close) {
879     // immediately perform a close, before connect() completes
880     socket->close();
881   }
882
883   // Start reading from the other endpoint after 10ms.
884   // If we're using large buffers, we have to read so that the writes don't
885   // block forever.
886   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
887   ReadCallback rcb;
888   rcb.dataAvailableCallback = std::bind(tmpDisableReads,
889                                         acceptedSocket.get(), &rcb);
890   socket->getEventBase()->tryRunAfterDelay(
891       std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
892       10);
893
894   // Loop.  We don't bother accepting on the server socket yet.
895   // The kernel should be able to buffer the write request so it can succeed.
896   evb.loop();
897
898   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
899   if (size1 > 0) {
900     CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
901   }
902   if (size2 > 0) {
903     CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
904   }
905
906   socket->close();
907
908   // Make sure the read callback received all of the data
909   size_t bytesRead = 0;
910   for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
911        it != rcb.buffers.end();
912        ++it) {
913     size_t start = bytesRead;
914     bytesRead += it->length;
915     size_t end = bytesRead;
916     if (start < size1) {
917       size_t cmpLen = min(size1, end) - start;
918       CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
919     }
920     if (end > size1 && end <= size1 + size2) {
921       size_t itOffset;
922       size_t buf2Offset;
923       size_t cmpLen;
924       if (start >= size1) {
925         itOffset = 0;
926         buf2Offset = start - size1;
927         cmpLen = end - start;
928       } else {
929         itOffset = size1 - start;
930         buf2Offset = 0;
931         cmpLen = end - size1;
932       }
933       CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
934                                cmpLen),
935                         0);
936     }
937   }
938   CHECK_EQ(bytesRead, size1 + size2);
939 }
940
941 TEST(AsyncSocketTest, ConnectCallbackWrite) {
942   // Test using small writes that should both succeed immediately
943   testConnectOptWrite(100, 200);
944
945   // Test using a large buffer in the connect callback, that should block
946   const size_t largeSize = 8*1024*1024;
947   testConnectOptWrite(100, largeSize);
948
949   // Test using a large initial write
950   testConnectOptWrite(largeSize, 100);
951
952   // Test using two large buffers
953   testConnectOptWrite(largeSize, largeSize);
954
955   // Test a small write in the connect callback,
956   // but no immediate write before connect completes
957   testConnectOptWrite(0, 64);
958
959   // Test a large write in the connect callback,
960   // but no immediate write before connect completes
961   testConnectOptWrite(0, largeSize);
962
963   // Test connect, a small write, then immediately call close() before connect
964   // completes
965   testConnectOptWrite(211, 0, true);
966
967   // Test connect, a large immediate write (that will block), then immediately
968   // call close() before connect completes
969   testConnectOptWrite(largeSize, 0, true);
970 }
971
972 ///////////////////////////////////////////////////////////////////////////
973 // write() related tests
974 ///////////////////////////////////////////////////////////////////////////
975
976 /**
977  * Test writing using a nullptr callback
978  */
979 TEST(AsyncSocketTest, WriteNullCallback) {
980   TestServer server;
981
982   // connect()
983   EventBase evb;
984   std::shared_ptr<AsyncSocket> socket =
985     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
986   evb.loop(); // loop until the socket is connected
987
988   // write() with a nullptr callback
989   char buf[128];
990   memset(buf, 'a', sizeof(buf));
991   socket->write(nullptr, buf, sizeof(buf));
992
993   evb.loop(); // loop until the data is sent
994
995   // Make sure the server got a connection and received the data
996   socket->close();
997   server.verifyConnection(buf, sizeof(buf));
998
999   ASSERT_TRUE(socket->isClosedBySelf());
1000   ASSERT_FALSE(socket->isClosedByPeer());
1001 }
1002
1003 /**
1004  * Test writing with a send timeout
1005  */
1006 TEST(AsyncSocketTest, WriteTimeout) {
1007   TestServer server;
1008
1009   // connect()
1010   EventBase evb;
1011   std::shared_ptr<AsyncSocket> socket =
1012     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1013   evb.loop(); // loop until the socket is connected
1014
1015   // write() a large chunk of data, with no-one on the other end reading
1016   size_t writeLength = 8*1024*1024;
1017   uint32_t timeout = 200;
1018   socket->setSendTimeout(timeout);
1019   scoped_array<char> buf(new char[writeLength]);
1020   memset(buf.get(), 'a', writeLength);
1021   WriteCallback wcb;
1022   socket->write(&wcb, buf.get(), writeLength);
1023
1024   TimePoint start;
1025   evb.loop();
1026   TimePoint end;
1027
1028   // Make sure the write attempt timed out as requested
1029   CHECK_EQ(wcb.state, STATE_FAILED);
1030   CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1031
1032   // Check that the write timed out within a reasonable period of time.
1033   // We don't check for exactly the specified timeout, since AsyncSocket only
1034   // times out when it hasn't made progress for that period of time.
1035   //
1036   // On linux, the first write sends a few hundred kb of data, then blocks for
1037   // writability, and then unblocks again after 40ms and is able to write
1038   // another smaller of data before blocking permanently.  Therefore it doesn't
1039   // time out until 40ms + timeout.
1040   //
1041   // I haven't fully verified the cause of this, but I believe it probably
1042   // occurs because the receiving end delays sending an ack for up to 40ms.
1043   // (This is the default value for TCP_DELACK_MIN.)  Once the sender receives
1044   // the ack, it can send some more data.  However, after that point the
1045   // receiver's kernel buffer is full.  This 40ms delay happens even with
1046   // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints.  However, the
1047   // kernel may be automatically disabling TCP_QUICKACK after receiving some
1048   // data.
1049   //
1050   // For now, we simply check that the timeout occurred within 160ms of
1051   // the requested value.
1052   T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1053 }
1054
1055 /**
1056  * Test writing to a socket that the remote endpoint has closed
1057  */
1058 TEST(AsyncSocketTest, WritePipeError) {
1059   TestServer server;
1060
1061   // connect()
1062   EventBase evb;
1063   std::shared_ptr<AsyncSocket> socket =
1064     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1065   socket->setSendTimeout(1000);
1066   evb.loop(); // loop until the socket is connected
1067
1068   // accept and immediately close the socket
1069   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1070   acceptedSocket.reset();
1071
1072   // write() a large chunk of data
1073   size_t writeLength = 8*1024*1024;
1074   scoped_array<char> buf(new char[writeLength]);
1075   memset(buf.get(), 'a', writeLength);
1076   WriteCallback wcb;
1077   socket->write(&wcb, buf.get(), writeLength);
1078
1079   evb.loop();
1080
1081   // Make sure the write failed.
1082   // It would be nice if AsyncSocketException could convey the errno value,
1083   // so that we could check for EPIPE
1084   CHECK_EQ(wcb.state, STATE_FAILED);
1085   CHECK_EQ(wcb.exception.getType(),
1086                     AsyncSocketException::INTERNAL_ERROR);
1087
1088   ASSERT_FALSE(socket->isClosedBySelf());
1089   ASSERT_FALSE(socket->isClosedByPeer());
1090 }
1091
1092 /**
1093  * Test writing a mix of simple buffers and IOBufs
1094  */
1095 TEST(AsyncSocketTest, WriteIOBuf) {
1096   TestServer server;
1097
1098   // connect()
1099   EventBase evb;
1100   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1101   ConnCallback ccb;
1102   socket->connect(&ccb, server.getAddress(), 30);
1103
1104   // Accept the connection
1105   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1106   ReadCallback rcb;
1107   acceptedSocket->setReadCB(&rcb);
1108
1109   // Write a simple buffer to the socket
1110   constexpr size_t simpleBufLength = 5;
1111   char simpleBuf[simpleBufLength];
1112   memset(simpleBuf, 'a', simpleBufLength);
1113   WriteCallback wcb;
1114   socket->write(&wcb, simpleBuf, simpleBufLength);
1115
1116   // Write a single-element IOBuf chain
1117   size_t buf1Length = 7;
1118   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1119   memset(buf1->writableData(), 'b', buf1Length);
1120   buf1->append(buf1Length);
1121   unique_ptr<IOBuf> buf1Copy(buf1->clone());
1122   WriteCallback wcb2;
1123   socket->writeChain(&wcb2, std::move(buf1));
1124
1125   // Write a multiple-element IOBuf chain
1126   size_t buf2Length = 11;
1127   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1128   memset(buf2->writableData(), 'c', buf2Length);
1129   buf2->append(buf2Length);
1130   size_t buf3Length = 13;
1131   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1132   memset(buf3->writableData(), 'd', buf3Length);
1133   buf3->append(buf3Length);
1134   buf2->appendChain(std::move(buf3));
1135   unique_ptr<IOBuf> buf2Copy(buf2->clone());
1136   buf2Copy->coalesce();
1137   WriteCallback wcb3;
1138   socket->writeChain(&wcb3, std::move(buf2));
1139   socket->shutdownWrite();
1140
1141   // Let the reads and writes run to completion
1142   evb.loop();
1143
1144   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1145   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1146   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1147
1148   // Make sure the reader got the right data in the right order
1149   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1150   CHECK_EQ(rcb.buffers.size(), 1);
1151   CHECK_EQ(rcb.buffers[0].length,
1152       simpleBufLength + buf1Length + buf2Length + buf3Length);
1153   CHECK_EQ(
1154       memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1155   CHECK_EQ(
1156       memcmp(rcb.buffers[0].buffer + simpleBufLength,
1157           buf1Copy->data(), buf1Copy->length()), 0);
1158   CHECK_EQ(
1159       memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1160           buf2Copy->data(), buf2Copy->length()), 0);
1161
1162   acceptedSocket->close();
1163   socket->close();
1164
1165   ASSERT_TRUE(socket->isClosedBySelf());
1166   ASSERT_FALSE(socket->isClosedByPeer());
1167 }
1168
1169 TEST(AsyncSocketTest, WriteIOBufCorked) {
1170   TestServer server;
1171
1172   // connect()
1173   EventBase evb;
1174   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1175   ConnCallback ccb;
1176   socket->connect(&ccb, server.getAddress(), 30);
1177
1178   // Accept the connection
1179   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1180   ReadCallback rcb;
1181   acceptedSocket->setReadCB(&rcb);
1182
1183   // Do three writes, 100ms apart, with the "cork" flag set
1184   // on the second write.  The reader should see the first write
1185   // arrive by itself, followed by the second and third writes
1186   // arriving together.
1187   size_t buf1Length = 5;
1188   unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1189   memset(buf1->writableData(), 'a', buf1Length);
1190   buf1->append(buf1Length);
1191   size_t buf2Length = 7;
1192   unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1193   memset(buf2->writableData(), 'b', buf2Length);
1194   buf2->append(buf2Length);
1195   size_t buf3Length = 11;
1196   unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1197   memset(buf3->writableData(), 'c', buf3Length);
1198   buf3->append(buf3Length);
1199   WriteCallback wcb1;
1200   socket->writeChain(&wcb1, std::move(buf1));
1201   WriteCallback wcb2;
1202   DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1203   write2.scheduleTimeout(100);
1204   WriteCallback wcb3;
1205   DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1206   write3.scheduleTimeout(140);
1207
1208   evb.loop();
1209   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1210   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1211   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1212   if (wcb3.state != STATE_SUCCEEDED) {
1213     throw(wcb3.exception);
1214   }
1215   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1216
1217   // Make sure the reader got the data with the right grouping
1218   CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1219   CHECK_EQ(rcb.buffers.size(), 2);
1220   CHECK_EQ(rcb.buffers[0].length, buf1Length);
1221   CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1222
1223   acceptedSocket->close();
1224   socket->close();
1225
1226   ASSERT_TRUE(socket->isClosedBySelf());
1227   ASSERT_FALSE(socket->isClosedByPeer());
1228 }
1229
1230 /**
1231  * Test performing a zero-length write
1232  */
1233 TEST(AsyncSocketTest, ZeroLengthWrite) {
1234   TestServer server;
1235
1236   // connect()
1237   EventBase evb;
1238   std::shared_ptr<AsyncSocket> socket =
1239     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1240   evb.loop(); // loop until the socket is connected
1241
1242   auto acceptedSocket = server.acceptAsync(&evb);
1243   ReadCallback rcb;
1244   acceptedSocket->setReadCB(&rcb);
1245
1246   size_t len1 = 1024*1024;
1247   size_t len2 = 1024*1024;
1248   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1249   memset(buf.get(), 'a', len1);
1250   memset(buf.get(), 'b', len2);
1251
1252   WriteCallback wcb1;
1253   WriteCallback wcb2;
1254   WriteCallback wcb3;
1255   WriteCallback wcb4;
1256   socket->write(&wcb1, buf.get(), 0);
1257   socket->write(&wcb2, buf.get(), len1);
1258   socket->write(&wcb3, buf.get() + len1, 0);
1259   socket->write(&wcb4, buf.get() + len1, len2);
1260   socket->close();
1261
1262   evb.loop(); // loop until the data is sent
1263
1264   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1265   CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1266   CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1267   CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1268   rcb.verifyData(buf.get(), len1 + len2);
1269
1270   ASSERT_TRUE(socket->isClosedBySelf());
1271   ASSERT_FALSE(socket->isClosedByPeer());
1272 }
1273
1274 TEST(AsyncSocketTest, ZeroLengthWritev) {
1275   TestServer server;
1276
1277   // connect()
1278   EventBase evb;
1279   std::shared_ptr<AsyncSocket> socket =
1280     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1281   evb.loop(); // loop until the socket is connected
1282
1283   auto acceptedSocket = server.acceptAsync(&evb);
1284   ReadCallback rcb;
1285   acceptedSocket->setReadCB(&rcb);
1286
1287   size_t len1 = 1024*1024;
1288   size_t len2 = 1024*1024;
1289   std::unique_ptr<char[]> buf(new char[len1 + len2]);
1290   memset(buf.get(), 'a', len1);
1291   memset(buf.get(), 'b', len2);
1292
1293   WriteCallback wcb;
1294   constexpr size_t iovCount = 4;
1295   struct iovec iov[iovCount];
1296   iov[0].iov_base = buf.get();
1297   iov[0].iov_len = len1;
1298   iov[1].iov_base = buf.get() + len1;
1299   iov[1].iov_len = 0;
1300   iov[2].iov_base = buf.get() + len1;
1301   iov[2].iov_len = len2;
1302   iov[3].iov_base = buf.get() + len1 + len2;
1303   iov[3].iov_len = 0;
1304
1305   socket->writev(&wcb, iov, iovCount);
1306   socket->close();
1307   evb.loop(); // loop until the data is sent
1308
1309   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1310   rcb.verifyData(buf.get(), len1 + len2);
1311
1312   ASSERT_TRUE(socket->isClosedBySelf());
1313   ASSERT_FALSE(socket->isClosedByPeer());
1314 }
1315
1316 ///////////////////////////////////////////////////////////////////////////
1317 // close() related tests
1318 ///////////////////////////////////////////////////////////////////////////
1319
1320 /**
1321  * Test calling close() with pending writes when the socket is already closing.
1322  */
1323 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1324   TestServer server;
1325
1326   // connect()
1327   EventBase evb;
1328   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1329   ConnCallback ccb;
1330   socket->connect(&ccb, server.getAddress(), 30);
1331
1332   // accept the socket on the server side
1333   std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1334
1335   // Loop to ensure the connect has completed
1336   evb.loop();
1337
1338   // Make sure we are connected
1339   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1340
1341   // Schedule pending writes, until several write attempts have blocked
1342   char buf[128];
1343   memset(buf, 'a', sizeof(buf));
1344   typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1345   WriteCallbackVector writeCallbacks;
1346
1347   writeCallbacks.reserve(5);
1348   while (writeCallbacks.size() < 5) {
1349     std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1350
1351     socket->write(wcb.get(), buf, sizeof(buf));
1352     if (wcb->state == STATE_SUCCEEDED) {
1353       // Succeeded immediately.  Keep performing more writes
1354       continue;
1355     }
1356
1357     // This write is blocked.
1358     // Have the write callback call close() when writeError() is invoked
1359     wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1360     writeCallbacks.push_back(wcb);
1361   }
1362
1363   // Call closeNow() to immediately fail the pending writes
1364   socket->closeNow();
1365
1366   // Make sure writeError() was invoked on all of the pending write callbacks
1367   for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1368        it != writeCallbacks.end();
1369        ++it) {
1370     CHECK_EQ((*it)->state, STATE_FAILED);
1371   }
1372
1373   ASSERT_TRUE(socket->isClosedBySelf());
1374   ASSERT_FALSE(socket->isClosedByPeer());
1375 }
1376
1377 ///////////////////////////////////////////////////////////////////////////
1378 // ImmediateRead related tests
1379 ///////////////////////////////////////////////////////////////////////////
1380
1381 /* AsyncSocket use to verify immediate read works */
1382 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1383  public:
1384   bool immediateReadCalled = false;
1385   explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1386  protected:
1387   void checkForImmediateRead() noexcept override {
1388     immediateReadCalled = true;
1389     AsyncSocket::handleRead();
1390   }
1391 };
1392
1393 TEST(AsyncSocket, ConnectReadImmediateRead) {
1394   TestServer server;
1395
1396   const size_t maxBufferSz = 100;
1397   const size_t maxReadsPerEvent = 1;
1398   const size_t expectedDataSz = maxBufferSz * 3;
1399   char expectedData[expectedDataSz];
1400   memset(expectedData, 'j', expectedDataSz);
1401
1402   EventBase evb;
1403   ReadCallback rcb(maxBufferSz);
1404   AsyncSocketImmediateRead socket(&evb);
1405   socket.connect(nullptr, server.getAddress(), 30);
1406
1407   evb.loop(); // loop until the socket is connected
1408
1409   socket.setReadCB(&rcb);
1410   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1411   socket.immediateReadCalled = false;
1412
1413   auto acceptedSocket = server.acceptAsync(&evb);
1414
1415   ReadCallback rcbServer;
1416   WriteCallback wcbServer;
1417   rcbServer.dataAvailableCallback = [&]() {
1418     if (rcbServer.dataRead() == expectedDataSz) {
1419       // write back all data read
1420       rcbServer.verifyData(expectedData, expectedDataSz);
1421       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1422       acceptedSocket->close();
1423     }
1424   };
1425   acceptedSocket->setReadCB(&rcbServer);
1426
1427   // write data
1428   WriteCallback wcb1;
1429   socket.write(&wcb1, expectedData, expectedDataSz);
1430   evb.loop();
1431   CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1432   rcb.verifyData(expectedData, expectedDataSz);
1433   CHECK_EQ(socket.immediateReadCalled, true);
1434
1435   ASSERT_FALSE(socket.isClosedBySelf());
1436   ASSERT_FALSE(socket.isClosedByPeer());
1437 }
1438
1439 TEST(AsyncSocket, ConnectReadUninstallRead) {
1440   TestServer server;
1441
1442   const size_t maxBufferSz = 100;
1443   const size_t maxReadsPerEvent = 1;
1444   const size_t expectedDataSz = maxBufferSz * 3;
1445   char expectedData[expectedDataSz];
1446   memset(expectedData, 'k', expectedDataSz);
1447
1448   EventBase evb;
1449   ReadCallback rcb(maxBufferSz);
1450   AsyncSocketImmediateRead socket(&evb);
1451   socket.connect(nullptr, server.getAddress(), 30);
1452
1453   evb.loop(); // loop until the socket is connected
1454
1455   socket.setReadCB(&rcb);
1456   socket.setMaxReadsPerEvent(maxReadsPerEvent);
1457   socket.immediateReadCalled = false;
1458
1459   auto acceptedSocket = server.acceptAsync(&evb);
1460
1461   ReadCallback rcbServer;
1462   WriteCallback wcbServer;
1463   rcbServer.dataAvailableCallback = [&]() {
1464     if (rcbServer.dataRead() == expectedDataSz) {
1465       // write back all data read
1466       rcbServer.verifyData(expectedData, expectedDataSz);
1467       acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1468       acceptedSocket->close();
1469     }
1470   };
1471   acceptedSocket->setReadCB(&rcbServer);
1472
1473   rcb.dataAvailableCallback = [&]() {
1474     // we read data and reset readCB
1475     socket.setReadCB(nullptr);
1476   };
1477
1478   // write data
1479   WriteCallback wcb;
1480   socket.write(&wcb, expectedData, expectedDataSz);
1481   evb.loop();
1482   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1483
1484   /* we shoud've only read maxBufferSz data since readCallback_
1485    * was reset in dataAvailableCallback */
1486   CHECK_EQ(rcb.dataRead(), maxBufferSz);
1487   CHECK_EQ(socket.immediateReadCalled, false);
1488
1489   ASSERT_FALSE(socket.isClosedBySelf());
1490   ASSERT_FALSE(socket.isClosedByPeer());
1491 }
1492
1493 // TODO:
1494 // - Test connect() and have the connect callback set the read callback
1495 // - Test connect() and have the connect callback unset the read callback
1496 // - Test reading/writing/closing/destroying the socket in the connect callback
1497 // - Test reading/writing/closing/destroying the socket in the read callback
1498 // - Test reading/writing/closing/destroying the socket in the write callback
1499 // - Test one-way shutdown behavior
1500 // - Test changing the EventBase
1501 //
1502 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1503 //   in connectSuccess(), readDataAvailable(), writeSuccess()
1504
1505
1506 ///////////////////////////////////////////////////////////////////////////
1507 // AsyncServerSocket tests
1508 ///////////////////////////////////////////////////////////////////////////
1509 namespace {
1510 /**
1511  * Helper ConnectionEventCallback class for the test code.
1512  * It maintains counters protected by a spin lock.
1513  */
1514 class TestConnectionEventCallback :
1515   public AsyncServerSocket::ConnectionEventCallback {
1516  public:
1517   virtual void onConnectionAccepted(
1518       const int /* socket */,
1519       const SocketAddress& /* addr */) noexcept override {
1520     folly::RWSpinLock::WriteHolder holder(spinLock_);
1521     connectionAccepted_++;
1522   }
1523
1524   virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1525     folly::RWSpinLock::WriteHolder holder(spinLock_);
1526     connectionAcceptedError_++;
1527   }
1528
1529   virtual void onConnectionDropped(
1530       const int /* socket */,
1531       const SocketAddress& /* addr */) noexcept override {
1532     folly::RWSpinLock::WriteHolder holder(spinLock_);
1533     connectionDropped_++;
1534   }
1535
1536   virtual void onConnectionEnqueuedForAcceptorCallback(
1537       const int /* socket */,
1538       const SocketAddress& /* addr */) noexcept override {
1539     folly::RWSpinLock::WriteHolder holder(spinLock_);
1540     connectionEnqueuedForAcceptCallback_++;
1541   }
1542
1543   virtual void onConnectionDequeuedByAcceptorCallback(
1544       const int /* socket */,
1545       const SocketAddress& /* addr */) noexcept override {
1546     folly::RWSpinLock::WriteHolder holder(spinLock_);
1547     connectionDequeuedByAcceptCallback_++;
1548   }
1549
1550   virtual void onBackoffStarted() noexcept override {
1551     folly::RWSpinLock::WriteHolder holder(spinLock_);
1552     backoffStarted_++;
1553   }
1554
1555   virtual void onBackoffEnded() noexcept override {
1556     folly::RWSpinLock::WriteHolder holder(spinLock_);
1557     backoffEnded_++;
1558   }
1559
1560   virtual void onBackoffError() noexcept override {
1561     folly::RWSpinLock::WriteHolder holder(spinLock_);
1562     backoffError_++;
1563   }
1564
1565   unsigned int getConnectionAccepted() const {
1566     folly::RWSpinLock::ReadHolder holder(spinLock_);
1567     return connectionAccepted_;
1568   }
1569
1570   unsigned int getConnectionAcceptedError() const {
1571     folly::RWSpinLock::ReadHolder holder(spinLock_);
1572     return connectionAcceptedError_;
1573   }
1574
1575   unsigned int getConnectionDropped() const {
1576     folly::RWSpinLock::ReadHolder holder(spinLock_);
1577     return connectionDropped_;
1578   }
1579
1580   unsigned int getConnectionEnqueuedForAcceptCallback() const {
1581     folly::RWSpinLock::ReadHolder holder(spinLock_);
1582     return connectionEnqueuedForAcceptCallback_;
1583   }
1584
1585   unsigned int getConnectionDequeuedByAcceptCallback() const {
1586     folly::RWSpinLock::ReadHolder holder(spinLock_);
1587     return connectionDequeuedByAcceptCallback_;
1588   }
1589
1590   unsigned int getBackoffStarted() const {
1591     folly::RWSpinLock::ReadHolder holder(spinLock_);
1592     return backoffStarted_;
1593   }
1594
1595   unsigned int getBackoffEnded() const {
1596     folly::RWSpinLock::ReadHolder holder(spinLock_);
1597     return backoffEnded_;
1598   }
1599
1600   unsigned int getBackoffError() const {
1601     folly::RWSpinLock::ReadHolder holder(spinLock_);
1602     return backoffError_;
1603   }
1604
1605  private:
1606   mutable folly::RWSpinLock spinLock_;
1607   unsigned int connectionAccepted_{0};
1608   unsigned int connectionAcceptedError_{0};
1609   unsigned int connectionDropped_{0};
1610   unsigned int connectionEnqueuedForAcceptCallback_{0};
1611   unsigned int connectionDequeuedByAcceptCallback_{0};
1612   unsigned int backoffStarted_{0};
1613   unsigned int backoffEnded_{0};
1614   unsigned int backoffError_{0};
1615 };
1616
1617 /**
1618  * Helper AcceptCallback class for the test code
1619  * It records the callbacks that were invoked, and also supports calling
1620  * generic std::function objects in each callback.
1621  */
1622 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1623  public:
1624   enum EventType {
1625     TYPE_START,
1626     TYPE_ACCEPT,
1627     TYPE_ERROR,
1628     TYPE_STOP
1629   };
1630   struct EventInfo {
1631     EventInfo(int fd, const folly::SocketAddress& addr)
1632       : type(TYPE_ACCEPT),
1633         fd(fd),
1634         address(addr),
1635         errorMsg() {}
1636     explicit EventInfo(const std::string& msg)
1637       : type(TYPE_ERROR),
1638         fd(-1),
1639         address(),
1640         errorMsg(msg) {}
1641     explicit EventInfo(EventType et)
1642       : type(et),
1643         fd(-1),
1644         address(),
1645         errorMsg() {}
1646
1647     EventType type;
1648     int fd;  // valid for TYPE_ACCEPT
1649     folly::SocketAddress address;  // valid for TYPE_ACCEPT
1650     string errorMsg;  // valid for TYPE_ERROR
1651   };
1652   typedef std::deque<EventInfo> EventList;
1653
1654   TestAcceptCallback()
1655     : connectionAcceptedFn_(),
1656       acceptErrorFn_(),
1657       acceptStoppedFn_(),
1658       events_() {}
1659
1660   std::deque<EventInfo>* getEvents() {
1661     return &events_;
1662   }
1663
1664   void setConnectionAcceptedFn(
1665       const std::function<void(int, const folly::SocketAddress&)>& fn) {
1666     connectionAcceptedFn_ = fn;
1667   }
1668   void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1669     acceptErrorFn_ = fn;
1670   }
1671   void setAcceptStartedFn(const std::function<void()>& fn) {
1672     acceptStartedFn_ = fn;
1673   }
1674   void setAcceptStoppedFn(const std::function<void()>& fn) {
1675     acceptStoppedFn_ = fn;
1676   }
1677
1678   void connectionAccepted(
1679       int fd, const folly::SocketAddress& clientAddr) noexcept override {
1680     events_.emplace_back(fd, clientAddr);
1681
1682     if (connectionAcceptedFn_) {
1683       connectionAcceptedFn_(fd, clientAddr);
1684     }
1685   }
1686   void acceptError(const std::exception& ex) noexcept override {
1687     events_.emplace_back(ex.what());
1688
1689     if (acceptErrorFn_) {
1690       acceptErrorFn_(ex);
1691     }
1692   }
1693   void acceptStarted() noexcept override {
1694     events_.emplace_back(TYPE_START);
1695
1696     if (acceptStartedFn_) {
1697       acceptStartedFn_();
1698     }
1699   }
1700   void acceptStopped() noexcept override {
1701     events_.emplace_back(TYPE_STOP);
1702
1703     if (acceptStoppedFn_) {
1704       acceptStoppedFn_();
1705     }
1706   }
1707
1708  private:
1709   std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1710   std::function<void(const std::exception&)> acceptErrorFn_;
1711   std::function<void()> acceptStartedFn_;
1712   std::function<void()> acceptStoppedFn_;
1713
1714   std::deque<EventInfo> events_;
1715 };
1716 }
1717
1718 /**
1719  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1720  */
1721 TEST(AsyncSocketTest, ServerAcceptOptions) {
1722   EventBase eventBase;
1723
1724   // Create a server socket
1725   std::shared_ptr<AsyncServerSocket> serverSocket(
1726       AsyncServerSocket::newSocket(&eventBase));
1727   serverSocket->bind(0);
1728   serverSocket->listen(16);
1729   folly::SocketAddress serverAddress;
1730   serverSocket->getAddress(&serverAddress);
1731
1732   // Add a callback to accept one connection then stop the loop
1733   TestAcceptCallback acceptCallback;
1734   acceptCallback.setConnectionAcceptedFn(
1735       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1736         serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1737       });
1738   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1739     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1740   });
1741   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
1742   serverSocket->startAccepting();
1743
1744   // Connect to the server socket
1745   std::shared_ptr<AsyncSocket> socket(
1746       AsyncSocket::newSocket(&eventBase, serverAddress));
1747
1748   eventBase.loop();
1749
1750   // Verify that the server accepted a connection
1751   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1752   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1753                     TestAcceptCallback::TYPE_START);
1754   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1755                     TestAcceptCallback::TYPE_ACCEPT);
1756   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1757                     TestAcceptCallback::TYPE_STOP);
1758   int fd = acceptCallback.getEvents()->at(1).fd;
1759
1760   // The accepted connection should already be in non-blocking mode
1761   int flags = fcntl(fd, F_GETFL, 0);
1762   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1763
1764 #ifndef TCP_NOPUSH
1765   // The accepted connection should already have TCP_NODELAY set
1766   int value;
1767   socklen_t valueLength = sizeof(value);
1768   int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1769   CHECK_EQ(rc, 0);
1770   CHECK_EQ(value, 1);
1771 #endif
1772 }
1773
1774 /**
1775  * Test AsyncServerSocket::removeAcceptCallback()
1776  */
1777 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1778   // Create a new AsyncServerSocket
1779   EventBase eventBase;
1780   std::shared_ptr<AsyncServerSocket> serverSocket(
1781       AsyncServerSocket::newSocket(&eventBase));
1782   serverSocket->bind(0);
1783   serverSocket->listen(16);
1784   folly::SocketAddress serverAddress;
1785   serverSocket->getAddress(&serverAddress);
1786
1787   // Add several accept callbacks
1788   TestAcceptCallback cb1;
1789   TestAcceptCallback cb2;
1790   TestAcceptCallback cb3;
1791   TestAcceptCallback cb4;
1792   TestAcceptCallback cb5;
1793   TestAcceptCallback cb6;
1794   TestAcceptCallback cb7;
1795
1796   // Test having callbacks remove other callbacks before them on the list,
1797   // after them on the list, or removing themselves.
1798   //
1799   // Have callback 2 remove callback 3 and callback 5 the first time it is
1800   // called.
1801   int cb2Count = 0;
1802   cb1.setConnectionAcceptedFn([&](int /* fd */,
1803                                   const folly::SocketAddress& /* addr */) {
1804     std::shared_ptr<AsyncSocket> sock2(
1805         AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1806   });
1807   cb3.setConnectionAcceptedFn(
1808       [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1809   cb4.setConnectionAcceptedFn(
1810       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1811         std::shared_ptr<AsyncSocket> sock3(
1812             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1813       });
1814   cb5.setConnectionAcceptedFn(
1815       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1816         std::shared_ptr<AsyncSocket> sock5(
1817             AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1818
1819       });
1820   cb2.setConnectionAcceptedFn(
1821       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1822         if (cb2Count == 0) {
1823           serverSocket->removeAcceptCallback(&cb3, nullptr);
1824           serverSocket->removeAcceptCallback(&cb5, nullptr);
1825         }
1826         ++cb2Count;
1827       });
1828   // Have callback 6 remove callback 4 the first time it is called,
1829   // and destroy the server socket the second time it is called
1830   int cb6Count = 0;
1831   cb6.setConnectionAcceptedFn(
1832       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1833         if (cb6Count == 0) {
1834           serverSocket->removeAcceptCallback(&cb4, nullptr);
1835           std::shared_ptr<AsyncSocket> sock6(
1836               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1837           std::shared_ptr<AsyncSocket> sock7(
1838               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1839           std::shared_ptr<AsyncSocket> sock8(
1840               AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1841
1842         } else {
1843           serverSocket.reset();
1844         }
1845         ++cb6Count;
1846       });
1847   // Have callback 7 remove itself
1848   cb7.setConnectionAcceptedFn(
1849       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1850         serverSocket->removeAcceptCallback(&cb7, nullptr);
1851       });
1852
1853   serverSocket->addAcceptCallback(&cb1, &eventBase);
1854   serverSocket->addAcceptCallback(&cb2, &eventBase);
1855   serverSocket->addAcceptCallback(&cb3, &eventBase);
1856   serverSocket->addAcceptCallback(&cb4, &eventBase);
1857   serverSocket->addAcceptCallback(&cb5, &eventBase);
1858   serverSocket->addAcceptCallback(&cb6, &eventBase);
1859   serverSocket->addAcceptCallback(&cb7, &eventBase);
1860   serverSocket->startAccepting();
1861
1862   // Make several connections to the socket
1863   std::shared_ptr<AsyncSocket> sock1(
1864       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1865   std::shared_ptr<AsyncSocket> sock4(
1866       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1867
1868   // Loop until we are stopped
1869   eventBase.loop();
1870
1871   // Check to make sure that the expected callbacks were invoked.
1872   //
1873   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1874   // the AcceptCallbacks in round-robin fashion, in the order that they were
1875   // added.  The code is implemented this way right now, but the API doesn't
1876   // explicitly require it be done this way.  If we change the code not to be
1877   // exactly round robin in the future, we can simplify the test checks here.
1878   // (We'll also need to update the termination code, since we expect cb6 to
1879   // get called twice to terminate the loop.)
1880   CHECK_EQ(cb1.getEvents()->size(), 4);
1881   CHECK_EQ(cb1.getEvents()->at(0).type,
1882                     TestAcceptCallback::TYPE_START);
1883   CHECK_EQ(cb1.getEvents()->at(1).type,
1884                     TestAcceptCallback::TYPE_ACCEPT);
1885   CHECK_EQ(cb1.getEvents()->at(2).type,
1886                     TestAcceptCallback::TYPE_ACCEPT);
1887   CHECK_EQ(cb1.getEvents()->at(3).type,
1888                     TestAcceptCallback::TYPE_STOP);
1889
1890   CHECK_EQ(cb2.getEvents()->size(), 4);
1891   CHECK_EQ(cb2.getEvents()->at(0).type,
1892                     TestAcceptCallback::TYPE_START);
1893   CHECK_EQ(cb2.getEvents()->at(1).type,
1894                     TestAcceptCallback::TYPE_ACCEPT);
1895   CHECK_EQ(cb2.getEvents()->at(2).type,
1896                     TestAcceptCallback::TYPE_ACCEPT);
1897   CHECK_EQ(cb2.getEvents()->at(3).type,
1898                     TestAcceptCallback::TYPE_STOP);
1899
1900   CHECK_EQ(cb3.getEvents()->size(), 2);
1901   CHECK_EQ(cb3.getEvents()->at(0).type,
1902                     TestAcceptCallback::TYPE_START);
1903   CHECK_EQ(cb3.getEvents()->at(1).type,
1904                     TestAcceptCallback::TYPE_STOP);
1905
1906   CHECK_EQ(cb4.getEvents()->size(), 3);
1907   CHECK_EQ(cb4.getEvents()->at(0).type,
1908                     TestAcceptCallback::TYPE_START);
1909   CHECK_EQ(cb4.getEvents()->at(1).type,
1910                     TestAcceptCallback::TYPE_ACCEPT);
1911   CHECK_EQ(cb4.getEvents()->at(2).type,
1912                     TestAcceptCallback::TYPE_STOP);
1913
1914   CHECK_EQ(cb5.getEvents()->size(), 2);
1915   CHECK_EQ(cb5.getEvents()->at(0).type,
1916                     TestAcceptCallback::TYPE_START);
1917   CHECK_EQ(cb5.getEvents()->at(1).type,
1918                     TestAcceptCallback::TYPE_STOP);
1919
1920   CHECK_EQ(cb6.getEvents()->size(), 4);
1921   CHECK_EQ(cb6.getEvents()->at(0).type,
1922                     TestAcceptCallback::TYPE_START);
1923   CHECK_EQ(cb6.getEvents()->at(1).type,
1924                     TestAcceptCallback::TYPE_ACCEPT);
1925   CHECK_EQ(cb6.getEvents()->at(2).type,
1926                     TestAcceptCallback::TYPE_ACCEPT);
1927   CHECK_EQ(cb6.getEvents()->at(3).type,
1928                     TestAcceptCallback::TYPE_STOP);
1929
1930   CHECK_EQ(cb7.getEvents()->size(), 3);
1931   CHECK_EQ(cb7.getEvents()->at(0).type,
1932                     TestAcceptCallback::TYPE_START);
1933   CHECK_EQ(cb7.getEvents()->at(1).type,
1934                     TestAcceptCallback::TYPE_ACCEPT);
1935   CHECK_EQ(cb7.getEvents()->at(2).type,
1936                     TestAcceptCallback::TYPE_STOP);
1937 }
1938
1939 /**
1940  * Test AsyncServerSocket::removeAcceptCallback()
1941  */
1942 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1943   // Create a new AsyncServerSocket
1944   EventBase eventBase;
1945   std::shared_ptr<AsyncServerSocket> serverSocket(
1946       AsyncServerSocket::newSocket(&eventBase));
1947   serverSocket->bind(0);
1948   serverSocket->listen(16);
1949   folly::SocketAddress serverAddress;
1950   serverSocket->getAddress(&serverAddress);
1951
1952   // Add several accept callbacks
1953   TestAcceptCallback cb1;
1954   auto thread_id = std::this_thread::get_id();
1955   cb1.setAcceptStartedFn([&](){
1956     CHECK_NE(thread_id, std::this_thread::get_id());
1957     thread_id = std::this_thread::get_id();
1958   });
1959   cb1.setConnectionAcceptedFn(
1960       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1961         CHECK_EQ(thread_id, std::this_thread::get_id());
1962         serverSocket->removeAcceptCallback(&cb1, &eventBase);
1963       });
1964   cb1.setAcceptStoppedFn([&](){
1965     CHECK_EQ(thread_id, std::this_thread::get_id());
1966   });
1967
1968   // Test having callbacks remove other callbacks before them on the list,
1969   serverSocket->addAcceptCallback(&cb1, &eventBase);
1970   serverSocket->startAccepting();
1971
1972   // Make several connections to the socket
1973   std::shared_ptr<AsyncSocket> sock1(
1974       AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1975
1976   // Loop in another thread
1977   auto other = std::thread([&](){
1978     eventBase.loop();
1979   });
1980   other.join();
1981
1982   // Check to make sure that the expected callbacks were invoked.
1983   //
1984   // NOTE: This code depends on the AsyncServerSocket operating calling all of
1985   // the AcceptCallbacks in round-robin fashion, in the order that they were
1986   // added.  The code is implemented this way right now, but the API doesn't
1987   // explicitly require it be done this way.  If we change the code not to be
1988   // exactly round robin in the future, we can simplify the test checks here.
1989   // (We'll also need to update the termination code, since we expect cb6 to
1990   // get called twice to terminate the loop.)
1991   CHECK_EQ(cb1.getEvents()->size(), 3);
1992   CHECK_EQ(cb1.getEvents()->at(0).type,
1993                     TestAcceptCallback::TYPE_START);
1994   CHECK_EQ(cb1.getEvents()->at(1).type,
1995                     TestAcceptCallback::TYPE_ACCEPT);
1996   CHECK_EQ(cb1.getEvents()->at(2).type,
1997                     TestAcceptCallback::TYPE_STOP);
1998
1999 }
2000
2001 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
2002   EventBase* eventBase = serverSocket->getEventBase();
2003   CHECK(eventBase);
2004
2005   // Add a callback to accept one connection then stop accepting
2006   TestAcceptCallback acceptCallback;
2007   acceptCallback.setConnectionAcceptedFn(
2008       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2009         serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
2010       });
2011   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2012     serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
2013   });
2014   serverSocket->addAcceptCallback(&acceptCallback, eventBase);
2015   serverSocket->startAccepting();
2016
2017   // Connect to the server socket
2018   folly::SocketAddress serverAddress;
2019   serverSocket->getAddress(&serverAddress);
2020   AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
2021
2022   // Loop to process all events
2023   eventBase->loop();
2024
2025   // Verify that the server accepted a connection
2026   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2027   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2028                     TestAcceptCallback::TYPE_START);
2029   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2030                     TestAcceptCallback::TYPE_ACCEPT);
2031   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2032                     TestAcceptCallback::TYPE_STOP);
2033 }
2034
2035 /* Verify that we don't leak sockets if we are destroyed()
2036  * and there are still writes pending
2037  *
2038  * If destroy() only calls close() instead of closeNow(),
2039  * it would shutdown(writes) on the socket, but it would
2040  * never be close()'d, and the socket would leak
2041  */
2042 TEST(AsyncSocketTest, DestroyCloseTest) {
2043   TestServer server;
2044
2045   // connect()
2046   EventBase clientEB;
2047   EventBase serverEB;
2048   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
2049   ConnCallback ccb;
2050   socket->connect(&ccb, server.getAddress(), 30);
2051
2052   // Accept the connection
2053   std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2054   ReadCallback rcb;
2055   acceptedSocket->setReadCB(&rcb);
2056
2057   // Write a large buffer to the socket that is larger than kernel buffer
2058   size_t simpleBufLength = 5000000;
2059   char* simpleBuf = new char[simpleBufLength];
2060   memset(simpleBuf, 'a', simpleBufLength);
2061   WriteCallback wcb;
2062
2063   // Let the reads and writes run to completion
2064   int fd = acceptedSocket->getFd();
2065
2066   acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2067   socket.reset();
2068   acceptedSocket.reset();
2069
2070   // Test that server socket was closed
2071   ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2072   CHECK_EQ(sz, -1);
2073   CHECK_EQ(errno, 9);
2074   delete[] simpleBuf;
2075 }
2076
2077 /**
2078  * Test AsyncServerSocket::useExistingSocket()
2079  */
2080 TEST(AsyncSocketTest, ServerExistingSocket) {
2081   EventBase eventBase;
2082
2083   // Test creating a socket, and letting AsyncServerSocket bind and listen
2084   {
2085     // Manually create a socket
2086     int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2087     ASSERT_GE(fd, 0);
2088
2089     // Create a server socket
2090     AsyncServerSocket::UniquePtr serverSocket(
2091         new AsyncServerSocket(&eventBase));
2092     serverSocket->useExistingSocket(fd);
2093     folly::SocketAddress address;
2094     serverSocket->getAddress(&address);
2095     address.setPort(0);
2096     serverSocket->bind(address);
2097     serverSocket->listen(16);
2098
2099     // Make sure the socket works
2100     serverSocketSanityTest(serverSocket.get());
2101   }
2102
2103   // Test creating a socket and binding manually,
2104   // then letting AsyncServerSocket listen
2105   {
2106     // Manually create a socket
2107     int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2108     ASSERT_GE(fd, 0);
2109     // bind
2110     struct sockaddr_in addr;
2111     addr.sin_family = AF_INET;
2112     addr.sin_port = 0;
2113     addr.sin_addr.s_addr = INADDR_ANY;
2114     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2115                              sizeof(addr)), 0);
2116     // Look up the address that we bound to
2117     folly::SocketAddress boundAddress;
2118     boundAddress.setFromLocalAddress(fd);
2119
2120     // Create a server socket
2121     AsyncServerSocket::UniquePtr serverSocket(
2122         new AsyncServerSocket(&eventBase));
2123     serverSocket->useExistingSocket(fd);
2124     serverSocket->listen(16);
2125
2126     // Make sure AsyncServerSocket reports the same address that we bound to
2127     folly::SocketAddress serverSocketAddress;
2128     serverSocket->getAddress(&serverSocketAddress);
2129     CHECK_EQ(boundAddress, serverSocketAddress);
2130
2131     // Make sure the socket works
2132     serverSocketSanityTest(serverSocket.get());
2133   }
2134
2135   // Test creating a socket, binding and listening manually,
2136   // then giving it to AsyncServerSocket
2137   {
2138     // Manually create a socket
2139     int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2140     ASSERT_GE(fd, 0);
2141     // bind
2142     struct sockaddr_in addr;
2143     addr.sin_family = AF_INET;
2144     addr.sin_port = 0;
2145     addr.sin_addr.s_addr = INADDR_ANY;
2146     CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2147                              sizeof(addr)), 0);
2148     // Look up the address that we bound to
2149     folly::SocketAddress boundAddress;
2150     boundAddress.setFromLocalAddress(fd);
2151     // listen
2152     CHECK_EQ(listen(fd, 16), 0);
2153
2154     // Create a server socket
2155     AsyncServerSocket::UniquePtr serverSocket(
2156         new AsyncServerSocket(&eventBase));
2157     serverSocket->useExistingSocket(fd);
2158
2159     // Make sure AsyncServerSocket reports the same address that we bound to
2160     folly::SocketAddress serverSocketAddress;
2161     serverSocket->getAddress(&serverSocketAddress);
2162     CHECK_EQ(boundAddress, serverSocketAddress);
2163
2164     // Make sure the socket works
2165     serverSocketSanityTest(serverSocket.get());
2166   }
2167 }
2168
2169 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2170   EventBase eventBase;
2171
2172   // Create a server socket
2173   std::shared_ptr<AsyncServerSocket> serverSocket(
2174       AsyncServerSocket::newSocket(&eventBase));
2175   string path(1, 0);
2176   path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2177   folly::SocketAddress serverAddress;
2178   serverAddress.setFromPath(path);
2179   serverSocket->bind(serverAddress);
2180   serverSocket->listen(16);
2181
2182   // Add a callback to accept one connection then stop the loop
2183   TestAcceptCallback acceptCallback;
2184   acceptCallback.setConnectionAcceptedFn(
2185       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2186         serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2187       });
2188   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2189     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2190   });
2191   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2192   serverSocket->startAccepting();
2193
2194   // Connect to the server socket
2195   std::shared_ptr<AsyncSocket> socket(
2196       AsyncSocket::newSocket(&eventBase, serverAddress));
2197
2198   eventBase.loop();
2199
2200   // Verify that the server accepted a connection
2201   CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2202   CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2203                     TestAcceptCallback::TYPE_START);
2204   CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2205                     TestAcceptCallback::TYPE_ACCEPT);
2206   CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2207                     TestAcceptCallback::TYPE_STOP);
2208   int fd = acceptCallback.getEvents()->at(1).fd;
2209
2210   // The accepted connection should already be in non-blocking mode
2211   int flags = fcntl(fd, F_GETFL, 0);
2212   CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2213 }
2214
2215 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2216   EventBase eventBase;
2217   TestConnectionEventCallback connectionEventCallback;
2218
2219   // Create a server socket
2220   std::shared_ptr<AsyncServerSocket> serverSocket(
2221       AsyncServerSocket::newSocket(&eventBase));
2222   serverSocket->setConnectionEventCallback(&connectionEventCallback);
2223   serverSocket->bind(0);
2224   serverSocket->listen(16);
2225   folly::SocketAddress serverAddress;
2226   serverSocket->getAddress(&serverAddress);
2227
2228   // Add a callback to accept one connection then stop the loop
2229   TestAcceptCallback acceptCallback;
2230   acceptCallback.setConnectionAcceptedFn(
2231       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2232         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2233       });
2234   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2235     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2236   });
2237   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2238   serverSocket->startAccepting();
2239
2240   // Connect to the server socket
2241   std::shared_ptr<AsyncSocket> socket(
2242       AsyncSocket::newSocket(&eventBase, serverAddress));
2243
2244   eventBase.loop();
2245
2246   // Validate the connection event counters
2247   ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2248   ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2249   ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2250   ASSERT_EQ(
2251       connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2252   ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2253   ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2254   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2255   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2256 }
2257
2258 TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
2259   EventBase eventBase;
2260   TestConnectionEventCallback connectionEventCallback;
2261
2262   // Create a server socket
2263   std::shared_ptr<AsyncServerSocket> serverSocket(
2264       AsyncServerSocket::newSocket(&eventBase));
2265   serverSocket->setConnectionEventCallback(&connectionEventCallback);
2266   serverSocket->bind(0);
2267   serverSocket->listen(16);
2268   folly::SocketAddress serverAddress;
2269   serverSocket->getAddress(&serverAddress);
2270
2271   // Add a callback to accept one connection then stop the loop
2272   TestAcceptCallback acceptCallback;
2273   acceptCallback.setConnectionAcceptedFn(
2274       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2275         serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2276       });
2277   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2278     serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2279   });
2280   bool acceptStartedFlag{false};
2281   acceptCallback.setAcceptStartedFn([&acceptStartedFlag](){
2282     acceptStartedFlag = true;
2283   });
2284   bool acceptStoppedFlag{false};
2285   acceptCallback.setAcceptStoppedFn([&acceptStoppedFlag](){
2286     acceptStoppedFlag = true;
2287   });
2288   serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2289   serverSocket->startAccepting();
2290
2291   // Connect to the server socket
2292   std::shared_ptr<AsyncSocket> socket(
2293       AsyncSocket::newSocket(&eventBase, serverAddress));
2294
2295   eventBase.loop();
2296
2297   ASSERT_TRUE(acceptStartedFlag);
2298   ASSERT_TRUE(acceptStoppedFlag);
2299   // Validate the connection event counters
2300   ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2301   ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2302   ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2303   ASSERT_EQ(
2304       connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
2305   ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
2306   ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2307   ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2308   ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2309 }
2310
2311
2312
2313 /**
2314  * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2315  */
2316 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2317   EventBase eventBase;
2318
2319   // Counter of how many connections have been accepted
2320   int count = 0;
2321
2322   // Create a server socket
2323   auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2324   serverSocket->bind(0);
2325   serverSocket->listen(16);
2326   folly::SocketAddress serverAddress;
2327   serverSocket->getAddress(&serverAddress);
2328
2329   // Add a callback to accept connections
2330   TestAcceptCallback acceptCallback;
2331   acceptCallback.setConnectionAcceptedFn(
2332       [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2333         count++;
2334         CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2335
2336         if (count == 4) {
2337           // all messages are processed, remove accept callback
2338           serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2339         }
2340       });
2341   acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2342     serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2343   });
2344   serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2345   serverSocket->startAccepting();
2346
2347   // Connect to the server socket, 4 clients, there are 4 connections
2348   auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2349   auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2350   auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2351   auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2352
2353   eventBase.loop();
2354 }
2355
2356 /**
2357  * Test AsyncTransport::BufferCallback
2358  */
2359 TEST(AsyncSocketTest, BufferTest) {
2360   TestServer server;
2361
2362   EventBase evb;
2363   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2364   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2365   ConnCallback ccb;
2366   socket->connect(&ccb, server.getAddress(), 30, option);
2367
2368   char buf[100 * 1024];
2369   memset(buf, 'c', sizeof(buf));
2370   WriteCallback wcb;
2371   BufferCallback bcb;
2372   socket->setBufferCallback(&bcb);
2373   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2374
2375   evb.loop();
2376   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2377   CHECK_EQ(wcb.state, STATE_SUCCEEDED);
2378
2379   ASSERT_TRUE(bcb.hasBuffered());
2380   ASSERT_TRUE(bcb.hasBufferCleared());
2381
2382   socket->close();
2383   server.verifyConnection(buf, sizeof(buf));
2384
2385   ASSERT_TRUE(socket->isClosedBySelf());
2386   ASSERT_FALSE(socket->isClosedByPeer());
2387 }
2388
2389 TEST(AsyncSocketTest, BufferCallbackKill) {
2390   TestServer server;
2391   EventBase evb;
2392   AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2393   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2394   ConnCallback ccb;
2395   socket->connect(&ccb, server.getAddress(), 30, option);
2396   evb.loopOnce();
2397
2398   char buf[100 * 1024];
2399   memset(buf, 'c', sizeof(buf));
2400   BufferCallback bcb;
2401   socket->setBufferCallback(&bcb);
2402   WriteCallback wcb;
2403   wcb.successCallback = [&] {
2404     ASSERT_TRUE(socket.unique());
2405     socket.reset();
2406   };
2407
2408   // This will trigger AsyncSocket::handleWrite,
2409   // which calls WriteCallback::writeSuccess,
2410   // which calls wcb.successCallback above,
2411   // which tries to delete socket
2412   // Then, the socket will also try to use this BufferCallback
2413   // And that should crash us, if there is no DestructorGuard on the stack
2414   socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2415
2416   evb.loop();
2417   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2418 }
2419
2420 #if FOLLY_ALLOW_TFO
2421 TEST(AsyncSocketTest, ConnectTFO) {
2422   // Start listening on a local port
2423   TestServer server(true);
2424
2425   // Connect using a AsyncSocket
2426   EventBase evb;
2427   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2428   socket->enableTFO();
2429   ConnCallback cb;
2430   socket->connect(&cb, server.getAddress(), 30);
2431
2432   std::array<uint8_t, 128> buf;
2433   memset(buf.data(), 'a', buf.size());
2434
2435   std::array<uint8_t, 3> readBuf;
2436   auto sendBuf = IOBuf::copyBuffer("hey");
2437
2438   std::thread t([&] {
2439     auto acceptedSocket = server.accept();
2440     acceptedSocket->write(buf.data(), buf.size());
2441     acceptedSocket->flush();
2442     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2443     acceptedSocket->close();
2444   });
2445
2446   evb.loop();
2447
2448   CHECK_EQ(cb.state, STATE_SUCCEEDED);
2449   EXPECT_LE(0, socket->getConnectTime().count());
2450   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2451   EXPECT_TRUE(socket->getTFOAttempted());
2452
2453   // Should trigger the connect
2454   WriteCallback write;
2455   ReadCallback rcb;
2456   socket->writeChain(&write, sendBuf->clone());
2457   socket->setReadCB(&rcb);
2458   evb.loop();
2459
2460   t.join();
2461
2462   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2463   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2464   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2465   ASSERT_EQ(1, rcb.buffers.size());
2466   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2467   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2468   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2469 }
2470
2471 TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
2472   // Start listening on a local port
2473   TestServer server(true);
2474
2475   // Connect using a AsyncSocket
2476   EventBase evb;
2477   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2478   socket->enableTFO();
2479   ConnCallback cb;
2480   socket->connect(&cb, server.getAddress(), 30);
2481   ReadCallback rcb;
2482   socket->setReadCB(&rcb);
2483
2484   std::array<uint8_t, 128> buf;
2485   memset(buf.data(), 'a', buf.size());
2486
2487   std::array<uint8_t, 3> readBuf;
2488   auto sendBuf = IOBuf::copyBuffer("hey");
2489
2490   std::thread t([&] {
2491     auto acceptedSocket = server.accept();
2492     acceptedSocket->write(buf.data(), buf.size());
2493     acceptedSocket->flush();
2494     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2495     acceptedSocket->close();
2496   });
2497
2498   evb.loop();
2499
2500   CHECK_EQ(cb.state, STATE_SUCCEEDED);
2501   EXPECT_LE(0, socket->getConnectTime().count());
2502   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2503   EXPECT_TRUE(socket->getTFOAttempted());
2504
2505   // Should trigger the connect
2506   WriteCallback write;
2507   socket->writeChain(&write, sendBuf->clone());
2508   evb.loop();
2509
2510   t.join();
2511
2512   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2513   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2514   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2515   ASSERT_EQ(1, rcb.buffers.size());
2516   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2517   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2518   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2519 }
2520
2521 /**
2522  * Test connecting to a server that isn't listening
2523  */
2524 TEST(AsyncSocketTest, ConnectRefusedTFO) {
2525   EventBase evb;
2526
2527   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2528
2529   socket->enableTFO();
2530
2531   // Hopefully nothing is actually listening on this address
2532   folly::SocketAddress addr("::1", 65535);
2533   ConnCallback cb;
2534   socket->connect(&cb, addr, 30);
2535
2536   evb.loop();
2537
2538   WriteCallback write1;
2539   // Trigger the connect if TFO attempt is supported.
2540   socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2541   evb.loop();
2542   WriteCallback write2;
2543   socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2544   evb.loop();
2545
2546   if (!socket->getTFOFinished()) {
2547     EXPECT_EQ(STATE_FAILED, write1.state);
2548   } else {
2549     EXPECT_EQ(STATE_SUCCEEDED, write1.state);
2550     EXPECT_FALSE(socket->getTFOSucceded());
2551   }
2552
2553   EXPECT_EQ(STATE_FAILED, write2.state);
2554
2555   EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2556   EXPECT_LE(0, socket->getConnectTime().count());
2557   EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2558   EXPECT_TRUE(socket->getTFOAttempted());
2559 }
2560
2561 /**
2562  * Test calling closeNow() immediately after connecting.
2563  */
2564 TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
2565   TestServer server(true);
2566
2567   // connect()
2568   EventBase evb;
2569   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2570   socket->enableTFO();
2571
2572   ConnCallback ccb;
2573   socket->connect(&ccb, server.getAddress(), 30);
2574
2575   // write()
2576   std::array<char, 128> buf;
2577   memset(buf.data(), 'a', buf.size());
2578
2579   // close()
2580   socket->closeNow();
2581
2582   // Loop, although there shouldn't be anything to do.
2583   evb.loop();
2584
2585   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2586
2587   ASSERT_TRUE(socket->isClosedBySelf());
2588   ASSERT_FALSE(socket->isClosedByPeer());
2589 }
2590
2591 /**
2592  * Test calling close() immediately after connect()
2593  */
2594 TEST(AsyncSocketTest, ConnectAndCloseTFO) {
2595   TestServer server(true);
2596
2597   // Connect using a AsyncSocket
2598   EventBase evb;
2599   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2600   socket->enableTFO();
2601
2602   ConnCallback ccb;
2603   socket->connect(&ccb, server.getAddress(), 30);
2604
2605   socket->close();
2606
2607   // Loop, although there shouldn't be anything to do.
2608   evb.loop();
2609
2610   // Make sure the connection was aborted
2611   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2612
2613   ASSERT_TRUE(socket->isClosedBySelf());
2614   ASSERT_FALSE(socket->isClosedByPeer());
2615 }
2616
2617 class MockAsyncTFOSocket : public AsyncSocket {
2618  public:
2619   using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
2620
2621   explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
2622
2623   MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
2624 };
2625
2626 TEST(AsyncSocketTest, TestTFOUnsupported) {
2627   TestServer server(true);
2628
2629   // Connect using a AsyncSocket
2630   EventBase evb;
2631   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2632   socket->enableTFO();
2633
2634   ConnCallback ccb;
2635   socket->connect(&ccb, server.getAddress(), 30);
2636   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2637
2638   ReadCallback rcb;
2639   socket->setReadCB(&rcb);
2640
2641   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2642       .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2643   WriteCallback write;
2644   auto sendBuf = IOBuf::copyBuffer("hey");
2645   socket->writeChain(&write, sendBuf->clone());
2646   EXPECT_EQ(STATE_WAITING, write.state);
2647
2648   std::array<uint8_t, 128> buf;
2649   memset(buf.data(), 'a', buf.size());
2650
2651   std::array<uint8_t, 3> readBuf;
2652
2653   std::thread t([&] {
2654     std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2655     acceptedSocket->write(buf.data(), buf.size());
2656     acceptedSocket->flush();
2657     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2658     acceptedSocket->close();
2659   });
2660
2661   evb.loop();
2662
2663   t.join();
2664   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2665   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2666
2667   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2668   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2669   ASSERT_EQ(1, rcb.buffers.size());
2670   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2671   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2672   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2673 }
2674
2675 TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
2676   // Try connecting to server that won't respond.
2677   //
2678   // This depends somewhat on the network where this test is run.
2679   // Hopefully this IP will be routable but unresponsive.
2680   // (Alternatively, we could try listening on a local raw socket, but that
2681   // normally requires root privileges.)
2682   auto host = SocketAddressTestHelper::isIPv6Enabled()
2683       ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2684       : SocketAddressTestHelper::isIPv4Enabled()
2685           ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2686           : nullptr;
2687   SocketAddress addr(host, 65535);
2688
2689   // Connect using a AsyncSocket
2690   EventBase evb;
2691   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2692   socket->enableTFO();
2693
2694   ConnCallback ccb;
2695   // Set a very small timeout
2696   socket->connect(&ccb, addr, 1);
2697   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2698
2699   ReadCallback rcb;
2700   socket->setReadCB(&rcb);
2701
2702   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2703       .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2704   WriteCallback write;
2705   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2706
2707   evb.loop();
2708
2709   EXPECT_EQ(STATE_FAILED, write.state);
2710 }
2711
2712 TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
2713   TestServer server(true);
2714
2715   // Connect using a AsyncSocket
2716   EventBase evb;
2717   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2718   socket->enableTFO();
2719
2720   ConnCallback ccb;
2721   socket->connect(&ccb, server.getAddress(), 30);
2722   CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2723
2724   ReadCallback rcb;
2725   socket->setReadCB(&rcb);
2726
2727   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2728       .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2729         sockaddr_storage addr;
2730         auto len = server.getAddress().getAddress(&addr);
2731         return connect(fd, (const struct sockaddr*)&addr, len);
2732       }));
2733   WriteCallback write;
2734   auto sendBuf = IOBuf::copyBuffer("hey");
2735   socket->writeChain(&write, sendBuf->clone());
2736   EXPECT_EQ(STATE_WAITING, write.state);
2737
2738   std::array<uint8_t, 128> buf;
2739   memset(buf.data(), 'a', buf.size());
2740
2741   std::array<uint8_t, 3> readBuf;
2742
2743   std::thread t([&] {
2744     std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2745     acceptedSocket->write(buf.data(), buf.size());
2746     acceptedSocket->flush();
2747     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2748     acceptedSocket->close();
2749   });
2750
2751   evb.loop();
2752
2753   t.join();
2754   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2755
2756   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2757   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2758
2759   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2760   ASSERT_EQ(1, rcb.buffers.size());
2761   ASSERT_EQ(buf.size(), rcb.buffers[0].length);
2762   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2763 }
2764
2765 TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
2766   // Try connecting to server that won't respond.
2767   //
2768   // This depends somewhat on the network where this test is run.
2769   // Hopefully this IP will be routable but unresponsive.
2770   // (Alternatively, we could try listening on a local raw socket, but that
2771   // normally requires root privileges.)
2772   auto host = SocketAddressTestHelper::isIPv6Enabled()
2773       ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2774       : SocketAddressTestHelper::isIPv4Enabled()
2775           ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2776           : nullptr;
2777   SocketAddress addr(host, 65535);
2778
2779   // Connect using a AsyncSocket
2780   EventBase evb;
2781   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2782   socket->enableTFO();
2783
2784   ConnCallback ccb;
2785   // Set a very small timeout
2786   socket->connect(&ccb, addr, 1);
2787   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2788
2789   ReadCallback rcb;
2790   socket->setReadCB(&rcb);
2791
2792   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2793       .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2794         sockaddr_storage addr2;
2795         auto len = addr.getAddress(&addr2);
2796         return connect(fd, (const struct sockaddr*)&addr2, len);
2797       }));
2798   WriteCallback write;
2799   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2800
2801   evb.loop();
2802
2803   EXPECT_EQ(STATE_FAILED, write.state);
2804 }
2805
2806 TEST(AsyncSocketTest, TestTFOEagain) {
2807   TestServer server(true);
2808
2809   // Connect using a AsyncSocket
2810   EventBase evb;
2811   auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2812   socket->enableTFO();
2813
2814   ConnCallback ccb;
2815   socket->connect(&ccb, server.getAddress(), 30);
2816
2817   EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2818       .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
2819   WriteCallback write;
2820   socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2821
2822   evb.loop();
2823
2824   EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2825   EXPECT_EQ(STATE_FAILED, write.state);
2826 }
2827
2828 // Sending a large amount of data in the first write which will
2829 // definitely not fit into MSS.
2830 TEST(AsyncSocketTest, ConnectTFOWithBigData) {
2831   // Start listening on a local port
2832   TestServer server(true);
2833
2834   // Connect using a AsyncSocket
2835   EventBase evb;
2836   std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2837   socket->enableTFO();
2838   ConnCallback cb;
2839   socket->connect(&cb, server.getAddress(), 30);
2840
2841   std::array<uint8_t, 128> buf;
2842   memset(buf.data(), 'a', buf.size());
2843
2844   constexpr size_t len = 10 * 1024;
2845   auto sendBuf = IOBuf::create(len);
2846   sendBuf->append(len);
2847   std::array<uint8_t, len> readBuf;
2848
2849   std::thread t([&] {
2850     auto acceptedSocket = server.accept();
2851     acceptedSocket->write(buf.data(), buf.size());
2852     acceptedSocket->flush();
2853     acceptedSocket->readAll(readBuf.data(), readBuf.size());
2854     acceptedSocket->close();
2855   });
2856
2857   evb.loop();
2858
2859   CHECK_EQ(cb.state, STATE_SUCCEEDED);
2860   EXPECT_LE(0, socket->getConnectTime().count());
2861   EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2862   EXPECT_TRUE(socket->getTFOAttempted());
2863
2864   // Should trigger the connect
2865   WriteCallback write;
2866   ReadCallback rcb;
2867   socket->writeChain(&write, sendBuf->clone());
2868   socket->setReadCB(&rcb);
2869   evb.loop();
2870
2871   t.join();
2872
2873   EXPECT_EQ(STATE_SUCCEEDED, write.state);
2874   EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2875   EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2876   ASSERT_EQ(1, rcb.buffers.size());
2877   ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2878   EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2879   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2880 }
2881
2882 #endif