2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23 #include <folly/io/async/test/AsyncSocketTest.h>
24 #include <folly/io/async/test/Util.h>
25 #include <folly/test/SocketAddressTestHelper.h>
27 #include <gtest/gtest.h>
28 #include <boost/scoped_array.hpp>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/tcp.h>
38 using namespace boost;
45 using std::unique_ptr;
46 using std::chrono::milliseconds;
47 using boost::scoped_array;
49 using namespace folly;
51 class DelayedWrite: public AsyncTimeout {
// Timer-driven test helper: when the timeout fires it writes the stored
// IOBuf chain to the socket, using WriteFlags::CORK when cork_ is set.
//
// Constructor arguments: the target socket (its EventBase drives the
// timeout), the buffer chain to send (moved into bufs_), the write callback
// handed to writeChain(), the cork flag, and lastWrite (defaults to false).
53 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
54 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
55 bool cork, bool lastWrite = false):
56 AsyncTimeout(socket->getEventBase()),
58 bufs_(std::move(bufs)),
61 lastWrite_(lastWrite) {}
// Called when the scheduled timeout expires: issue the deferred write.
64 void timeoutExpired() noexcept override {
65 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
66 socket_->writeChain(wcb_, std::move(bufs_), flags);
// NOTE(review): presumably only executed when lastWrite_ is set; the guarding
// condition is not visible in this excerpt -- confirm.
68 socket_->shutdownWrite();
72 std::shared_ptr<AsyncSocket> socket_;
73 unique_ptr<IOBuf> bufs_;
74 AsyncTransportWrapper::WriteCallback* wcb_;
79 ///////////////////////////////////////////////////////////////////////////
81 ///////////////////////////////////////////////////////////////////////////
84 * Test connecting to a server
86 TEST(AsyncSocketTest, Connect) {
// Connects an AsyncSocket to the test server's address (timeout argument 30)
// and requires the connect callback to end in STATE_SUCCEEDED.
87 // Start listening on a local port
90 // Connect using an AsyncSocket
92 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
94 socket->connect(&cb, server.getAddress(), 30);
98 CHECK_EQ(cb.state, STATE_SUCCEEDED);
102 * Test connecting to a server that isn't listening
104 TEST(AsyncSocketTest, ConnectRefused) {
// Connects to 127.0.0.1:65535, where no listener is expected, and requires
// the connect callback to fail with AsyncSocketException::NOT_OPEN.
107 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
109 // Hopefully nothing is actually listening on this address
110 folly::SocketAddress addr("127.0.0.1", 65535);
112 socket->connect(&cb, addr, 30);
116 CHECK_EQ(cb.state, STATE_FAILED);
117 CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
121 * Test connection timeout
123 TEST(AsyncSocketTest, ConnectTimeout) {
// Connects to port 65535 of a public DNS address with a timeout argument of
// 1, expects the connect callback to fail with TIMED_OUT, and then checks
// that getPeerAddress() still reports the address that was attempted.
126 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
128 // Try connecting to a server that won't respond.
130 // This depends somewhat on the network where this test is run.
131 // Hopefully this IP will be routable but unresponsive.
132 // (Alternatively, we could try listening on a local raw socket, but that
133 // normally requires root privileges.)
// Prefer the IPv6 address when IPv6 is enabled, otherwise fall back to the
// IPv4 address when IPv4 is enabled.
135 SocketAddressTestHelper::isIPv6Enabled() ?
136 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
137 SocketAddressTestHelper::isIPv4Enabled() ?
138 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
140 SocketAddress addr(host, 65535);
142 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
146 CHECK_EQ(cb.state, STATE_FAILED);
147 CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
149 // Verify that we can still get the peer address after a timeout.
150 // Use case is if the client was created from a client pool, and we want
151 // to log which peer failed.
152 folly::SocketAddress peer;
153 socket->getPeerAddress(&peer);
154 CHECK_EQ(peer, addr);
158 * Test writing immediately after connecting, without waiting for connect
161 TEST(AsyncSocketTest, ConnectAndWrite) {
// Queues a write of an 'a'-filled buffer right after connect() is called,
// then checks that both the connect and write callbacks succeeded and that
// the server received exactly that buffer.
166 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
168 socket->connect(&ccb, server.getAddress(), 30);
172 memset(buf, 'a', sizeof(buf));
174 socket->write(&wcb, buf, sizeof(buf));
176 // Loop. We don't bother accepting on the server socket yet.
177 // The kernel should be able to buffer the write request so it can succeed.
180 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
181 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
183 // Make sure the server got a connection and received the data
185 server.verifyConnection(buf, sizeof(buf));
// The socket must report a local close, not a close by the peer.
187 ASSERT_TRUE(socket->isClosedBySelf());
188 ASSERT_FALSE(socket->isClosedByPeer());
192 * Test connecting using a nullptr connect callback.
194 TEST(AsyncSocketTest, ConnectNullCallback) {
// Calls connect() with a nullptr connect callback and uses a follow-up write
// to confirm that the socket is usable afterwards.
199 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
200 socket->connect(nullptr, server.getAddress(), 30);
202 // write some data, just so we have some way of verifying
203 // that the socket works correctly after connecting
205 memset(buf, 'a', sizeof(buf));
207 socket->write(&wcb, buf, sizeof(buf));
211 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
213 // Make sure the server got a connection and received the data
215 server.verifyConnection(buf, sizeof(buf));
// The socket must report a local close, not a close by the peer.
217 ASSERT_TRUE(socket->isClosedBySelf());
218 ASSERT_FALSE(socket->isClosedByPeer());
222 * Test calling both write() and close() immediately after connecting, without
223 * waiting for connect to finish.
225 * This exercises the STATE_CONNECTING_CLOSING code.
227 TEST(AsyncSocketTest, ConnectWriteAndClose) {
// Queues a write right after connect() is called and expects both the
// connect and the write callbacks to succeed, with the data reaching the
// server and the socket reporting a local close.
232 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
234 socket->connect(&ccb, server.getAddress(), 30);
238 memset(buf, 'a', sizeof(buf));
240 socket->write(&wcb, buf, sizeof(buf));
245 // Loop. We don't bother accepting on the server socket yet.
246 // The kernel should be able to buffer the write request so it can succeed.
249 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
250 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
252 // Make sure the server got a connection and received the data
253 server.verifyConnection(buf, sizeof(buf));
255 ASSERT_TRUE(socket->isClosedBySelf());
256 ASSERT_FALSE(socket->isClosedByPeer());
260 * Test calling close() immediately after connect()
262 TEST(AsyncSocketTest, ConnectAndClose) {
// Starts a connect and expects the connect callback to end in STATE_FAILED
// with the socket reporting a local close. If connect() completed
// synchronously the scenario cannot be exercised and a message is logged.
265 // Connect using an AsyncSocket
267 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
269 socket->connect(&ccb, server.getAddress(), 30);
271 // Hopefully the connect didn't succeed immediately.
272 // If it did, we can't exercise the close-while-connecting code path.
273 if (ccb.state == STATE_SUCCEEDED) {
274 LOG(INFO) << "connect() succeeded immediately; aborting test "
275 "of close-during-connect behavior";
281 // Loop, although there shouldn't be anything to do.
284 // Make sure the connection was aborted
285 CHECK_EQ(ccb.state, STATE_FAILED);
287 ASSERT_TRUE(socket->isClosedBySelf());
288 ASSERT_FALSE(socket->isClosedByPeer());
292 * Test calling closeNow() immediately after connect()
294 * This should be identical to the normal close behavior.
296 TEST(AsyncSocketTest, ConnectAndCloseNow) {
// Same expectations as the close() variant: the connect callback must end in
// STATE_FAILED and the socket must report a local close. If connect()
// completed synchronously a message is logged instead.
299 // Connect using an AsyncSocket
301 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
303 socket->connect(&ccb, server.getAddress(), 30);
305 // Hopefully the connect didn't succeed immediately.
306 // If it did, we can't exercise the close-while-connecting code path.
307 if (ccb.state == STATE_SUCCEEDED) {
308 LOG(INFO) << "connect() succeeded immediately; aborting test "
309 "of closeNow()-during-connect behavior";
315 // Loop, although there shouldn't be anything to do.
318 // Make sure the connection was aborted
319 CHECK_EQ(ccb.state, STATE_FAILED);
321 ASSERT_TRUE(socket->isClosedBySelf());
322 ASSERT_FALSE(socket->isClosedByPeer());
326 * Test calling both write() and closeNow() immediately after connecting,
327 * without waiting for connect to finish.
329 * This should abort the pending write.
331 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
// Queues a write while the connect is still pending and expects both the
// connect callback and the write callback to end in STATE_FAILED, with the
// socket reporting a local close.
336 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
338 socket->connect(&ccb, server.getAddress(), 30);
340 // Hopefully the connect didn't succeed immediately.
341 // If it did, we can't exercise the close-while-connecting code path.
342 if (ccb.state == STATE_SUCCEEDED) {
343 LOG(INFO) << "connect() succeeded immediately; aborting test "
344 "of write-during-connect behavior";
350 memset(buf, 'a', sizeof(buf));
352 socket->write(&wcb, buf, sizeof(buf));
357 // Loop, although there shouldn't be anything to do.
360 CHECK_EQ(ccb.state, STATE_FAILED);
361 CHECK_EQ(wcb.state, STATE_FAILED);
363 ASSERT_TRUE(socket->isClosedBySelf());
364 ASSERT_FALSE(socket->isClosedByPeer());
368 * Test installing a read callback immediately, before connect() finishes.
370 TEST(AsyncSocketTest, ConnectAndRead) {
// Installs a read callback before the connect completes, has the accepted
// peer send an 'a'-filled buffer and close, then checks that the read
// callback received exactly one buffer with identical contents.
375 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
377 socket->connect(&ccb, server.getAddress(), 30);
380 socket->setReadCB(&rcb);
382 // Even though we haven't looped yet, we should be able to accept
383 // the connection and send data to it.
384 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
386 memset(buf, 'a', sizeof(buf));
387 acceptedSocket->write(buf, sizeof(buf));
388 acceptedSocket->flush();
389 acceptedSocket->close();
391 // Loop, although there shouldn't be anything to do.
394 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
395 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
396 CHECK_EQ(rcb.buffers.size(), 1);
397 CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
398 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
// At this point neither side is reported as having closed the AsyncSocket.
400 ASSERT_FALSE(socket->isClosedBySelf());
401 ASSERT_FALSE(socket->isClosedByPeer());
405 * Test installing a read callback and then closing immediately before the
406 * connect attempt finishes.
408 TEST(AsyncSocketTest, ConnectReadAndClose) {
// Installs a read callback while the connect is pending and expects the
// connect callback to fail, the read callback to finish with no buffers,
// and the socket to report a local close.
413 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
415 socket->connect(&ccb, server.getAddress(), 30);
417 // Hopefully the connect didn't succeed immediately.
418 // If it did, we can't exercise the close-while-connecting code path.
419 if (ccb.state == STATE_SUCCEEDED) {
420 LOG(INFO) << "connect() succeeded immediately; aborting test "
421 "of read-during-connect behavior";
426 socket->setReadCB(&rcb);
431 // Loop, although there shouldn't be anything to do.
434 CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the connect attempt
435 CHECK_EQ(rcb.buffers.size(), 0);
436 CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
438 ASSERT_TRUE(socket->isClosedBySelf());
439 ASSERT_FALSE(socket->isClosedByPeer());
443 * Test both writing and installing a read callback immediately,
444 * before connect() finishes.
446 TEST(AsyncSocketTest, ConnectWriteAndRead) {
// Queues a write (buf1, filled with 'a') and installs a read callback before
// the connect completes. The accepted peer sends buf2 (filled with 'b') and
// half-closes; the test then checks that each side received the other's data.
451 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
453 socket->connect(&ccb, server.getAddress(), 30);
457 memset(buf1, 'a', sizeof(buf1));
459 socket->write(&wcb, buf1, sizeof(buf1));
461 // set a read callback
463 socket->setReadCB(&rcb);
465 // Even though we haven't looped yet, we should be able to accept
466 // the connection and send data to it.
467 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
469 memset(buf2, 'b', sizeof(buf2));
470 acceptedSocket->write(buf2, sizeof(buf2));
471 acceptedSocket->flush();
473 // shut down the write half of acceptedSocket, so that the AsyncSocket
474 // will stop reading and we can break out of the event loop.
475 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
480 // Make sure the connect succeeded
481 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
483 // Make sure the AsyncSocket read the data written by the accepted socket
484 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
485 CHECK_EQ(rcb.buffers.size(), 1);
486 CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
487 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
489 // Close the AsyncSocket so we'll see EOF on acceptedSocket
492 // Make sure the accepted socket saw the data written by the AsyncSocket
493 uint8_t readbuf[sizeof(buf1)];
494 acceptedSocket->readAll(readbuf, sizeof(readbuf));
495 CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
// A further read returning 0 bytes means the accepted socket saw EOF.
496 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
497 CHECK_EQ(bytesRead, 0);
499 ASSERT_FALSE(socket->isClosedBySelf());
500 ASSERT_TRUE(socket->isClosedByPeer());
504 * Test writing to the socket then shutting down writes before the connect
507 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
// Queues a write and shuts down the write side while the connect is still
// pending. After the connect completes the peer must receive the queued data
// followed by EOF, and the AsyncSocket must still be able to read the data
// the peer sent back.
512 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
514 socket->connect(&ccb, server.getAddress(), 30);
516 // Hopefully the connect didn't succeed immediately.
517 // If it did, we can't exercise the write-while-connecting code path.
518 if (ccb.state == STATE_SUCCEEDED) {
519 LOG(INFO) << "connect() succeeded immediately; skipping test";
523 // Ask to write some data
525 memset(wbuf, 'a', sizeof(wbuf));
527 socket->write(&wcb, wbuf, sizeof(wbuf));
528 socket->shutdownWrite();
// NOTE(review): shutdownWrite() is called a second time here; it looks
// redundant with the call above -- confirm whether this is intentional.
531 socket->shutdownWrite();
533 // Even though we haven't looped yet, we should be able to accept
// the connection.
535 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
537 // Since the connection is still in progress, there should be no data to
538 // read yet. Verify that the accepted socket is not readable.
539 struct pollfd fds[1];
540 fds[0].fd = acceptedSocket->getSocketFD();
541 fds[0].events = POLLIN;
// A timeout of 0 makes poll() return immediately.
543 int rc = poll(fds, 1, 0);
546 // Write data to the accepted socket
547 uint8_t acceptedWbuf[192];
548 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
549 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
550 acceptedSocket->flush();
555 // The loop should have completed the connection, written the queued data,
556 // and shutdown writes on the socket.
558 // Check that the connection was completed successfully and that the write
559 // callback succeeded.
560 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
561 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
563 // Check that we can read the data that was written to the socket, and that
564 // we see an EOF, since its socket was half-shutdown.
565 uint8_t readbuf[sizeof(wbuf)];
566 acceptedSocket->readAll(readbuf, sizeof(readbuf));
567 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
568 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
569 CHECK_EQ(bytesRead, 0);
571 // Close the accepted socket. This will cause it to see EOF
572 // and uninstall the read callback when we loop next.
573 acceptedSocket->close();
575 // Install a read callback, then loop again.
577 socket->setReadCB(&rcb);
580 // This loop should have read the data and seen the EOF
581 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
582 CHECK_EQ(rcb.buffers.size(), 1);
583 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
584 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
585 acceptedWbuf, sizeof(acceptedWbuf)), 0);
587 ASSERT_FALSE(socket->isClosedBySelf());
588 ASSERT_FALSE(socket->isClosedByPeer());
592 * Test reading, writing, and shutting down writes before the connect attempt
595 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
// Installs a read callback, queues a write and shuts down the write side,
// all while the connect is pending. Afterwards both directions must have
// delivered their data and the AsyncSocket must report a close by the peer.
600 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
602 socket->connect(&ccb, server.getAddress(), 30);
604 // Hopefully the connect didn't succeed immediately.
605 // If it did, we can't exercise the write-while-connecting code path.
606 if (ccb.state == STATE_SUCCEEDED) {
607 LOG(INFO) << "connect() succeeded immediately; skipping test";
611 // Install a read callback
613 socket->setReadCB(&rcb);
615 // Ask to write some data
617 memset(wbuf, 'a', sizeof(wbuf));
619 socket->write(&wcb, wbuf, sizeof(wbuf));
622 socket->shutdownWrite();
624 // Even though we haven't looped yet, we should be able to accept
// the connection.
626 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
628 // Since the connection is still in progress, there should be no data to
629 // read yet. Verify that the accepted socket is not readable.
630 struct pollfd fds[1];
631 fds[0].fd = acceptedSocket->getSocketFD();
632 fds[0].events = POLLIN;
// A timeout of 0 makes poll() return immediately.
634 int rc = poll(fds, 1, 0);
637 // Write data to the accepted socket
638 uint8_t acceptedWbuf[192];
639 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
640 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
641 acceptedSocket->flush();
642 // Shutdown writes to the accepted socket. This will cause it to see EOF
643 // and uninstall the read callback.
644 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
649 // The loop should have completed the connection, written the queued data,
650 // shutdown writes on the socket, read the data we wrote to it, and see the
// EOF.
653 // Check that the connection was completed successfully and that the read
654 // and write callbacks were invoked as expected.
655 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
656 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
657 CHECK_EQ(rcb.buffers.size(), 1);
658 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
659 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
660 acceptedWbuf, sizeof(acceptedWbuf)), 0);
661 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
663 // Check that we can read the data that was written to the socket, and that
664 // we see an EOF, since its socket was half-shutdown.
665 uint8_t readbuf[sizeof(wbuf)];
666 acceptedSocket->readAll(readbuf, sizeof(readbuf));
667 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
668 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
669 CHECK_EQ(bytesRead, 0);
671 // Fully close both sockets
672 acceptedSocket->close();
675 ASSERT_FALSE(socket->isClosedBySelf());
676 ASSERT_TRUE(socket->isClosedByPeer());
680 * Test reading, writing, and calling shutdownWriteNow() before the
681 * connect attempt finishes.
683 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
// Installs a read callback, queues a write and then calls shutdownWriteNow()
// while the connect is pending. The queued write must fail with zero bytes
// written, the peer must see an immediate EOF, and the AsyncSocket must
// still receive the data the peer sent.
688 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
690 socket->connect(&ccb, server.getAddress(), 30);
692 // Hopefully the connect didn't succeed immediately.
693 // If it did, we can't exercise the write-while-connecting code path.
694 if (ccb.state == STATE_SUCCEEDED) {
695 LOG(INFO) << "connect() succeeded immediately; skipping test";
699 // Install a read callback
701 socket->setReadCB(&rcb);
703 // Ask to write some data
705 memset(wbuf, 'a', sizeof(wbuf));
707 socket->write(&wcb, wbuf, sizeof(wbuf));
709 // Shutdown writes immediately.
710 // This should immediately discard the data that we just tried to write.
711 socket->shutdownWriteNow();
713 // Verify that writeError() was invoked on the write callback.
714 CHECK_EQ(wcb.state, STATE_FAILED);
715 CHECK_EQ(wcb.bytesWritten, 0);
717 // Even though we haven't looped yet, we should be able to accept
// the connection.
719 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
721 // Since the connection is still in progress, there should be no data to
722 // read yet. Verify that the accepted socket is not readable.
723 struct pollfd fds[1];
724 fds[0].fd = acceptedSocket->getSocketFD();
725 fds[0].events = POLLIN;
// A timeout of 0 makes poll() return immediately.
727 int rc = poll(fds, 1, 0);
730 // Write data to the accepted socket
731 uint8_t acceptedWbuf[192];
732 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
733 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
734 acceptedSocket->flush();
735 // Shutdown writes to the accepted socket. This will cause it to see EOF
736 // and uninstall the read callback.
737 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
742 // The loop should have completed the connection, written the queued data,
743 // shutdown writes on the socket, read the data we wrote to it, and see the
// EOF.
// NOTE(review): "written the queued data" looks copied from the
// shutdownWrite() variant; here the queued write already failed above.
746 // Check that the connection was completed successfully and that the read
747 // callback was invoked as expected.
748 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
749 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
750 CHECK_EQ(rcb.buffers.size(), 1);
751 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
752 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
753 acceptedWbuf, sizeof(acceptedWbuf)), 0);
755 // Since we used shutdownWriteNow(), it should have discarded all pending
756 // write data. Verify we see an immediate EOF when reading from the accepted
// socket.
758 uint8_t readbuf[sizeof(wbuf)];
759 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
760 CHECK_EQ(bytesRead, 0);
762 // Fully close both sockets
763 acceptedSocket->close();
766 ASSERT_FALSE(socket->isClosedBySelf());
767 ASSERT_TRUE(socket->isClosedByPeer());
770 // Helper function for use in testConnectOptWrite()
771 // Temporarily disable the read callback
// socket: the socket whose read callback is removed and later restored.
// rcb: the read callback to reinstall.
772 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
773 // Uninstall the read callback
774 socket->setReadCB(nullptr);
775 // Schedule the read callback to be reinstalled via EventBase::runInLoop().
// NOTE(review): this comment used to say "after 1ms", but no delay is passed
// here. The bound call holds the raw socket pointer, so the socket must
// outlive the scheduled callback.
776 socket->getEventBase()->runInLoop(
777 std::bind(&AsyncSocket::setReadCB, socket, rcb));
781 * Test connect+write, then have the connect callback perform another write.
783 * This tests interaction of the optimistic writing after connect with
784 * additional write attempts that occur in the connect callback.
786 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
// size1: length of the write queued before the connect finishes ('a' bytes).
// size2: length of the write issued from the connect success callback
//        ('b' bytes).
// close: see the "immediately perform a close" step below; the code that
//        consumes it is not visible in this excerpt.
789 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
793 socket->connect(&ccb, server.getAddress(), 30);
795 // Hopefully the connect didn't succeed immediately.
796 // If it did, we can't exercise the optimistic write code path.
797 if (ccb.state == STATE_SUCCEEDED) {
798 LOG(INFO) << "connect() succeeded immediately; aborting test "
799 "of optimistic write behavior";
803 // Tell the connect callback to perform a write when the connect succeeds
805 scoped_array<char> buf2(new char[size2]);
806 memset(buf2.get(), 'b', size2);
// Both lambdas capture by reference; the captured locals live until this
// function returns.
808 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
809 // Tell the second write callback to close the connection when it is done
810 wcb2.successCallback = [&] { socket->closeNow(); };
813 // Schedule one write() immediately, before the connect finishes
814 scoped_array<char> buf1(new char[size1]);
815 memset(buf1.get(), 'a', size1);
818 socket->write(&wcb1, buf1.get(), size1);
822 // immediately perform a close, before connect() completes
826 // Start reading from the other endpoint after 10ms.
827 // If we're using large buffers, we have to read so that the writes don't
// block forever.
829 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
// Each time data arrives, pause reading via tmpDisableReads().
831 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
832 acceptedSocket.get(), &rcb);
833 socket->getEventBase()->tryRunAfterDelay(
834 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
837 // Loop. We don't bother accepting on the server socket yet.
838 // The kernel should be able to buffer the write request so it can succeed.
// NOTE(review): the two lines above look stale -- the connection was already
// accepted via acceptAsync() earlier in this function.
841 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
843 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
846 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
851 // Make sure the read callback received all of the data
852 size_t bytesRead = 0;
853 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
854 it != rcb.buffers.end();
// [start, end) is the range of the overall byte stream covered by this
// buffer.
856 size_t start = bytesRead;
857 bytesRead += it->length;
858 size_t end = bytesRead;
// Compare the part of this buffer that overlaps the first write (buf1).
860 size_t cmpLen = min(size1, end) - start;
861 CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
// Compare the part of this buffer that overlaps the second write (buf2).
863 if (end > size1 && end <= size1 + size2) {
867 if (start >= size1) {
869 buf2Offset = start - size1;
870 cmpLen = end - start;
872 itOffset = size1 - start;
874 cmpLen = end - size1;
876 CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
// The reader must have seen exactly the bytes of both writes.
881 CHECK_EQ(bytesRead, size1 + size2);
884 TEST(AsyncSocketTest, ConnectCallbackWrite) {
// Runs testConnectOptWrite() over combinations of small, large (8 MiB) and
// zero-length writes, with and without the close flag.
885 // Test using small writes that should both succeed immediately
886 testConnectOptWrite(100, 200);
888 // Test using a large buffer in the connect callback, that should block
889 const size_t largeSize = 8*1024*1024;
890 testConnectOptWrite(100, largeSize);
892 // Test using a large initial write
893 testConnectOptWrite(largeSize, 100);
895 // Test using two large buffers
896 testConnectOptWrite(largeSize, largeSize);
898 // Test a small write in the connect callback,
899 // but no immediate write before connect completes
900 testConnectOptWrite(0, 64);
902 // Test a large write in the connect callback,
903 // but no immediate write before connect completes
904 testConnectOptWrite(0, largeSize);
906 // Test connect, a small write, then immediately call close() before connect
// completes
908 testConnectOptWrite(211, 0, true);
910 // Test connect, a large immediate write (that will block), then immediately
911 // call close() before connect completes
912 testConnectOptWrite(largeSize, 0, true);
915 ///////////////////////////////////////////////////////////////////////////
916 // write() related tests
917 ///////////////////////////////////////////////////////////////////////////
920 * Test writing using a nullptr callback
922 TEST(AsyncSocketTest, WriteNullCallback) {
// Connects, writes an 'a'-filled buffer with no write callback, and checks
// that the server still receives the data.
927 std::shared_ptr<AsyncSocket> socket =
928 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
929 evb.loop(); // loop until the socket is connected
931 // write() with a nullptr callback
933 memset(buf, 'a', sizeof(buf));
934 socket->write(nullptr, buf, sizeof(buf));
936 evb.loop(); // loop until the data is sent
938 // Make sure the server got a connection and received the data
940 server.verifyConnection(buf, sizeof(buf));
942 ASSERT_TRUE(socket->isClosedBySelf());
943 ASSERT_FALSE(socket->isClosedByPeer());
947 * Test writing with a send timeout
949 TEST(AsyncSocketTest, WriteTimeout) {
// Sets a send timeout of 200, writes 8 MiB with nobody reading on the other
// end, and expects the write callback to fail with TIMED_OUT within the
// tolerance checked by T_CHECK_TIMEOUT.
954 std::shared_ptr<AsyncSocket> socket =
955 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
956 evb.loop(); // loop until the socket is connected
958 // write() a large chunk of data, with no-one on the other end reading
959 size_t writeLength = 8*1024*1024;
960 uint32_t timeout = 200;
961 socket->setSendTimeout(timeout);
962 scoped_array<char> buf(new char[writeLength]);
963 memset(buf.get(), 'a', writeLength);
965 socket->write(&wcb, buf.get(), writeLength);
971 // Make sure the write attempt timed out as requested
972 CHECK_EQ(wcb.state, STATE_FAILED);
973 CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
975 // Check that the write timed out within a reasonable period of time.
976 // We don't check for exactly the specified timeout, since AsyncSocket only
977 // times out when it hasn't made progress for that period of time.
979 // On linux, the first write sends a few hundred kb of data, then blocks for
980 // writability, and then unblocks again after 40ms and is able to write
981 // another smaller chunk of data before blocking permanently. Therefore it
982 // doesn't time out until 40ms + timeout.
984 // I haven't fully verified the cause of this, but I believe it probably
985 // occurs because the receiving end delays sending an ack for up to 40ms.
986 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
987 // the ack, it can send some more data. However, after that point the
988 // receiver's kernel buffer is full. This 40ms delay happens even with
989 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
990 // kernel may be automatically disabling TCP_QUICKACK after receiving some
// data.
993 // For now, we simply check that the timeout occurred within 160ms of
994 // the requested value.
995 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
999 * Test writing to a socket that the remote endpoint has closed
1001 TEST(AsyncSocketTest, WritePipeError) {
// Connects, lets the server accept and immediately drop the connection, then
// writes 8 MiB and expects the write callback to fail with INTERNAL_ERROR.
1006 std::shared_ptr<AsyncSocket> socket =
1007 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1008 socket->setSendTimeout(1000);
1009 evb.loop(); // loop until the socket is connected
1011 // accept and immediately close the socket
1012 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
// Dropping the only reference releases the accepted socket.
1013 acceptedSocket.reset();
1015 // write() a large chunk of data
1016 size_t writeLength = 8*1024*1024;
1017 scoped_array<char> buf(new char[writeLength]);
1018 memset(buf.get(), 'a', writeLength);
1020 socket->write(&wcb, buf.get(), writeLength);
1024 // Make sure the write failed.
1025 // It would be nice if AsyncSocketException could convey the errno value,
1026 // so that we could check for EPIPE
1027 CHECK_EQ(wcb.state, STATE_FAILED);
1028 CHECK_EQ(wcb.exception.getType(),
1029 AsyncSocketException::INTERNAL_ERROR);
1031 ASSERT_FALSE(socket->isClosedBySelf());
1032 ASSERT_FALSE(socket->isClosedByPeer());
1036 * Test writing a mix of simple buffers and IOBufs
1038 TEST(AsyncSocketTest, WriteIOBuf) {
// Writes a plain buffer ('a'), a single-element IOBuf chain ('b') and a
// two-element IOBuf chain ('c' then 'd') back to back, shuts down writes,
// and checks that the reader received one buffer holding all the bytes in
// order.
1043 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1045 socket->connect(&ccb, server.getAddress(), 30);
1047 // Accept the connection
1048 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1050 acceptedSocket->setReadCB(&rcb);
1052 // Write a simple buffer to the socket
// The length is const so that the array below has a constant bound; a
// non-const bound would make it a variable-length array, which is not
// standard C++.
1053 const size_t simpleBufLength = 5;
1054 char simpleBuf[simpleBufLength];
1055 memset(simpleBuf, 'a', simpleBufLength);
1057 socket->write(&wcb, simpleBuf, simpleBufLength);
1059 // Write a single-element IOBuf chain
1060 size_t buf1Length = 7;
1061 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1062 memset(buf1->writableData(), 'b', buf1Length);
1063 buf1->append(buf1Length);
// Keep a clone for the comparison below, since buf1 is moved into the write.
1064 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1066 socket->writeChain(&wcb2, std::move(buf1));
1068 // Write a multiple-element IOBuf chain
1069 size_t buf2Length = 11;
1070 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1071 memset(buf2->writableData(), 'c', buf2Length);
1072 buf2->append(buf2Length);
1073 size_t buf3Length = 13;
1074 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1075 memset(buf3->writableData(), 'd', buf3Length);
1076 buf3->append(buf3Length);
1077 buf2->appendChain(std::move(buf3));
// Clone and coalesce the chain so it can be compared with a single memcmp.
1078 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1079 buf2Copy->coalesce();
1081 socket->writeChain(&wcb3, std::move(buf2));
1082 socket->shutdownWrite();
1084 // Let the reads and writes run to completion
1087 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1088 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1089 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1091 // Make sure the reader got the right data in the right order
1092 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1093 CHECK_EQ(rcb.buffers.size(), 1);
1094 CHECK_EQ(rcb.buffers[0].length,
1095 simpleBufLength + buf1Length + buf2Length + buf3Length);
1097 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1099 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1100 buf1Copy->data(), buf1Copy->length()), 0);
1102 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1103 buf2Copy->data(), buf2Copy->length()), 0);
1105 acceptedSocket->close();
1108 ASSERT_TRUE(socket->isClosedBySelf());
1109 ASSERT_FALSE(socket->isClosedByPeer());
1112 TEST(AsyncSocketTest, WriteIOBufCorked) {
1117 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1119 socket->connect(&ccb, server.getAddress(), 30);
1121 // Accept the connection
1122 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1124 acceptedSocket->setReadCB(&rcb);
1126 // Do three writes, 100ms apart, with the "cork" flag set
1127 // on the second write. The reader should see the first write
1128 // arrive by itself, followed by the second and third writes
1129 // arriving together.
1130 size_t buf1Length = 5;
1131 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1132 memset(buf1->writableData(), 'a', buf1Length);
1133 buf1->append(buf1Length);
1134 size_t buf2Length = 7;
1135 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1136 memset(buf2->writableData(), 'b', buf2Length);
1137 buf2->append(buf2Length);
1138 size_t buf3Length = 11;
1139 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1140 memset(buf3->writableData(), 'c', buf3Length);
1141 buf3->append(buf3Length);
1143 socket->writeChain(&wcb1, std::move(buf1));
1145 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1146 write2.scheduleTimeout(100);
1148 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1149 write3.scheduleTimeout(200);
1152 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1153 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1154 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1155 if (wcb3.state != STATE_SUCCEEDED) {
1156 throw(wcb3.exception);
1158 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1160 // Make sure the reader got the data with the right grouping
1161 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1162 CHECK_EQ(rcb.buffers.size(), 2);
1163 CHECK_EQ(rcb.buffers[0].length, buf1Length);
1164 CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1166 acceptedSocket->close();
1169 ASSERT_TRUE(socket->isClosedBySelf());
1170 ASSERT_FALSE(socket->isClosedByPeer());
1174 * Test performing a zero-length write
1176 TEST(AsyncSocketTest, ZeroLengthWrite) {
// Interleaves two zero-length writes with two 1 MiB writes and checks that
// every write callback succeeds and that the reader receives the full payload.
// NOTE(review): `server`, `evb`, `rcb` and `wcb1`..`wcb4` are used below without
// a visible declaration in this listing; confirm against the full file.
1181 std::shared_ptr<AsyncSocket> socket =
1182 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1183 evb.loop(); // loop until the socket is connected
1185 auto acceptedSocket = server.acceptAsync(&evb);
1187 acceptedSocket->setReadCB(&rcb);
1189 size_t len1 = 1024*1024;
1190 size_t len2 = 1024*1024;
1191 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1192 memset(buf.get(), 'a', len1);
// Fill the second half starting at offset len1. Writing at offset 0 again
// would overwrite the 'a' region and leave the bytes sent through wcb4
// uninitialized.
1193 memset(buf.get() + len1, 'b', len2);
// wcb1 and wcb3 are the zero-length writes; wcb2 and wcb4 carry the two halves.
1199 socket->write(&wcb1, buf.get(), 0);
1200 socket->write(&wcb2, buf.get(), len1);
1201 socket->write(&wcb3, buf.get() + len1, 0);
1202 socket->write(&wcb4, buf.get() + len1, len2);
1205 evb.loop(); // loop until the data is sent
1207 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1208 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1209 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1210 CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1211 rcb.verifyData(buf.get(), len1 + len2);
1213 ASSERT_TRUE(socket->isClosedBySelf());
1214 ASSERT_FALSE(socket->isClosedByPeer());
1217 TEST(AsyncSocketTest, ZeroLengthWritev) {
// Sends a 2 MiB payload through a single writev() whose iovec array mixes
// data-carrying entries with entries based at the same or the end address, then
// checks the write callback succeeds and the reader receives the full payload.
// NOTE(review): `server`, `evb`, `rcb` and `wcb` are used below without a
// visible declaration in this listing; confirm against the full file.
1222 std::shared_ptr<AsyncSocket> socket =
1223 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1224 evb.loop(); // loop until the socket is connected
1226 auto acceptedSocket = server.acceptAsync(&evb);
1228 acceptedSocket->setReadCB(&rcb);
1230 size_t len1 = 1024*1024;
1231 size_t len2 = 1024*1024;
1232 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1233 memset(buf.get(), 'a', len1);
// Fill the second half starting at offset len1. Writing at offset 0 again
// would overwrite the 'a' region and leave the bytes referenced by iov[2]
// uninitialized.
1234 memset(buf.get() + len1, 'b', len2);
// The array bound must be a constant expression; a non-const size_t here turns
// `iov` into a variable-length array, which is not standard C++.
1237 const size_t iovCount = 4;
1238 struct iovec iov[iovCount];
// NOTE(review): the iov_len assignments for iov[1] and iov[3] are not visible
// in this listing; given the test name they are presumably 0 - confirm.
1239 iov[0].iov_base = buf.get();
1240 iov[0].iov_len = len1;
1241 iov[1].iov_base = buf.get() + len1;
1243 iov[2].iov_base = buf.get() + len1;
1244 iov[2].iov_len = len2;
1245 iov[3].iov_base = buf.get() + len1 + len2;
1248 socket->writev(&wcb, iov, iovCount);
1250 evb.loop(); // loop until the data is sent
1252 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1253 rcb.verifyData(buf.get(), len1 + len2);
1255 ASSERT_TRUE(socket->isClosedBySelf());
1256 ASSERT_FALSE(socket->isClosedByPeer());
1259 ///////////////////////////////////////////////////////////////////////////
1260 // close() related tests
1261 ///////////////////////////////////////////////////////////////////////////
1264 * Test calling close() with pending writes when the socket is already closing.
1266 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
// Queues writes until five of them are pending, installs an error callback on
// each pending write that calls close() on the socket, and then checks that
// every pending write callback ended in STATE_FAILED.
// NOTE(review): `server`, `evb`, `ccb` and `buf` are used below without a
// visible declaration in this listing; confirm against the full file.
1271 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1273 socket->connect(&ccb, server.getAddress(), 30);
1275 // accept the socket on the server side
// The accepted side is a BlockingSocket; no read from it is visible in this
// test, which is presumably what lets the client's writes back up.
1276 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1278 // Loop to ensure the connect has completed
1281 // Make sure we are connected
1282 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1284 // Schedule pending writes, until several write attempts have blocked
1286 memset(buf, 'a', sizeof(buf));
1287 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1288 WriteCallbackVector writeCallbacks;
// Only writes that did not complete immediately are collected; stop once five
// such writes are queued.
1290 writeCallbacks.reserve(5);
1291 while (writeCallbacks.size() < 5) {
1292 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1294 socket->write(wcb.get(), buf, sizeof(buf));
1295 if (wcb->state == STATE_SUCCEEDED) {
1296 // Succeeded immediately. Keep performing more writes
1300 // This write is blocked.
1301 // Have the write callback call close() when writeError() is invoked
// The bound raw pointer is non-owning; `socket` itself keeps the object alive.
1302 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1303 writeCallbacks.push_back(wcb);
1306 // Call closeNow() to immediately fail the pending writes
1309 // Make sure writeError() was invoked on all of the pending write callbacks
1310 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1311 it != writeCallbacks.end();
1313 CHECK_EQ((*it)->state, STATE_FAILED);
1316 ASSERT_TRUE(socket->isClosedBySelf());
1317 ASSERT_FALSE(socket->isClosedByPeer());
1320 ///////////////////////////////////////////////////////////////////////////
1321 // ImmediateRead related tests
1322 ///////////////////////////////////////////////////////////////////////////
1324 /* AsyncSocket subclass used to verify that immediate read works */
1325 class AsyncSocketImmediateRead : public folly::AsyncSocket {
// Set to true each time checkForImmediateRead() runs; the tests reset it and
// inspect it afterwards.
1327 bool immediateReadCalled = false;
1328 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
// Override that records the call and then forwards to AsyncSocket::handleRead().
1330 void checkForImmediateRead() noexcept override {
1331 immediateReadCalled = true;
1332 AsyncSocket::handleRead();
1336 TEST(AsyncSocket, ConnectReadImmediateRead) {
// Round-trips 300 bytes through a server that echoes them back, with the client
// limited to a 100-byte read buffer and one read per event, and checks that
// checkForImmediateRead() was invoked on the client socket.
// NOTE(review): `server`, `evb` and `wcb1` are used below without a visible
// declaration in this listing; confirm against the full file.
1339 const size_t maxBufferSz = 100;
1340 const size_t maxReadsPerEvent = 1;
// Three times the read buffer size, so the reply cannot fit in one buffer.
1341 const size_t expectedDataSz = maxBufferSz * 3;
1342 char expectedData[expectedDataSz];
1343 memset(expectedData, 'j', expectedDataSz);
1346 ReadCallback rcb(maxBufferSz);
1347 AsyncSocketImmediateRead socket(&evb);
1348 socket.connect(nullptr, server.getAddress(), 30);
1350 evb.loop(); // loop until the socket is connected
1352 socket.setReadCB(&rcb);
1353 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Clear the flag so only calls made after this point are counted.
1354 socket.immediateReadCalled = false;
1356 auto acceptedSocket = server.acceptAsync(&evb);
1358 ReadCallback rcbServer;
1359 WriteCallback wcbServer;
// Server side: once the whole payload has arrived, verify it, echo it back and
// close the accepted socket.
1360 rcbServer.dataAvailableCallback = [&]() {
1361 if (rcbServer.dataRead() == expectedDataSz) {
1362 // write back all data read
1363 rcbServer.verifyData(expectedData, expectedDataSz);
1364 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1365 acceptedSocket->close();
1368 acceptedSocket->setReadCB(&rcbServer);
1372 socket.write(&wcb1, expectedData, expectedDataSz);
1374 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1375 rcb.verifyData(expectedData, expectedDataSz);
1376 CHECK_EQ(socket.immediateReadCalled, true);
1378 ASSERT_FALSE(socket.isClosedBySelf());
1379 ASSERT_FALSE(socket.isClosedByPeer());
1382 TEST(AsyncSocket, ConnectReadUninstallRead) {
// Same echo setup as ConnectReadImmediateRead, but the client's read callback
// uninstalls itself when data first arrives. The test then expects only one
// buffer's worth of data to have been read and checkForImmediateRead() not to
// have been called.
// NOTE(review): `server`, `evb` and `wcb` are used below without a visible
// declaration in this listing; confirm against the full file.
1385 const size_t maxBufferSz = 100;
1386 const size_t maxReadsPerEvent = 1;
1387 const size_t expectedDataSz = maxBufferSz * 3;
1388 char expectedData[expectedDataSz];
1389 memset(expectedData, 'k', expectedDataSz);
1392 ReadCallback rcb(maxBufferSz);
1393 AsyncSocketImmediateRead socket(&evb);
1394 socket.connect(nullptr, server.getAddress(), 30);
1396 evb.loop(); // loop until the socket is connected
1398 socket.setReadCB(&rcb);
1399 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Clear the flag so only calls made after this point are counted.
1400 socket.immediateReadCalled = false;
1402 auto acceptedSocket = server.acceptAsync(&evb);
1404 ReadCallback rcbServer;
1405 WriteCallback wcbServer;
// Server side: once the whole payload has arrived, verify it, echo it back and
// close the accepted socket.
1406 rcbServer.dataAvailableCallback = [&]() {
1407 if (rcbServer.dataRead() == expectedDataSz) {
1408 // write back all data read
1409 rcbServer.verifyData(expectedData, expectedDataSz);
1410 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1411 acceptedSocket->close();
1414 acceptedSocket->setReadCB(&rcbServer);
1416 rcb.dataAvailableCallback = [&]() {
1417 // we read data and reset readCB
1418 socket.setReadCB(nullptr);
1423 socket.write(&wcb, expectedData, expectedDataSz);
1425 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1427 /* we should've only read maxBufferSz data since readCallback_
1428 * was reset in dataAvailableCallback */
1429 CHECK_EQ(rcb.dataRead(), maxBufferSz);
1430 CHECK_EQ(socket.immediateReadCalled, false);
1432 ASSERT_FALSE(socket.isClosedBySelf());
1433 ASSERT_FALSE(socket.isClosedByPeer());
1437 // - Test connect() and have the connect callback set the read callback
1438 // - Test connect() and have the connect callback unset the read callback
1439 // - Test reading/writing/closing/destroying the socket in the connect callback
1440 // - Test reading/writing/closing/destroying the socket in the read callback
1441 // - Test reading/writing/closing/destroying the socket in the write callback
1442 // - Test one-way shutdown behavior
1443 // - Test changing the EventBase
1445 // - TODO: test multiple threads sharing an AsyncSocket, and detaching from it
1446 // in connectSuccess(), readDataAvailable(), writeSuccess()
1449 ///////////////////////////////////////////////////////////////////////////
1450 // AsyncServerSocket tests
1451 ///////////////////////////////////////////////////////////////////////////
1454 * Helper AcceptCallback class for the test code
1455 * It records the callbacks that were invoked, and also supports calling
1456 * generic std::function objects in each callback.
1458 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
// NOTE(review): the EventType enum, the EventInfo struct header and several
// initializer and brace lines are not visible in this listing; the notes below
// describe only the lines that are shown.
// EventInfo: one recorded callback invocation. It has one constructor per kind
// of event: accepted connection (fd + address), error (message), or a bare
// event type.
1467 EventInfo(int fd, const folly::SocketAddress& addr)
1468 : type(TYPE_ACCEPT),
1472 explicit EventInfo(const std::string& msg)
1477 explicit EventInfo(EventType et)
1484 int fd; // valid for TYPE_ACCEPT
1485 folly::SocketAddress address; // valid for TYPE_ACCEPT
1486 string errorMsg; // valid for TYPE_ERROR
1488 typedef std::deque<EventInfo> EventList;
1490 TestAcceptCallback()
1491 : connectionAcceptedFn_(),
// Accessor for the recorded events; returns a pointer to the deque of events.
1496 std::deque<EventInfo>* getEvents() {
// Setters for the optional hooks that run after the matching event is recorded.
1500 void setConnectionAcceptedFn(
1501 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1502 connectionAcceptedFn_ = fn;
1504 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1505 acceptErrorFn_ = fn;
1507 void setAcceptStartedFn(const std::function<void()>& fn) {
1508 acceptStartedFn_ = fn;
1510 void setAcceptStoppedFn(const std::function<void()>& fn) {
1511 acceptStoppedFn_ = fn;
// AcceptCallback overrides: each appends an EventInfo to events_ first and
// then tests whether the corresponding hook is set.
1514 void connectionAccepted(
1515 int fd, const folly::SocketAddress& clientAddr) noexcept override {
1516 events_.emplace_back(fd, clientAddr);
1518 if (connectionAcceptedFn_) {
1519 connectionAcceptedFn_(fd, clientAddr);
1522 void acceptError(const std::exception& ex) noexcept override {
// Only the message text is stored, not the exception object.
1523 events_.emplace_back(ex.what());
1525 if (acceptErrorFn_) {
1529 void acceptStarted() noexcept override {
1530 events_.emplace_back(TYPE_START);
1532 if (acceptStartedFn_) {
1536 void acceptStopped() noexcept override {
1537 events_.emplace_back(TYPE_STOP);
1539 if (acceptStoppedFn_) {
// Optional user hooks; an empty std::function means no hook is installed.
1545 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1546 std::function<void(const std::exception&)> acceptErrorFn_;
1547 std::function<void()> acceptStartedFn_;
1548 std::function<void()> acceptStoppedFn_;
// Invocation log, in the order the callbacks were received.
1550 std::deque<EventInfo> events_;
1554 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1556 TEST(AsyncSocketTest, ServerAcceptOptions) {
// Accepts a single connection on a listening server socket and then inspects
// the accepted file descriptor's flags and socket options directly.
1557 EventBase eventBase;
1559 // Create a server socket
1560 std::shared_ptr<AsyncServerSocket> serverSocket(
1561 AsyncServerSocket::newSocket(&eventBase));
// Port 0 is passed to bind(); the address actually bound is read back below.
1562 serverSocket->bind(0);
1563 serverSocket->listen(16);
1564 folly::SocketAddress serverAddress;
1565 serverSocket->getAddress(&serverAddress);
1567 // Add a callback to accept one connection then stop the loop
// The callback unregisters itself on the first accept or on an accept error.
1568 TestAcceptCallback acceptCallback;
1569 acceptCallback.setConnectionAcceptedFn(
1570 [&](int fd, const folly::SocketAddress& addr) {
1571 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1573 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1574 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1576 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1577 serverSocket->startAccepting();
1579 // Connect to the server socket
1580 std::shared_ptr<AsyncSocket> socket(
1581 AsyncSocket::newSocket(&eventBase, serverAddress));
1585 // Verify that the server accepted a connection
// Expected sequence: acceptStarted, one connectionAccepted, acceptStopped.
1586 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1587 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1588 TestAcceptCallback::TYPE_START);
1589 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1590 TestAcceptCallback::TYPE_ACCEPT);
1591 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1592 TestAcceptCallback::TYPE_STOP);
// The fd recorded with the ACCEPT event is the accepted connection.
1593 int fd = acceptCallback.getEvents()->at(1).fd;
1595 // The accepted connection should already be in non-blocking mode
1596 int flags = fcntl(fd, F_GETFL, 0);
1597 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1600 // The accepted connection should already have TCP_NODELAY set
// NOTE(review): the declaration of `value` and the checks on `rc` and `value`
// are not visible in this listing; confirm against the full file.
1602 socklen_t valueLength = sizeof(value);
1603 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1610 * Test AsyncServerSocket::removeAcceptCallback()
1612 TEST(AsyncSocketTest, RemoveAcceptCallback) {
// Registers seven accept callbacks and has them remove themselves or each
// other while connections are being accepted, then checks the exact event
// sequence each callback recorded.
// NOTE(review): the `cb2Count` and `cb6Count` counters are used below without a
// visible declaration or increment in this listing; confirm against the full
// file.
1613 // Create a new AsyncServerSocket
1614 EventBase eventBase;
1615 std::shared_ptr<AsyncServerSocket> serverSocket(
1616 AsyncServerSocket::newSocket(&eventBase));
1617 serverSocket->bind(0);
1618 serverSocket->listen(16);
1619 folly::SocketAddress serverAddress;
1620 serverSocket->getAddress(&serverAddress);
1622 // Add several accept callbacks
1623 TestAcceptCallback cb1;
1624 TestAcceptCallback cb2;
1625 TestAcceptCallback cb3;
1626 TestAcceptCallback cb4;
1627 TestAcceptCallback cb5;
1628 TestAcceptCallback cb6;
1629 TestAcceptCallback cb7;
1631 // Test having callbacks remove other callbacks before them on the list,
1632 // after them on the list, or removing themselves.
1634 // Have callback 2 remove callback 3 and callback 5 the first time it is
// The trailing comment on each newSocket() call appears to name the callback
// expected to receive that connection and what that callback then removes.
1637 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1638 std::shared_ptr<AsyncSocket> sock2(
1639 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1641 cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1643 cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1644 std::shared_ptr<AsyncSocket> sock3(
1645 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1647 cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1648 std::shared_ptr<AsyncSocket> sock5(
1649 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1652 cb2.setConnectionAcceptedFn(
1653 [&](int fd, const folly::SocketAddress& addr) {
1654 if (cb2Count == 0) {
1655 serverSocket->removeAcceptCallback(&cb3, nullptr);
1656 serverSocket->removeAcceptCallback(&cb5, nullptr);
1660 // Have callback 6 remove callback 4 the first time it is called,
1661 // and destroy the server socket the second time it is called
1663 cb6.setConnectionAcceptedFn(
1664 [&](int fd, const folly::SocketAddress& addr) {
1665 if (cb6Count == 0) {
1666 serverSocket->removeAcceptCallback(&cb4, nullptr);
1667 std::shared_ptr<AsyncSocket> sock6(
1668 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1669 std::shared_ptr<AsyncSocket> sock7(
1670 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1671 std::shared_ptr<AsyncSocket> sock8(
1672 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
// Releasing this test's reference to the server socket is how the test ends.
1675 serverSocket.reset();
1679 // Have callback 7 remove itself
1680 cb7.setConnectionAcceptedFn(
1681 [&](int fd, const folly::SocketAddress& addr) {
1682 serverSocket->removeAcceptCallback(&cb7, nullptr);
// Registration order matters: the expectations below depend on it.
1685 serverSocket->addAcceptCallback(&cb1, nullptr);
1686 serverSocket->addAcceptCallback(&cb2, nullptr);
1687 serverSocket->addAcceptCallback(&cb3, nullptr);
1688 serverSocket->addAcceptCallback(&cb4, nullptr);
1689 serverSocket->addAcceptCallback(&cb5, nullptr);
1690 serverSocket->addAcceptCallback(&cb6, nullptr);
1691 serverSocket->addAcceptCallback(&cb7, nullptr);
1692 serverSocket->startAccepting();
1694 // Make several connections to the socket
1695 std::shared_ptr<AsyncSocket> sock1(
1696 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1697 std::shared_ptr<AsyncSocket> sock4(
1698 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1700 // Loop until we are stopped
1703 // Check to make sure that the expected callbacks were invoked.
1705 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1706 // the AcceptCallbacks in round-robin fashion, in the order that they were
1707 // added. The code is implemented this way right now, but the API doesn't
1708 // explicitly require it be done this way. If we change the code not to be
1709 // exactly round robin in the future, we can simplify the test checks here.
1710 // (We'll also need to update the termination code, since we expect cb6 to
1711 // get called twice to terminate the loop.)
// cb1: START, two ACCEPTs, STOP.
1712 CHECK_EQ(cb1.getEvents()->size(), 4);
1713 CHECK_EQ(cb1.getEvents()->at(0).type,
1714 TestAcceptCallback::TYPE_START);
1715 CHECK_EQ(cb1.getEvents()->at(1).type,
1716 TestAcceptCallback::TYPE_ACCEPT);
1717 CHECK_EQ(cb1.getEvents()->at(2).type,
1718 TestAcceptCallback::TYPE_ACCEPT);
1719 CHECK_EQ(cb1.getEvents()->at(3).type,
1720 TestAcceptCallback::TYPE_STOP);
// cb2: START, two ACCEPTs, STOP.
1722 CHECK_EQ(cb2.getEvents()->size(), 4);
1723 CHECK_EQ(cb2.getEvents()->at(0).type,
1724 TestAcceptCallback::TYPE_START);
1725 CHECK_EQ(cb2.getEvents()->at(1).type,
1726 TestAcceptCallback::TYPE_ACCEPT);
1727 CHECK_EQ(cb2.getEvents()->at(2).type,
1728 TestAcceptCallback::TYPE_ACCEPT);
1729 CHECK_EQ(cb2.getEvents()->at(3).type,
1730 TestAcceptCallback::TYPE_STOP);
// cb3 is expected to see only START and STOP, i.e. no accepted connection.
1732 CHECK_EQ(cb3.getEvents()->size(), 2);
1733 CHECK_EQ(cb3.getEvents()->at(0).type,
1734 TestAcceptCallback::TYPE_START);
1735 CHECK_EQ(cb3.getEvents()->at(1).type,
1736 TestAcceptCallback::TYPE_STOP);
// cb4: START, one ACCEPT, STOP.
1738 CHECK_EQ(cb4.getEvents()->size(), 3);
1739 CHECK_EQ(cb4.getEvents()->at(0).type,
1740 TestAcceptCallback::TYPE_START);
1741 CHECK_EQ(cb4.getEvents()->at(1).type,
1742 TestAcceptCallback::TYPE_ACCEPT);
1743 CHECK_EQ(cb4.getEvents()->at(2).type,
1744 TestAcceptCallback::TYPE_STOP);
// cb5 is expected to see only START and STOP, i.e. no accepted connection.
1746 CHECK_EQ(cb5.getEvents()->size(), 2);
1747 CHECK_EQ(cb5.getEvents()->at(0).type,
1748 TestAcceptCallback::TYPE_START);
1749 CHECK_EQ(cb5.getEvents()->at(1).type,
1750 TestAcceptCallback::TYPE_STOP);
// cb6: START, two ACCEPTs, STOP.
1752 CHECK_EQ(cb6.getEvents()->size(), 4);
1753 CHECK_EQ(cb6.getEvents()->at(0).type,
1754 TestAcceptCallback::TYPE_START);
1755 CHECK_EQ(cb6.getEvents()->at(1).type,
1756 TestAcceptCallback::TYPE_ACCEPT);
1757 CHECK_EQ(cb6.getEvents()->at(2).type,
1758 TestAcceptCallback::TYPE_ACCEPT);
1759 CHECK_EQ(cb6.getEvents()->at(3).type,
1760 TestAcceptCallback::TYPE_STOP);
// cb7: START, one ACCEPT (after which it removes itself), STOP.
1762 CHECK_EQ(cb7.getEvents()->size(), 3);
1763 CHECK_EQ(cb7.getEvents()->at(0).type,
1764 TestAcceptCallback::TYPE_START);
1765 CHECK_EQ(cb7.getEvents()->at(1).type,
1766 TestAcceptCallback::TYPE_ACCEPT);
1767 CHECK_EQ(cb7.getEvents()->at(2).type,
1768 TestAcceptCallback::TYPE_STOP);
1772 * Test that AsyncServerSocket accept callbacks run on the thread that drives the EventBase loop
1774 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
// Registers one accept callback from this thread and checks, inside the
// callback hooks, which thread each notification is delivered on.
1775 // Create a new AsyncServerSocket
1776 EventBase eventBase;
1777 std::shared_ptr<AsyncServerSocket> serverSocket(
1778 AsyncServerSocket::newSocket(&eventBase));
1779 serverSocket->bind(0);
1780 serverSocket->listen(16);
1781 folly::SocketAddress serverAddress;
1782 serverSocket->getAddress(&serverAddress);
1784 // Add a single accept callback that records and checks the calling thread
1785 TestAcceptCallback cb1;
// Starts out as the id of the thread running this test body.
1786 auto thread_id = pthread_self();
// acceptStarted must arrive on a different thread; remember that thread so the
// later hooks can be compared against it.
1787 cb1.setAcceptStartedFn([&](){
1788 CHECK_NE(thread_id, pthread_self());
1789 thread_id = pthread_self();
// The accept hook must run on the remembered thread; it then unregisters cb1.
1791 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1792 CHECK_EQ(thread_id, pthread_self());
1793 serverSocket->removeAcceptCallback(&cb1, nullptr);
1795 cb1.setAcceptStoppedFn([&](){
1796 CHECK_EQ(thread_id, pthread_self());
1799 // Register the callback and start accepting
1800 serverSocket->addAcceptCallback(&cb1, nullptr);
1801 serverSocket->startAccepting();
1803 // Make a single connection to the socket
1804 std::shared_ptr<AsyncSocket> sock1(
1805 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1807 // Loop in another thread
// NOTE(review): the thread body and the join are not visible in this listing;
// confirm against the full file.
1808 auto other = std::thread([&](){
1813 // Check to make sure that the expected callbacks were invoked.
1815 // NOTE: cb1 removes itself from the server socket the first time a
1816 // connection is accepted, so it should record exactly three events:
1817 // acceptStarted, one connectionAccepted, and acceptStopped. The
1818 // round-robin ordering caveat that applies to RemoveAcceptCallback does
1819 // not matter here because only one callback is registered. The thread
1820 // checks inside the callbacks themselves verify which thread invoked
1821 // each of them.
1822 CHECK_EQ(cb1.getEvents()->size(), 3);
1823 CHECK_EQ(cb1.getEvents()->at(0).type,
1824 TestAcceptCallback::TYPE_START);
1825 CHECK_EQ(cb1.getEvents()->at(1).type,
1826 TestAcceptCallback::TYPE_ACCEPT);
1827 CHECK_EQ(cb1.getEvents()->at(2).type,
1828 TestAcceptCallback::TYPE_STOP);
1832 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
// Shared check used by the useExistingSocket() tests: accepts one connection on
// the given, already listening, server socket and verifies that the callback
// saw acceptStarted, one connectionAccepted and acceptStopped, in that order.
// `serverSocket` is borrowed; this function does not take ownership of it.
1833 // Add a callback to accept one connection then stop accepting
1834 TestAcceptCallback acceptCallback;
1835 acceptCallback.setConnectionAcceptedFn(
1836 [&](int fd, const folly::SocketAddress& addr) {
1837 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1839 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1840 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1842 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1843 serverSocket->startAccepting();
1845 // Connect to the server socket
// The client uses the server socket's own EventBase and its reported address.
1846 EventBase* eventBase = serverSocket->getEventBase();
1847 folly::SocketAddress serverAddress;
1848 serverSocket->getAddress(&serverAddress);
1849 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1851 // Loop to process all events
1854 // Verify that the server accepted a connection
1855 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1856 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1857 TestAcceptCallback::TYPE_START);
1858 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1859 TestAcceptCallback::TYPE_ACCEPT);
1860 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1861 TestAcceptCallback::TYPE_STOP);
1864 /* Verify that we don't leak sockets if we are destroyed()
1865 * and there are still writes pending
1867 * If destroy() only calls close() instead of closeNow(),
1868 * it would shutdown(writes) on the socket, but it would
1869 * never be close()'d, and the socket would leak
1871 TEST(AsyncSocketTest, DestroyCloseTest) {
// Queues a 5,000,000-byte write on the accepted socket, drops the last
// reference to that socket, and then probes the saved raw fd with read().
// NOTE(review): `server`, `clientEB`, `serverEB`, `ccb`, `rcb` and `wcb` are
// used below without a visible declaration in this listing; confirm against
// the full file.
1877 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1879 socket->connect(&ccb, server.getAddress(), 30);
1881 // Accept the connection
1882 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1884 acceptedSocket->setReadCB(&rcb);
1886 // Write a large buffer to the socket that is larger than kernel buffer
1887 size_t simpleBufLength = 5000000;
// NOTE(review): allocated with new[]; no matching delete[] is visible in this
// listing - confirm the buffer is released at the end of the test.
1888 char* simpleBuf = new char[simpleBufLength];
1889 memset(simpleBuf, 'a', simpleBufLength);
1892 // Let the reads and writes run to completion
// Save the raw descriptor first; it is needed after the socket object is gone.
1893 int fd = acceptedSocket->getFd();
1895 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
// Drop this test's reference while the large write is presumably still pending.
1897 acceptedSocket.reset();
1899 // Test that server socket was closed
// NOTE(review): the assertions on `sz` are not visible in this listing;
// presumably read() on the closed fd is expected to fail - confirm.
1900 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1907 * Test AsyncServerSocket::useExistingSocket()
1909 TEST(AsyncSocketTest, ServerExistingSocket) {
// Hands a manually created fd to AsyncServerSocket at three different stages:
// freshly created, already bound, and already bound and listening. Each case
// ends by running serverSocketSanityTest() on the resulting server socket.
// NOTE(review): `fd` and `serverSocket` are declared once per scenario, so each
// scenario presumably lives in its own brace scope; the brace lines are not
// visible in this listing.
1910 EventBase eventBase;
1912 // Test creating a socket, and letting AsyncServerSocket bind and listen
1914 // Manually create a socket
1915 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1918 // Create a server socket
1919 AsyncServerSocket::UniquePtr serverSocket(
1920 new AsyncServerSocket(&eventBase));
1921 serverSocket->useExistingSocket(fd);
// Query the address of the still-unbound fd and pass it to bind().
1922 folly::SocketAddress address;
1923 serverSocket->getAddress(&address);
1925 serverSocket->bind(address);
1926 serverSocket->listen(16);
1928 // Make sure the socket works
1929 serverSocketSanityTest(serverSocket.get());
1932 // Test creating a socket and binding manually,
1933 // then letting AsyncServerSocket listen
1935 // Manually create a socket
1936 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// NOTE(review): the sin_port assignment and the tail of the bind() check are
// not visible in this listing; confirm against the full file.
1939 struct sockaddr_in addr;
1940 addr.sin_family = AF_INET;
1942 addr.sin_addr.s_addr = INADDR_ANY;
1943 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1945 // Look up the address that we bound to
1946 folly::SocketAddress boundAddress;
1947 boundAddress.setFromLocalAddress(fd);
1949 // Create a server socket
1950 AsyncServerSocket::UniquePtr serverSocket(
1951 new AsyncServerSocket(&eventBase));
1952 serverSocket->useExistingSocket(fd);
1953 serverSocket->listen(16);
1955 // Make sure AsyncServerSocket reports the same address that we bound to
1956 folly::SocketAddress serverSocketAddress;
1957 serverSocket->getAddress(&serverSocketAddress);
1958 CHECK_EQ(boundAddress, serverSocketAddress);
1960 // Make sure the socket works
1961 serverSocketSanityTest(serverSocket.get());
1964 // Test creating a socket, binding and listening manually,
1965 // then giving it to AsyncServerSocket
1967 // Manually create a socket
1968 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// NOTE(review): the sin_port assignment and the tail of the bind() check are
// not visible in this listing; confirm against the full file.
1971 struct sockaddr_in addr;
1972 addr.sin_family = AF_INET;
1974 addr.sin_addr.s_addr = INADDR_ANY;
1975 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1977 // Look up the address that we bound to
1978 folly::SocketAddress boundAddress;
1979 boundAddress.setFromLocalAddress(fd);
// Here the test calls listen() itself before handing the fd over.
1981 CHECK_EQ(listen(fd, 16), 0);
1983 // Create a server socket
1984 AsyncServerSocket::UniquePtr serverSocket(
1985 new AsyncServerSocket(&eventBase));
1986 serverSocket->useExistingSocket(fd);
1988 // Make sure AsyncServerSocket reports the same address that we bound to
1989 folly::SocketAddress serverSocketAddress;
1990 serverSocket->getAddress(&serverSocketAddress);
1991 CHECK_EQ(boundAddress, serverSocketAddress);
1993 // Make sure the socket works
1994 serverSocketSanityTest(serverSocket.get());
1998 TEST(AsyncSocketTest, UnixDomainSocketTest) {
1999 EventBase eventBase;
2001 // Create a server socket
2002 std::shared_ptr<AsyncServerSocket> serverSocket(
2003 AsyncServerSocket::newSocket(&eventBase));
2005 path.append("/anonymous");
2006 folly::SocketAddress serverAddress;
2007 serverAddress.setFromPath(path);
2008 serverSocket->bind(serverAddress);
2009 serverSocket->listen(16);
2011 // Add a callback to accept one connection then stop the loop
2012 TestAcceptCallback acceptCallback;
2013 acceptCallback.setConnectionAcceptedFn(
2014 [&](int fd, const folly::SocketAddress& addr) {
2015 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2017 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
2018 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2020 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2021 serverSocket->startAccepting();
2023 // Connect to the server socket
2024 std::shared_ptr<AsyncSocket> socket(
2025 AsyncSocket::newSocket(&eventBase, serverAddress));
2029 // Verify that the server accepted a connection
2030 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2031 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2032 TestAcceptCallback::TYPE_START);
2033 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2034 TestAcceptCallback::TYPE_ACCEPT);
2035 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2036 TestAcceptCallback::TYPE_STOP);
2037 int fd = acceptCallback.getEvents()->at(1).fd;
2039 // The accepted connection should already be in non-blocking mode
2040 int flags = fcntl(fd, F_GETFL, 0);
2041 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);