2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/ExceptionWrapper.h>
17 #include <folly/RWSpinLock.h>
18 #include <folly/Random.h>
19 #include <folly/SocketAddress.h>
20 #include <folly/io/async/AsyncServerSocket.h>
21 #include <folly/io/async/AsyncSocket.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/EventBase.h>
25 #include <folly/io/IOBuf.h>
26 #include <folly/io/async/test/AsyncSocketTest.h>
27 #include <folly/io/async/test/Util.h>
28 #include <folly/portability/Sockets.h>
29 #include <folly/portability/Unistd.h>
30 #include <folly/test/SocketAddressTestHelper.h>
32 #include <boost/scoped_array.hpp>
34 #include <gmock/gmock.h>
35 #include <gtest/gtest.h>
36 #include <sys/types.h>
40 using namespace boost;
47 using std::unique_ptr;
48 using std::chrono::milliseconds;
49 using boost::scoped_array;
51 using namespace folly;
52 using namespace testing;
54 class DelayedWrite: public AsyncTimeout {
56 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
57 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
58 bool cork, bool lastWrite = false):
59 AsyncTimeout(socket->getEventBase()),
61 bufs_(std::move(bufs)),
64 lastWrite_(lastWrite) {}
67 void timeoutExpired() noexcept override {
68 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
69 socket_->writeChain(wcb_, std::move(bufs_), flags);
71 socket_->shutdownWrite();
75 std::shared_ptr<AsyncSocket> socket_;
76 unique_ptr<IOBuf> bufs_;
77 AsyncTransportWrapper::WriteCallback* wcb_;
82 ///////////////////////////////////////////////////////////////////////////
84 ///////////////////////////////////////////////////////////////////////////
87 * Test connecting to a server
89 TEST(AsyncSocketTest, Connect) {
90 // Start listening on a local port
93 // Connect using a AsyncSocket
95 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
97 socket->connect(&cb, server.getAddress(), 30);
101 CHECK_EQ(cb.state, STATE_SUCCEEDED);
102 EXPECT_LE(0, socket->getConnectTime().count());
103 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
106 enum class TFOState {
111 class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
113 std::vector<TFOState> getTestingValues() {
114 std::vector<TFOState> vals;
115 vals.emplace_back(TFOState::DISABLED);
118 vals.emplace_back(TFOState::ENABLED);
123 INSTANTIATE_TEST_CASE_P(
125 AsyncSocketConnectTest,
126 ::testing::ValuesIn(getTestingValues()));
129 * Test connecting to a server that isn't listening
131 TEST(AsyncSocketTest, ConnectRefused) {
134 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
136 // Hopefully nothing is actually listening on this address
137 folly::SocketAddress addr("127.0.0.1", 65535);
139 socket->connect(&cb, addr, 30);
143 EXPECT_EQ(STATE_FAILED, cb.state);
144 EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
145 EXPECT_LE(0, socket->getConnectTime().count());
146 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
150 * Test connection timeout
152 TEST(AsyncSocketTest, ConnectTimeout) {
155 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
157 // Try connecting to server that won't respond.
159 // This depends somewhat on the network where this test is run.
160 // Hopefully this IP will be routable but unresponsive.
161 // (Alternatively, we could try listening on a local raw socket, but that
162 // normally requires root privileges.)
164 SocketAddressTestHelper::isIPv6Enabled() ?
165 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
166 SocketAddressTestHelper::isIPv4Enabled() ?
167 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
169 SocketAddress addr(host, 65535);
171 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
175 CHECK_EQ(cb.state, STATE_FAILED);
176 CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
178 // Verify that we can still get the peer address after a timeout.
179 // Use case is if the client was created from a client pool, and we want
180 // to log which peer failed.
181 folly::SocketAddress peer;
182 socket->getPeerAddress(&peer);
183 CHECK_EQ(peer, addr);
184 EXPECT_LE(0, socket->getConnectTime().count());
185 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
189 * Test writing immediately after connecting, without waiting for connect
192 TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
197 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
199 if (GetParam() == TFOState::ENABLED) {
204 socket->connect(&ccb, server.getAddress(), 30);
208 memset(buf, 'a', sizeof(buf));
210 socket->write(&wcb, buf, sizeof(buf));
212 // Loop. We don't bother accepting on the server socket yet.
213 // The kernel should be able to buffer the write request so it can succeed.
216 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
217 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
219 // Make sure the server got a connection and received the data
221 server.verifyConnection(buf, sizeof(buf));
223 ASSERT_TRUE(socket->isClosedBySelf());
224 ASSERT_FALSE(socket->isClosedByPeer());
225 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
229 * Test connecting using a nullptr connect callback.
231 TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
236 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
237 if (GetParam() == TFOState::ENABLED) {
241 socket->connect(nullptr, server.getAddress(), 30);
243 // write some data, just so we have some way of verifing
244 // that the socket works correctly after connecting
246 memset(buf, 'a', sizeof(buf));
248 socket->write(&wcb, buf, sizeof(buf));
252 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
254 // Make sure the server got a connection and received the data
256 server.verifyConnection(buf, sizeof(buf));
258 ASSERT_TRUE(socket->isClosedBySelf());
259 ASSERT_FALSE(socket->isClosedByPeer());
263 * Test calling both write() and close() immediately after connecting, without
264 * waiting for connect to finish.
266 * This exercises the STATE_CONNECTING_CLOSING code.
268 TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
273 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
274 if (GetParam() == TFOState::ENABLED) {
278 socket->connect(&ccb, server.getAddress(), 30);
282 memset(buf, 'a', sizeof(buf));
284 socket->write(&wcb, buf, sizeof(buf));
289 // Loop. We don't bother accepting on the server socket yet.
290 // The kernel should be able to buffer the write request so it can succeed.
293 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
294 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
296 // Make sure the server got a connection and received the data
297 server.verifyConnection(buf, sizeof(buf));
299 ASSERT_TRUE(socket->isClosedBySelf());
300 ASSERT_FALSE(socket->isClosedByPeer());
304 * Test calling close() immediately after connect()
306 TEST(AsyncSocketTest, ConnectAndClose) {
309 // Connect using a AsyncSocket
311 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
313 socket->connect(&ccb, server.getAddress(), 30);
315 // Hopefully the connect didn't succeed immediately.
316 // If it did, we can't exercise the close-while-connecting code path.
317 if (ccb.state == STATE_SUCCEEDED) {
318 LOG(INFO) << "connect() succeeded immediately; aborting test "
319 "of close-during-connect behavior";
325 // Loop, although there shouldn't be anything to do.
328 // Make sure the connection was aborted
329 CHECK_EQ(ccb.state, STATE_FAILED);
331 ASSERT_TRUE(socket->isClosedBySelf());
332 ASSERT_FALSE(socket->isClosedByPeer());
336 * Test calling closeNow() immediately after connect()
338 * This should be identical to the normal close behavior.
340 TEST(AsyncSocketTest, ConnectAndCloseNow) {
343 // Connect using a AsyncSocket
345 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
347 socket->connect(&ccb, server.getAddress(), 30);
349 // Hopefully the connect didn't succeed immediately.
350 // If it did, we can't exercise the close-while-connecting code path.
351 if (ccb.state == STATE_SUCCEEDED) {
352 LOG(INFO) << "connect() succeeded immediately; aborting test "
353 "of closeNow()-during-connect behavior";
359 // Loop, although there shouldn't be anything to do.
362 // Make sure the connection was aborted
363 CHECK_EQ(ccb.state, STATE_FAILED);
365 ASSERT_TRUE(socket->isClosedBySelf());
366 ASSERT_FALSE(socket->isClosedByPeer());
370 * Test calling both write() and closeNow() immediately after connecting,
371 * without waiting for connect to finish.
373 * This should abort the pending write.
375 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
380 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
382 socket->connect(&ccb, server.getAddress(), 30);
384 // Hopefully the connect didn't succeed immediately.
385 // If it did, we can't exercise the close-while-connecting code path.
386 if (ccb.state == STATE_SUCCEEDED) {
387 LOG(INFO) << "connect() succeeded immediately; aborting test "
388 "of write-during-connect behavior";
394 memset(buf, 'a', sizeof(buf));
396 socket->write(&wcb, buf, sizeof(buf));
401 // Loop, although there shouldn't be anything to do.
404 CHECK_EQ(ccb.state, STATE_FAILED);
405 CHECK_EQ(wcb.state, STATE_FAILED);
407 ASSERT_TRUE(socket->isClosedBySelf());
408 ASSERT_FALSE(socket->isClosedByPeer());
412 * Test installing a read callback immediately, before connect() finishes.
414 TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
419 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
420 if (GetParam() == TFOState::ENABLED) {
425 socket->connect(&ccb, server.getAddress(), 30);
428 socket->setReadCB(&rcb);
430 if (GetParam() == TFOState::ENABLED) {
431 // Trigger a connection
432 socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
435 // Even though we haven't looped yet, we should be able to accept
436 // the connection and send data to it.
437 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
439 memset(buf, 'a', sizeof(buf));
440 acceptedSocket->write(buf, sizeof(buf));
441 acceptedSocket->flush();
442 acceptedSocket->close();
444 // Loop, although there shouldn't be anything to do.
447 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
448 CHECK_EQ(rcb.buffers.size(), 1);
449 CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
450 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
452 ASSERT_FALSE(socket->isClosedBySelf());
453 ASSERT_FALSE(socket->isClosedByPeer());
457 * Test installing a read callback and then closing immediately before the
458 * connect attempt finishes.
460 TEST(AsyncSocketTest, ConnectReadAndClose) {
465 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
467 socket->connect(&ccb, server.getAddress(), 30);
469 // Hopefully the connect didn't succeed immediately.
470 // If it did, we can't exercise the close-while-connecting code path.
471 if (ccb.state == STATE_SUCCEEDED) {
472 LOG(INFO) << "connect() succeeded immediately; aborting test "
473 "of read-during-connect behavior";
478 socket->setReadCB(&rcb);
483 // Loop, although there shouldn't be anything to do.
486 CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
487 CHECK_EQ(rcb.buffers.size(), 0);
488 CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
490 ASSERT_TRUE(socket->isClosedBySelf());
491 ASSERT_FALSE(socket->isClosedByPeer());
495 * Test both writing and installing a read callback immediately,
496 * before connect() finishes.
498 TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
503 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
504 if (GetParam() == TFOState::ENABLED) {
508 socket->connect(&ccb, server.getAddress(), 30);
512 memset(buf1, 'a', sizeof(buf1));
514 socket->write(&wcb, buf1, sizeof(buf1));
516 // set a read callback
518 socket->setReadCB(&rcb);
520 // Even though we haven't looped yet, we should be able to accept
521 // the connection and send data to it.
522 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
524 memset(buf2, 'b', sizeof(buf2));
525 acceptedSocket->write(buf2, sizeof(buf2));
526 acceptedSocket->flush();
528 // shut down the write half of acceptedSocket, so that the AsyncSocket
529 // will stop reading and we can break out of the event loop.
530 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
535 // Make sure the connect succeeded
536 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
538 // Make sure the AsyncSocket read the data written by the accepted socket
539 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
540 CHECK_EQ(rcb.buffers.size(), 1);
541 CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
542 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
544 // Close the AsyncSocket so we'll see EOF on acceptedSocket
547 // Make sure the accepted socket saw the data written by the AsyncSocket
548 uint8_t readbuf[sizeof(buf1)];
549 acceptedSocket->readAll(readbuf, sizeof(readbuf));
550 CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
551 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
552 CHECK_EQ(bytesRead, 0);
554 ASSERT_FALSE(socket->isClosedBySelf());
555 ASSERT_TRUE(socket->isClosedByPeer());
559 * Test writing to the socket then shutting down writes before the connect
562 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
567 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
569 socket->connect(&ccb, server.getAddress(), 30);
571 // Hopefully the connect didn't succeed immediately.
572 // If it did, we can't exercise the write-while-connecting code path.
573 if (ccb.state == STATE_SUCCEEDED) {
574 LOG(INFO) << "connect() succeeded immediately; skipping test";
578 // Ask to write some data
580 memset(wbuf, 'a', sizeof(wbuf));
582 socket->write(&wcb, wbuf, sizeof(wbuf));
583 socket->shutdownWrite();
586 socket->shutdownWrite();
588 // Even though we haven't looped yet, we should be able to accept
590 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
592 // Since the connection is still in progress, there should be no data to
593 // read yet. Verify that the accepted socket is not readable.
594 struct pollfd fds[1];
595 fds[0].fd = acceptedSocket->getSocketFD();
596 fds[0].events = POLLIN;
598 int rc = poll(fds, 1, 0);
601 // Write data to the accepted socket
602 uint8_t acceptedWbuf[192];
603 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
604 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
605 acceptedSocket->flush();
610 // The loop should have completed the connection, written the queued data,
611 // and shutdown writes on the socket.
613 // Check that the connection was completed successfully and that the write
614 // callback succeeded.
615 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
616 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
618 // Check that we can read the data that was written to the socket, and that
619 // we see an EOF, since its socket was half-shutdown.
620 uint8_t readbuf[sizeof(wbuf)];
621 acceptedSocket->readAll(readbuf, sizeof(readbuf));
622 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
623 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
624 CHECK_EQ(bytesRead, 0);
626 // Close the accepted socket. This will cause it to see EOF
627 // and uninstall the read callback when we loop next.
628 acceptedSocket->close();
630 // Install a read callback, then loop again.
632 socket->setReadCB(&rcb);
635 // This loop should have read the data and seen the EOF
636 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
637 CHECK_EQ(rcb.buffers.size(), 1);
638 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
639 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
640 acceptedWbuf, sizeof(acceptedWbuf)), 0);
642 ASSERT_FALSE(socket->isClosedBySelf());
643 ASSERT_FALSE(socket->isClosedByPeer());
647 * Test reading, writing, and shutting down writes before the connect attempt
650 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
655 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
657 socket->connect(&ccb, server.getAddress(), 30);
659 // Hopefully the connect didn't succeed immediately.
660 // If it did, we can't exercise the write-while-connecting code path.
661 if (ccb.state == STATE_SUCCEEDED) {
662 LOG(INFO) << "connect() succeeded immediately; skipping test";
666 // Install a read callback
668 socket->setReadCB(&rcb);
670 // Ask to write some data
672 memset(wbuf, 'a', sizeof(wbuf));
674 socket->write(&wcb, wbuf, sizeof(wbuf));
677 socket->shutdownWrite();
679 // Even though we haven't looped yet, we should be able to accept
681 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
683 // Since the connection is still in progress, there should be no data to
684 // read yet. Verify that the accepted socket is not readable.
685 struct pollfd fds[1];
686 fds[0].fd = acceptedSocket->getSocketFD();
687 fds[0].events = POLLIN;
689 int rc = poll(fds, 1, 0);
692 // Write data to the accepted socket
693 uint8_t acceptedWbuf[192];
694 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
695 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
696 acceptedSocket->flush();
697 // Shutdown writes to the accepted socket. This will cause it to see EOF
698 // and uninstall the read callback.
699 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
704 // The loop should have completed the connection, written the queued data,
705 // shutdown writes on the socket, read the data we wrote to it, and see the
708 // Check that the connection was completed successfully and that the read
709 // and write callbacks were invoked as expected.
710 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
711 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
712 CHECK_EQ(rcb.buffers.size(), 1);
713 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
714 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
715 acceptedWbuf, sizeof(acceptedWbuf)), 0);
716 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
718 // Check that we can read the data that was written to the socket, and that
719 // we see an EOF, since its socket was half-shutdown.
720 uint8_t readbuf[sizeof(wbuf)];
721 acceptedSocket->readAll(readbuf, sizeof(readbuf));
722 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
723 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
724 CHECK_EQ(bytesRead, 0);
726 // Fully close both sockets
727 acceptedSocket->close();
730 ASSERT_FALSE(socket->isClosedBySelf());
731 ASSERT_TRUE(socket->isClosedByPeer());
735 * Test reading, writing, and calling shutdownWriteNow() before the
736 * connect attempt finishes.
738 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
743 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
745 socket->connect(&ccb, server.getAddress(), 30);
747 // Hopefully the connect didn't succeed immediately.
748 // If it did, we can't exercise the write-while-connecting code path.
749 if (ccb.state == STATE_SUCCEEDED) {
750 LOG(INFO) << "connect() succeeded immediately; skipping test";
754 // Install a read callback
756 socket->setReadCB(&rcb);
758 // Ask to write some data
760 memset(wbuf, 'a', sizeof(wbuf));
762 socket->write(&wcb, wbuf, sizeof(wbuf));
764 // Shutdown writes immediately.
765 // This should immediately discard the data that we just tried to write.
766 socket->shutdownWriteNow();
768 // Verify that writeError() was invoked on the write callback.
769 CHECK_EQ(wcb.state, STATE_FAILED);
770 CHECK_EQ(wcb.bytesWritten, 0);
772 // Even though we haven't looped yet, we should be able to accept
774 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
776 // Since the connection is still in progress, there should be no data to
777 // read yet. Verify that the accepted socket is not readable.
778 struct pollfd fds[1];
779 fds[0].fd = acceptedSocket->getSocketFD();
780 fds[0].events = POLLIN;
782 int rc = poll(fds, 1, 0);
785 // Write data to the accepted socket
786 uint8_t acceptedWbuf[192];
787 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
788 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
789 acceptedSocket->flush();
790 // Shutdown writes to the accepted socket. This will cause it to see EOF
791 // and uninstall the read callback.
792 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
797 // The loop should have completed the connection, written the queued data,
798 // shutdown writes on the socket, read the data we wrote to it, and see the
801 // Check that the connection was completed successfully and that the read
802 // callback was invoked as expected.
803 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
804 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
805 CHECK_EQ(rcb.buffers.size(), 1);
806 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
807 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
808 acceptedWbuf, sizeof(acceptedWbuf)), 0);
810 // Since we used shutdownWriteNow(), it should have discarded all pending
811 // write data. Verify we see an immediate EOF when reading from the accepted
813 uint8_t readbuf[sizeof(wbuf)];
814 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
815 CHECK_EQ(bytesRead, 0);
817 // Fully close both sockets
818 acceptedSocket->close();
821 ASSERT_FALSE(socket->isClosedBySelf());
822 ASSERT_TRUE(socket->isClosedByPeer());
825 // Helper function for use in testConnectOptWrite()
826 // Temporarily disable the read callback
827 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
828 // Uninstall the read callback
829 socket->setReadCB(nullptr);
830 // Schedule the read callback to be reinstalled after 1ms
831 socket->getEventBase()->runInLoop(
832 std::bind(&AsyncSocket::setReadCB, socket, rcb));
836 * Test connect+write, then have the connect callback perform another write.
838 * This tests interaction of the optimistic writing after connect with
839 * additional write attempts that occur in the connect callback.
841 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
844 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
848 socket->connect(&ccb, server.getAddress(), 30);
850 // Hopefully the connect didn't succeed immediately.
851 // If it did, we can't exercise the optimistic write code path.
852 if (ccb.state == STATE_SUCCEEDED) {
853 LOG(INFO) << "connect() succeeded immediately; aborting test "
854 "of optimistic write behavior";
858 // Tell the connect callback to perform a write when the connect succeeds
860 scoped_array<char> buf2(new char[size2]);
861 memset(buf2.get(), 'b', size2);
863 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
864 // Tell the second write callback to close the connection when it is done
865 wcb2.successCallback = [&] { socket->closeNow(); };
868 // Schedule one write() immediately, before the connect finishes
869 scoped_array<char> buf1(new char[size1]);
870 memset(buf1.get(), 'a', size1);
873 socket->write(&wcb1, buf1.get(), size1);
877 // immediately perform a close, before connect() completes
881 // Start reading from the other endpoint after 10ms.
882 // If we're using large buffers, we have to read so that the writes don't
884 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
886 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
887 acceptedSocket.get(), &rcb);
888 socket->getEventBase()->tryRunAfterDelay(
889 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
892 // Loop. We don't bother accepting on the server socket yet.
893 // The kernel should be able to buffer the write request so it can succeed.
896 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
898 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
901 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
906 // Make sure the read callback received all of the data
907 size_t bytesRead = 0;
908 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
909 it != rcb.buffers.end();
911 size_t start = bytesRead;
912 bytesRead += it->length;
913 size_t end = bytesRead;
915 size_t cmpLen = min(size1, end) - start;
916 CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
918 if (end > size1 && end <= size1 + size2) {
922 if (start >= size1) {
924 buf2Offset = start - size1;
925 cmpLen = end - start;
927 itOffset = size1 - start;
929 cmpLen = end - size1;
931 CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
936 CHECK_EQ(bytesRead, size1 + size2);
939 TEST(AsyncSocketTest, ConnectCallbackWrite) {
940 // Test using small writes that should both succeed immediately
941 testConnectOptWrite(100, 200);
943 // Test using a large buffer in the connect callback, that should block
944 const size_t largeSize = 8*1024*1024;
945 testConnectOptWrite(100, largeSize);
947 // Test using a large initial write
948 testConnectOptWrite(largeSize, 100);
950 // Test using two large buffers
951 testConnectOptWrite(largeSize, largeSize);
953 // Test a small write in the connect callback,
954 // but no immediate write before connect completes
955 testConnectOptWrite(0, 64);
957 // Test a large write in the connect callback,
958 // but no immediate write before connect completes
959 testConnectOptWrite(0, largeSize);
961 // Test connect, a small write, then immediately call close() before connect
963 testConnectOptWrite(211, 0, true);
965 // Test connect, a large immediate write (that will block), then immediately
966 // call close() before connect completes
967 testConnectOptWrite(largeSize, 0, true);
970 ///////////////////////////////////////////////////////////////////////////
971 // write() related tests
972 ///////////////////////////////////////////////////////////////////////////
975 * Test writing using a nullptr callback
977 TEST(AsyncSocketTest, WriteNullCallback) {
982 std::shared_ptr<AsyncSocket> socket =
983 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
984 evb.loop(); // loop until the socket is connected
986 // write() with a nullptr callback
988 memset(buf, 'a', sizeof(buf));
989 socket->write(nullptr, buf, sizeof(buf));
991 evb.loop(); // loop until the data is sent
993 // Make sure the server got a connection and received the data
995 server.verifyConnection(buf, sizeof(buf));
997 ASSERT_TRUE(socket->isClosedBySelf());
998 ASSERT_FALSE(socket->isClosedByPeer());
1002 * Test writing with a send timeout
1004 TEST(AsyncSocketTest, WriteTimeout) {
1009 std::shared_ptr<AsyncSocket> socket =
1010 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1011 evb.loop(); // loop until the socket is connected
1013 // write() a large chunk of data, with no-one on the other end reading
1014 size_t writeLength = 8*1024*1024;
1015 uint32_t timeout = 200;
1016 socket->setSendTimeout(timeout);
1017 scoped_array<char> buf(new char[writeLength]);
1018 memset(buf.get(), 'a', writeLength);
1020 socket->write(&wcb, buf.get(), writeLength);
1026 // Make sure the write attempt timed out as requested
1027 CHECK_EQ(wcb.state, STATE_FAILED);
1028 CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1030 // Check that the write timed out within a reasonable period of time.
1031 // We don't check for exactly the specified timeout, since AsyncSocket only
1032 // times out when it hasn't made progress for that period of time.
1034 // On linux, the first write sends a few hundred kb of data, then blocks for
1035 // writability, and then unblocks again after 40ms and is able to write
1036 // another smaller of data before blocking permanently. Therefore it doesn't
1037 // time out until 40ms + timeout.
1039 // I haven't fully verified the cause of this, but I believe it probably
1040 // occurs because the receiving end delays sending an ack for up to 40ms.
1041 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
1042 // the ack, it can send some more data. However, after that point the
1043 // receiver's kernel buffer is full. This 40ms delay happens even with
1044 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
1045 // kernel may be automatically disabling TCP_QUICKACK after receiving some
1048 // For now, we simply check that the timeout occurred within 160ms of
1049 // the requested value.
1050 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1054 * Test writing to a socket that the remote endpoint has closed
1056 TEST(AsyncSocketTest, WritePipeError) {
1061 std::shared_ptr<AsyncSocket> socket =
1062 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1063 socket->setSendTimeout(1000);
1064 evb.loop(); // loop until the socket is connected
1066 // accept and immediately close the socket
1067 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1068 acceptedSocket.reset();
1070 // write() a large chunk of data
1071 size_t writeLength = 8*1024*1024;
1072 scoped_array<char> buf(new char[writeLength]);
1073 memset(buf.get(), 'a', writeLength);
1075 socket->write(&wcb, buf.get(), writeLength);
1079 // Make sure the write failed.
1080 // It would be nice if AsyncSocketException could convey the errno value,
1081 // so that we could check for EPIPE
1082 CHECK_EQ(wcb.state, STATE_FAILED);
1083 CHECK_EQ(wcb.exception.getType(),
1084 AsyncSocketException::INTERNAL_ERROR);
1086 ASSERT_FALSE(socket->isClosedBySelf());
1087 ASSERT_FALSE(socket->isClosedByPeer());
1091 * Test writing a mix of simple buffers and IOBufs
1093 TEST(AsyncSocketTest, WriteIOBuf) {
1098 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1100 socket->connect(&ccb, server.getAddress(), 30);
1102 // Accept the connection
1103 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1105 acceptedSocket->setReadCB(&rcb);
1107 // Write a simple buffer to the socket
1108 constexpr size_t simpleBufLength = 5;
1109 char simpleBuf[simpleBufLength];
1110 memset(simpleBuf, 'a', simpleBufLength);
1112 socket->write(&wcb, simpleBuf, simpleBufLength);
1114 // Write a single-element IOBuf chain
1115 size_t buf1Length = 7;
1116 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1117 memset(buf1->writableData(), 'b', buf1Length);
1118 buf1->append(buf1Length);
1119 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1121 socket->writeChain(&wcb2, std::move(buf1));
1123 // Write a multiple-element IOBuf chain
1124 size_t buf2Length = 11;
1125 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1126 memset(buf2->writableData(), 'c', buf2Length);
1127 buf2->append(buf2Length);
1128 size_t buf3Length = 13;
1129 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1130 memset(buf3->writableData(), 'd', buf3Length);
1131 buf3->append(buf3Length);
1132 buf2->appendChain(std::move(buf3));
1133 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1134 buf2Copy->coalesce();
1136 socket->writeChain(&wcb3, std::move(buf2));
1137 socket->shutdownWrite();
1139 // Let the reads and writes run to completion
1142 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1143 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1144 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1146 // Make sure the reader got the right data in the right order
1147 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1148 CHECK_EQ(rcb.buffers.size(), 1);
1149 CHECK_EQ(rcb.buffers[0].length,
1150 simpleBufLength + buf1Length + buf2Length + buf3Length);
1152 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1154 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1155 buf1Copy->data(), buf1Copy->length()), 0);
1157 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1158 buf2Copy->data(), buf2Copy->length()), 0);
1160 acceptedSocket->close();
1163 ASSERT_TRUE(socket->isClosedBySelf());
1164 ASSERT_FALSE(socket->isClosedByPeer());
1167 TEST(AsyncSocketTest, WriteIOBufCorked) {
1172 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1174 socket->connect(&ccb, server.getAddress(), 30);
1176 // Accept the connection
1177 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1179 acceptedSocket->setReadCB(&rcb);
1181 // Do three writes, 100ms apart, with the "cork" flag set
1182 // on the second write. The reader should see the first write
1183 // arrive by itself, followed by the second and third writes
1184 // arriving together.
1185 size_t buf1Length = 5;
1186 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1187 memset(buf1->writableData(), 'a', buf1Length);
1188 buf1->append(buf1Length);
1189 size_t buf2Length = 7;
1190 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1191 memset(buf2->writableData(), 'b', buf2Length);
1192 buf2->append(buf2Length);
1193 size_t buf3Length = 11;
1194 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1195 memset(buf3->writableData(), 'c', buf3Length);
1196 buf3->append(buf3Length);
1198 socket->writeChain(&wcb1, std::move(buf1));
1200 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1201 write2.scheduleTimeout(100);
1203 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1204 write3.scheduleTimeout(140);
1207 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1208 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1209 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1210 if (wcb3.state != STATE_SUCCEEDED) {
1211 throw(wcb3.exception);
1213 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1215 // Make sure the reader got the data with the right grouping
1216 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1217 CHECK_EQ(rcb.buffers.size(), 2);
1218 CHECK_EQ(rcb.buffers[0].length, buf1Length);
1219 CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1221 acceptedSocket->close();
1224 ASSERT_TRUE(socket->isClosedBySelf());
1225 ASSERT_FALSE(socket->isClosedByPeer());
1229 * Test performing a zero-length write
1231 TEST(AsyncSocketTest, ZeroLengthWrite) {
1236 std::shared_ptr<AsyncSocket> socket =
1237 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1238 evb.loop(); // loop until the socket is connected
1240 auto acceptedSocket = server.acceptAsync(&evb);
1242 acceptedSocket->setReadCB(&rcb);
1244 size_t len1 = 1024*1024;
1245 size_t len2 = 1024*1024;
1246 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1247 memset(buf.get(), 'a', len1);
1248 memset(buf.get(), 'b', len2);
1254 socket->write(&wcb1, buf.get(), 0);
1255 socket->write(&wcb2, buf.get(), len1);
1256 socket->write(&wcb3, buf.get() + len1, 0);
1257 socket->write(&wcb4, buf.get() + len1, len2);
1260 evb.loop(); // loop until the data is sent
1262 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1263 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1264 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1265 CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1266 rcb.verifyData(buf.get(), len1 + len2);
1268 ASSERT_TRUE(socket->isClosedBySelf());
1269 ASSERT_FALSE(socket->isClosedByPeer());
1272 TEST(AsyncSocketTest, ZeroLengthWritev) {
1277 std::shared_ptr<AsyncSocket> socket =
1278 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1279 evb.loop(); // loop until the socket is connected
1281 auto acceptedSocket = server.acceptAsync(&evb);
1283 acceptedSocket->setReadCB(&rcb);
1285 size_t len1 = 1024*1024;
1286 size_t len2 = 1024*1024;
1287 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1288 memset(buf.get(), 'a', len1);
1289 memset(buf.get(), 'b', len2);
1292 constexpr size_t iovCount = 4;
1293 struct iovec iov[iovCount];
1294 iov[0].iov_base = buf.get();
1295 iov[0].iov_len = len1;
1296 iov[1].iov_base = buf.get() + len1;
1298 iov[2].iov_base = buf.get() + len1;
1299 iov[2].iov_len = len2;
1300 iov[3].iov_base = buf.get() + len1 + len2;
1303 socket->writev(&wcb, iov, iovCount);
1305 evb.loop(); // loop until the data is sent
1307 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1308 rcb.verifyData(buf.get(), len1 + len2);
1310 ASSERT_TRUE(socket->isClosedBySelf());
1311 ASSERT_FALSE(socket->isClosedByPeer());
1314 ///////////////////////////////////////////////////////////////////////////
1315 // close() related tests
1316 ///////////////////////////////////////////////////////////////////////////
1319 * Test calling close() with pending writes when the socket is already closing.
1321 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1326 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1328 socket->connect(&ccb, server.getAddress(), 30);
1330 // accept the socket on the server side
1331 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1333 // Loop to ensure the connect has completed
1336 // Make sure we are connected
1337 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1339 // Schedule pending writes, until several write attempts have blocked
1341 memset(buf, 'a', sizeof(buf));
1342 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1343 WriteCallbackVector writeCallbacks;
1345 writeCallbacks.reserve(5);
1346 while (writeCallbacks.size() < 5) {
1347 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1349 socket->write(wcb.get(), buf, sizeof(buf));
1350 if (wcb->state == STATE_SUCCEEDED) {
1351 // Succeeded immediately. Keep performing more writes
1355 // This write is blocked.
1356 // Have the write callback call close() when writeError() is invoked
1357 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1358 writeCallbacks.push_back(wcb);
1361 // Call closeNow() to immediately fail the pending writes
1364 // Make sure writeError() was invoked on all of the pending write callbacks
1365 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1366 it != writeCallbacks.end();
1368 CHECK_EQ((*it)->state, STATE_FAILED);
1371 ASSERT_TRUE(socket->isClosedBySelf());
1372 ASSERT_FALSE(socket->isClosedByPeer());
1375 ///////////////////////////////////////////////////////////////////////////
1376 // ImmediateRead related tests
1377 ///////////////////////////////////////////////////////////////////////////
1379 /* AsyncSocket use to verify immediate read works */
1380 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1382 bool immediateReadCalled = false;
1383 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1385 void checkForImmediateRead() noexcept override {
1386 immediateReadCalled = true;
1387 AsyncSocket::handleRead();
1391 TEST(AsyncSocket, ConnectReadImmediateRead) {
1394 const size_t maxBufferSz = 100;
1395 const size_t maxReadsPerEvent = 1;
1396 const size_t expectedDataSz = maxBufferSz * 3;
1397 char expectedData[expectedDataSz];
1398 memset(expectedData, 'j', expectedDataSz);
1401 ReadCallback rcb(maxBufferSz);
1402 AsyncSocketImmediateRead socket(&evb);
1403 socket.connect(nullptr, server.getAddress(), 30);
1405 evb.loop(); // loop until the socket is connected
1407 socket.setReadCB(&rcb);
1408 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1409 socket.immediateReadCalled = false;
1411 auto acceptedSocket = server.acceptAsync(&evb);
1413 ReadCallback rcbServer;
1414 WriteCallback wcbServer;
1415 rcbServer.dataAvailableCallback = [&]() {
1416 if (rcbServer.dataRead() == expectedDataSz) {
1417 // write back all data read
1418 rcbServer.verifyData(expectedData, expectedDataSz);
1419 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1420 acceptedSocket->close();
1423 acceptedSocket->setReadCB(&rcbServer);
1427 socket.write(&wcb1, expectedData, expectedDataSz);
1429 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1430 rcb.verifyData(expectedData, expectedDataSz);
1431 CHECK_EQ(socket.immediateReadCalled, true);
1433 ASSERT_FALSE(socket.isClosedBySelf());
1434 ASSERT_FALSE(socket.isClosedByPeer());
1437 TEST(AsyncSocket, ConnectReadUninstallRead) {
1440 const size_t maxBufferSz = 100;
1441 const size_t maxReadsPerEvent = 1;
1442 const size_t expectedDataSz = maxBufferSz * 3;
1443 char expectedData[expectedDataSz];
1444 memset(expectedData, 'k', expectedDataSz);
1447 ReadCallback rcb(maxBufferSz);
1448 AsyncSocketImmediateRead socket(&evb);
1449 socket.connect(nullptr, server.getAddress(), 30);
1451 evb.loop(); // loop until the socket is connected
1453 socket.setReadCB(&rcb);
1454 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1455 socket.immediateReadCalled = false;
1457 auto acceptedSocket = server.acceptAsync(&evb);
1459 ReadCallback rcbServer;
1460 WriteCallback wcbServer;
1461 rcbServer.dataAvailableCallback = [&]() {
1462 if (rcbServer.dataRead() == expectedDataSz) {
1463 // write back all data read
1464 rcbServer.verifyData(expectedData, expectedDataSz);
1465 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1466 acceptedSocket->close();
1469 acceptedSocket->setReadCB(&rcbServer);
1471 rcb.dataAvailableCallback = [&]() {
1472 // we read data and reset readCB
1473 socket.setReadCB(nullptr);
1478 socket.write(&wcb, expectedData, expectedDataSz);
1480 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1482 /* we shoud've only read maxBufferSz data since readCallback_
1483 * was reset in dataAvailableCallback */
1484 CHECK_EQ(rcb.dataRead(), maxBufferSz);
1485 CHECK_EQ(socket.immediateReadCalled, false);
1487 ASSERT_FALSE(socket.isClosedBySelf());
1488 ASSERT_FALSE(socket.isClosedByPeer());
1492 // - Test connect() and have the connect callback set the read callback
1493 // - Test connect() and have the connect callback unset the read callback
1494 // - Test reading/writing/closing/destroying the socket in the connect callback
1495 // - Test reading/writing/closing/destroying the socket in the read callback
1496 // - Test reading/writing/closing/destroying the socket in the write callback
1497 // - Test one-way shutdown behavior
1498 // - Test changing the EventBase
1500 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1501 // in connectSuccess(), readDataAvailable(), writeSuccess()
1504 ///////////////////////////////////////////////////////////////////////////
1505 // AsyncServerSocket tests
1506 ///////////////////////////////////////////////////////////////////////////
1509 * Helper ConnectionEventCallback class for the test code.
1510 * It maintains counters protected by a spin lock.
1512 class TestConnectionEventCallback :
1513 public AsyncServerSocket::ConnectionEventCallback {
1515 virtual void onConnectionAccepted(
1516 const int /* socket */,
1517 const SocketAddress& /* addr */) noexcept override {
1518 folly::RWSpinLock::WriteHolder holder(spinLock_);
1519 connectionAccepted_++;
1522 virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1523 folly::RWSpinLock::WriteHolder holder(spinLock_);
1524 connectionAcceptedError_++;
1527 virtual void onConnectionDropped(
1528 const int /* socket */,
1529 const SocketAddress& /* addr */) noexcept override {
1530 folly::RWSpinLock::WriteHolder holder(spinLock_);
1531 connectionDropped_++;
1534 virtual void onConnectionEnqueuedForAcceptorCallback(
1535 const int /* socket */,
1536 const SocketAddress& /* addr */) noexcept override {
1537 folly::RWSpinLock::WriteHolder holder(spinLock_);
1538 connectionEnqueuedForAcceptCallback_++;
1541 virtual void onConnectionDequeuedByAcceptorCallback(
1542 const int /* socket */,
1543 const SocketAddress& /* addr */) noexcept override {
1544 folly::RWSpinLock::WriteHolder holder(spinLock_);
1545 connectionDequeuedByAcceptCallback_++;
1548 virtual void onBackoffStarted() noexcept override {
1549 folly::RWSpinLock::WriteHolder holder(spinLock_);
1553 virtual void onBackoffEnded() noexcept override {
1554 folly::RWSpinLock::WriteHolder holder(spinLock_);
1558 virtual void onBackoffError() noexcept override {
1559 folly::RWSpinLock::WriteHolder holder(spinLock_);
1563 unsigned int getConnectionAccepted() const {
1564 folly::RWSpinLock::ReadHolder holder(spinLock_);
1565 return connectionAccepted_;
1568 unsigned int getConnectionAcceptedError() const {
1569 folly::RWSpinLock::ReadHolder holder(spinLock_);
1570 return connectionAcceptedError_;
1573 unsigned int getConnectionDropped() const {
1574 folly::RWSpinLock::ReadHolder holder(spinLock_);
1575 return connectionDropped_;
1578 unsigned int getConnectionEnqueuedForAcceptCallback() const {
1579 folly::RWSpinLock::ReadHolder holder(spinLock_);
1580 return connectionEnqueuedForAcceptCallback_;
1583 unsigned int getConnectionDequeuedByAcceptCallback() const {
1584 folly::RWSpinLock::ReadHolder holder(spinLock_);
1585 return connectionDequeuedByAcceptCallback_;
1588 unsigned int getBackoffStarted() const {
1589 folly::RWSpinLock::ReadHolder holder(spinLock_);
1590 return backoffStarted_;
1593 unsigned int getBackoffEnded() const {
1594 folly::RWSpinLock::ReadHolder holder(spinLock_);
1595 return backoffEnded_;
1598 unsigned int getBackoffError() const {
1599 folly::RWSpinLock::ReadHolder holder(spinLock_);
1600 return backoffError_;
1604 mutable folly::RWSpinLock spinLock_;
1605 unsigned int connectionAccepted_{0};
1606 unsigned int connectionAcceptedError_{0};
1607 unsigned int connectionDropped_{0};
1608 unsigned int connectionEnqueuedForAcceptCallback_{0};
1609 unsigned int connectionDequeuedByAcceptCallback_{0};
1610 unsigned int backoffStarted_{0};
1611 unsigned int backoffEnded_{0};
1612 unsigned int backoffError_{0};
1616 * Helper AcceptCallback class for the test code
1617 * It records the callbacks that were invoked, and also supports calling
1618 * generic std::function objects in each callback.
1620 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1629 EventInfo(int fd, const folly::SocketAddress& addr)
1630 : type(TYPE_ACCEPT),
1634 explicit EventInfo(const std::string& msg)
1639 explicit EventInfo(EventType et)
1646 int fd; // valid for TYPE_ACCEPT
1647 folly::SocketAddress address; // valid for TYPE_ACCEPT
1648 string errorMsg; // valid for TYPE_ERROR
1650 typedef std::deque<EventInfo> EventList;
1652 TestAcceptCallback()
1653 : connectionAcceptedFn_(),
1658 std::deque<EventInfo>* getEvents() {
1662 void setConnectionAcceptedFn(
1663 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1664 connectionAcceptedFn_ = fn;
1666 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1667 acceptErrorFn_ = fn;
1669 void setAcceptStartedFn(const std::function<void()>& fn) {
1670 acceptStartedFn_ = fn;
1672 void setAcceptStoppedFn(const std::function<void()>& fn) {
1673 acceptStoppedFn_ = fn;
1676 void connectionAccepted(
1677 int fd, const folly::SocketAddress& clientAddr) noexcept override {
1678 events_.emplace_back(fd, clientAddr);
1680 if (connectionAcceptedFn_) {
1681 connectionAcceptedFn_(fd, clientAddr);
1684 void acceptError(const std::exception& ex) noexcept override {
1685 events_.emplace_back(ex.what());
1687 if (acceptErrorFn_) {
1691 void acceptStarted() noexcept override {
1692 events_.emplace_back(TYPE_START);
1694 if (acceptStartedFn_) {
1698 void acceptStopped() noexcept override {
1699 events_.emplace_back(TYPE_STOP);
1701 if (acceptStoppedFn_) {
1707 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1708 std::function<void(const std::exception&)> acceptErrorFn_;
1709 std::function<void()> acceptStartedFn_;
1710 std::function<void()> acceptStoppedFn_;
1712 std::deque<EventInfo> events_;
1717 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1719 TEST(AsyncSocketTest, ServerAcceptOptions) {
1720 EventBase eventBase;
1722 // Create a server socket
1723 std::shared_ptr<AsyncServerSocket> serverSocket(
1724 AsyncServerSocket::newSocket(&eventBase));
1725 serverSocket->bind(0);
1726 serverSocket->listen(16);
1727 folly::SocketAddress serverAddress;
1728 serverSocket->getAddress(&serverAddress);
1730 // Add a callback to accept one connection then stop the loop
1731 TestAcceptCallback acceptCallback;
1732 acceptCallback.setConnectionAcceptedFn(
1733 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1734 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1736 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1737 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1739 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1740 serverSocket->startAccepting();
1742 // Connect to the server socket
1743 std::shared_ptr<AsyncSocket> socket(
1744 AsyncSocket::newSocket(&eventBase, serverAddress));
1748 // Verify that the server accepted a connection
1749 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1750 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1751 TestAcceptCallback::TYPE_START);
1752 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1753 TestAcceptCallback::TYPE_ACCEPT);
1754 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1755 TestAcceptCallback::TYPE_STOP);
1756 int fd = acceptCallback.getEvents()->at(1).fd;
1758 // The accepted connection should already be in non-blocking mode
1759 int flags = fcntl(fd, F_GETFL, 0);
1760 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1763 // The accepted connection should already have TCP_NODELAY set
1765 socklen_t valueLength = sizeof(value);
1766 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1773 * Test AsyncServerSocket::removeAcceptCallback()
1775 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1776 // Create a new AsyncServerSocket
1777 EventBase eventBase;
1778 std::shared_ptr<AsyncServerSocket> serverSocket(
1779 AsyncServerSocket::newSocket(&eventBase));
1780 serverSocket->bind(0);
1781 serverSocket->listen(16);
1782 folly::SocketAddress serverAddress;
1783 serverSocket->getAddress(&serverAddress);
1785 // Add several accept callbacks
1786 TestAcceptCallback cb1;
1787 TestAcceptCallback cb2;
1788 TestAcceptCallback cb3;
1789 TestAcceptCallback cb4;
1790 TestAcceptCallback cb5;
1791 TestAcceptCallback cb6;
1792 TestAcceptCallback cb7;
1794 // Test having callbacks remove other callbacks before them on the list,
1795 // after them on the list, or removing themselves.
1797 // Have callback 2 remove callback 3 and callback 5 the first time it is
1800 cb1.setConnectionAcceptedFn([&](int /* fd */,
1801 const folly::SocketAddress& /* addr */) {
1802 std::shared_ptr<AsyncSocket> sock2(
1803 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1805 cb3.setConnectionAcceptedFn(
1806 [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1807 cb4.setConnectionAcceptedFn(
1808 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1809 std::shared_ptr<AsyncSocket> sock3(
1810 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1812 cb5.setConnectionAcceptedFn(
1813 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1814 std::shared_ptr<AsyncSocket> sock5(
1815 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1818 cb2.setConnectionAcceptedFn(
1819 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1820 if (cb2Count == 0) {
1821 serverSocket->removeAcceptCallback(&cb3, nullptr);
1822 serverSocket->removeAcceptCallback(&cb5, nullptr);
1826 // Have callback 6 remove callback 4 the first time it is called,
1827 // and destroy the server socket the second time it is called
1829 cb6.setConnectionAcceptedFn(
1830 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1831 if (cb6Count == 0) {
1832 serverSocket->removeAcceptCallback(&cb4, nullptr);
1833 std::shared_ptr<AsyncSocket> sock6(
1834 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1835 std::shared_ptr<AsyncSocket> sock7(
1836 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1837 std::shared_ptr<AsyncSocket> sock8(
1838 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1841 serverSocket.reset();
1845 // Have callback 7 remove itself
1846 cb7.setConnectionAcceptedFn(
1847 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1848 serverSocket->removeAcceptCallback(&cb7, nullptr);
1851 serverSocket->addAcceptCallback(&cb1, nullptr);
1852 serverSocket->addAcceptCallback(&cb2, nullptr);
1853 serverSocket->addAcceptCallback(&cb3, nullptr);
1854 serverSocket->addAcceptCallback(&cb4, nullptr);
1855 serverSocket->addAcceptCallback(&cb5, nullptr);
1856 serverSocket->addAcceptCallback(&cb6, nullptr);
1857 serverSocket->addAcceptCallback(&cb7, nullptr);
1858 serverSocket->startAccepting();
1860 // Make several connections to the socket
1861 std::shared_ptr<AsyncSocket> sock1(
1862 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1863 std::shared_ptr<AsyncSocket> sock4(
1864 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1866 // Loop until we are stopped
1869 // Check to make sure that the expected callbacks were invoked.
1871 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1872 // the AcceptCallbacks in round-robin fashion, in the order that they were
1873 // added. The code is implemented this way right now, but the API doesn't
1874 // explicitly require it be done this way. If we change the code not to be
1875 // exactly round robin in the future, we can simplify the test checks here.
1876 // (We'll also need to update the termination code, since we expect cb6 to
1877 // get called twice to terminate the loop.)
1878 CHECK_EQ(cb1.getEvents()->size(), 4);
1879 CHECK_EQ(cb1.getEvents()->at(0).type,
1880 TestAcceptCallback::TYPE_START);
1881 CHECK_EQ(cb1.getEvents()->at(1).type,
1882 TestAcceptCallback::TYPE_ACCEPT);
1883 CHECK_EQ(cb1.getEvents()->at(2).type,
1884 TestAcceptCallback::TYPE_ACCEPT);
1885 CHECK_EQ(cb1.getEvents()->at(3).type,
1886 TestAcceptCallback::TYPE_STOP);
1888 CHECK_EQ(cb2.getEvents()->size(), 4);
1889 CHECK_EQ(cb2.getEvents()->at(0).type,
1890 TestAcceptCallback::TYPE_START);
1891 CHECK_EQ(cb2.getEvents()->at(1).type,
1892 TestAcceptCallback::TYPE_ACCEPT);
1893 CHECK_EQ(cb2.getEvents()->at(2).type,
1894 TestAcceptCallback::TYPE_ACCEPT);
1895 CHECK_EQ(cb2.getEvents()->at(3).type,
1896 TestAcceptCallback::TYPE_STOP);
1898 CHECK_EQ(cb3.getEvents()->size(), 2);
1899 CHECK_EQ(cb3.getEvents()->at(0).type,
1900 TestAcceptCallback::TYPE_START);
1901 CHECK_EQ(cb3.getEvents()->at(1).type,
1902 TestAcceptCallback::TYPE_STOP);
1904 CHECK_EQ(cb4.getEvents()->size(), 3);
1905 CHECK_EQ(cb4.getEvents()->at(0).type,
1906 TestAcceptCallback::TYPE_START);
1907 CHECK_EQ(cb4.getEvents()->at(1).type,
1908 TestAcceptCallback::TYPE_ACCEPT);
1909 CHECK_EQ(cb4.getEvents()->at(2).type,
1910 TestAcceptCallback::TYPE_STOP);
1912 CHECK_EQ(cb5.getEvents()->size(), 2);
1913 CHECK_EQ(cb5.getEvents()->at(0).type,
1914 TestAcceptCallback::TYPE_START);
1915 CHECK_EQ(cb5.getEvents()->at(1).type,
1916 TestAcceptCallback::TYPE_STOP);
1918 CHECK_EQ(cb6.getEvents()->size(), 4);
1919 CHECK_EQ(cb6.getEvents()->at(0).type,
1920 TestAcceptCallback::TYPE_START);
1921 CHECK_EQ(cb6.getEvents()->at(1).type,
1922 TestAcceptCallback::TYPE_ACCEPT);
1923 CHECK_EQ(cb6.getEvents()->at(2).type,
1924 TestAcceptCallback::TYPE_ACCEPT);
1925 CHECK_EQ(cb6.getEvents()->at(3).type,
1926 TestAcceptCallback::TYPE_STOP);
1928 CHECK_EQ(cb7.getEvents()->size(), 3);
1929 CHECK_EQ(cb7.getEvents()->at(0).type,
1930 TestAcceptCallback::TYPE_START);
1931 CHECK_EQ(cb7.getEvents()->at(1).type,
1932 TestAcceptCallback::TYPE_ACCEPT);
1933 CHECK_EQ(cb7.getEvents()->at(2).type,
1934 TestAcceptCallback::TYPE_STOP);
1938 * Test AsyncServerSocket::removeAcceptCallback()
1940 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1941 // Create a new AsyncServerSocket
1942 EventBase eventBase;
1943 std::shared_ptr<AsyncServerSocket> serverSocket(
1944 AsyncServerSocket::newSocket(&eventBase));
1945 serverSocket->bind(0);
1946 serverSocket->listen(16);
1947 folly::SocketAddress serverAddress;
1948 serverSocket->getAddress(&serverAddress);
1950 // Add several accept callbacks
1951 TestAcceptCallback cb1;
1952 auto thread_id = std::this_thread::get_id();
1953 cb1.setAcceptStartedFn([&](){
1954 CHECK_NE(thread_id, std::this_thread::get_id());
1955 thread_id = std::this_thread::get_id();
1957 cb1.setConnectionAcceptedFn(
1958 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1959 CHECK_EQ(thread_id, std::this_thread::get_id());
1960 serverSocket->removeAcceptCallback(&cb1, nullptr);
1962 cb1.setAcceptStoppedFn([&](){
1963 CHECK_EQ(thread_id, std::this_thread::get_id());
1966 // Test having callbacks remove other callbacks before them on the list,
1967 serverSocket->addAcceptCallback(&cb1, nullptr);
1968 serverSocket->startAccepting();
1970 // Make several connections to the socket
1971 std::shared_ptr<AsyncSocket> sock1(
1972 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1974 // Loop in another thread
1975 auto other = std::thread([&](){
1980 // Check to make sure that the expected callbacks were invoked.
1982 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1983 // the AcceptCallbacks in round-robin fashion, in the order that they were
1984 // added. The code is implemented this way right now, but the API doesn't
1985 // explicitly require it be done this way. If we change the code not to be
1986 // exactly round robin in the future, we can simplify the test checks here.
1987 // (We'll also need to update the termination code, since we expect cb6 to
1988 // get called twice to terminate the loop.)
1989 CHECK_EQ(cb1.getEvents()->size(), 3);
1990 CHECK_EQ(cb1.getEvents()->at(0).type,
1991 TestAcceptCallback::TYPE_START);
1992 CHECK_EQ(cb1.getEvents()->at(1).type,
1993 TestAcceptCallback::TYPE_ACCEPT);
1994 CHECK_EQ(cb1.getEvents()->at(2).type,
1995 TestAcceptCallback::TYPE_STOP);
1999 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
2000 // Add a callback to accept one connection then stop accepting
2001 TestAcceptCallback acceptCallback;
2002 acceptCallback.setConnectionAcceptedFn(
2003 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2004 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2006 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2007 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2009 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2010 serverSocket->startAccepting();
2012 // Connect to the server socket
2013 EventBase* eventBase = serverSocket->getEventBase();
2014 folly::SocketAddress serverAddress;
2015 serverSocket->getAddress(&serverAddress);
2016 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
2018 // Loop to process all events
2021 // Verify that the server accepted a connection
2022 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2023 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2024 TestAcceptCallback::TYPE_START);
2025 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2026 TestAcceptCallback::TYPE_ACCEPT);
2027 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2028 TestAcceptCallback::TYPE_STOP);
2031 /* Verify that we don't leak sockets if we are destroyed()
2032 * and there are still writes pending
2034 * If destroy() only calls close() instead of closeNow(),
2035 * it would shutdown(writes) on the socket, but it would
2036 * never be close()'d, and the socket would leak
2038 TEST(AsyncSocketTest, DestroyCloseTest) {
2044 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
2046 socket->connect(&ccb, server.getAddress(), 30);
2048 // Accept the connection
2049 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2051 acceptedSocket->setReadCB(&rcb);
2053 // Write a large buffer to the socket that is larger than kernel buffer
2054 size_t simpleBufLength = 5000000;
2055 char* simpleBuf = new char[simpleBufLength];
2056 memset(simpleBuf, 'a', simpleBufLength);
2059 // Let the reads and writes run to completion
2060 int fd = acceptedSocket->getFd();
2062 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2064 acceptedSocket.reset();
2066 // Test that server socket was closed
2067 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2074 * Test AsyncServerSocket::useExistingSocket()
2076 TEST(AsyncSocketTest, ServerExistingSocket) {
2077 EventBase eventBase;
2079 // Test creating a socket, and letting AsyncServerSocket bind and listen
2081 // Manually create a socket
2082 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2085 // Create a server socket
2086 AsyncServerSocket::UniquePtr serverSocket(
2087 new AsyncServerSocket(&eventBase));
2088 serverSocket->useExistingSocket(fd);
2089 folly::SocketAddress address;
2090 serverSocket->getAddress(&address);
2092 serverSocket->bind(address);
2093 serverSocket->listen(16);
2095 // Make sure the socket works
2096 serverSocketSanityTest(serverSocket.get());
2099 // Test creating a socket and binding manually,
2100 // then letting AsyncServerSocket listen
2102 // Manually create a socket
2103 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2106 struct sockaddr_in addr;
2107 addr.sin_family = AF_INET;
2109 addr.sin_addr.s_addr = INADDR_ANY;
2110 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2112 // Look up the address that we bound to
2113 folly::SocketAddress boundAddress;
2114 boundAddress.setFromLocalAddress(fd);
2116 // Create a server socket
2117 AsyncServerSocket::UniquePtr serverSocket(
2118 new AsyncServerSocket(&eventBase));
2119 serverSocket->useExistingSocket(fd);
2120 serverSocket->listen(16);
2122 // Make sure AsyncServerSocket reports the same address that we bound to
2123 folly::SocketAddress serverSocketAddress;
2124 serverSocket->getAddress(&serverSocketAddress);
2125 CHECK_EQ(boundAddress, serverSocketAddress);
2127 // Make sure the socket works
2128 serverSocketSanityTest(serverSocket.get());
2131 // Test creating a socket, binding and listening manually,
2132 // then giving it to AsyncServerSocket
2134 // Manually create a socket
2135 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2138 struct sockaddr_in addr;
2139 addr.sin_family = AF_INET;
2141 addr.sin_addr.s_addr = INADDR_ANY;
2142 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2144 // Look up the address that we bound to
2145 folly::SocketAddress boundAddress;
2146 boundAddress.setFromLocalAddress(fd);
2148 CHECK_EQ(listen(fd, 16), 0);
2150 // Create a server socket
2151 AsyncServerSocket::UniquePtr serverSocket(
2152 new AsyncServerSocket(&eventBase));
2153 serverSocket->useExistingSocket(fd);
2155 // Make sure AsyncServerSocket reports the same address that we bound to
2156 folly::SocketAddress serverSocketAddress;
2157 serverSocket->getAddress(&serverSocketAddress);
2158 CHECK_EQ(boundAddress, serverSocketAddress);
2160 // Make sure the socket works
2161 serverSocketSanityTest(serverSocket.get());
2165 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2166 EventBase eventBase;
2168 // Create a server socket
2169 std::shared_ptr<AsyncServerSocket> serverSocket(
2170 AsyncServerSocket::newSocket(&eventBase));
2172 path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2173 folly::SocketAddress serverAddress;
2174 serverAddress.setFromPath(path);
2175 serverSocket->bind(serverAddress);
2176 serverSocket->listen(16);
2178 // Add a callback to accept one connection then stop the loop
2179 TestAcceptCallback acceptCallback;
2180 acceptCallback.setConnectionAcceptedFn(
2181 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2182 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2184 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2185 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2187 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2188 serverSocket->startAccepting();
2190 // Connect to the server socket
2191 std::shared_ptr<AsyncSocket> socket(
2192 AsyncSocket::newSocket(&eventBase, serverAddress));
2196 // Verify that the server accepted a connection
2197 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2198 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2199 TestAcceptCallback::TYPE_START);
2200 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2201 TestAcceptCallback::TYPE_ACCEPT);
2202 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2203 TestAcceptCallback::TYPE_STOP);
2204 int fd = acceptCallback.getEvents()->at(1).fd;
2206 // The accepted connection should already be in non-blocking mode
2207 int flags = fcntl(fd, F_GETFL, 0);
2208 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2211 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2212 EventBase eventBase;
2213 TestConnectionEventCallback connectionEventCallback;
2215 // Create a server socket
2216 std::shared_ptr<AsyncServerSocket> serverSocket(
2217 AsyncServerSocket::newSocket(&eventBase));
2218 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2219 serverSocket->bind(0);
2220 serverSocket->listen(16);
2221 folly::SocketAddress serverAddress;
2222 serverSocket->getAddress(&serverAddress);
2224 // Add a callback to accept one connection then stop the loop
2225 TestAcceptCallback acceptCallback;
2226 acceptCallback.setConnectionAcceptedFn(
2227 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2228 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2230 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2231 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2233 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2234 serverSocket->startAccepting();
2236 // Connect to the server socket
2237 std::shared_ptr<AsyncSocket> socket(
2238 AsyncSocket::newSocket(&eventBase, serverAddress));
2242 // Validate the connection event counters
2243 ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2244 ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2245 ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2247 connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2248 ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2249 ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2250 ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2251 ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2255 * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2257 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2258 EventBase eventBase;
2260 // Counter of how many connections have been accepted
2263 // Create a server socket
2264 auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2265 serverSocket->bind(0);
2266 serverSocket->listen(16);
2267 folly::SocketAddress serverAddress;
2268 serverSocket->getAddress(&serverAddress);
2270 // Add a callback to accept connections
2271 TestAcceptCallback acceptCallback;
2272 acceptCallback.setConnectionAcceptedFn(
2273 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2275 CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2278 // all messages are processed, remove accept callback
2279 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2282 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2283 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2285 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2286 serverSocket->startAccepting();
2288 // Connect to the server socket, 4 clients, there are 4 connections
2289 auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2290 auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2291 auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2292 auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2298 * Test AsyncTransport::BufferCallback
2300 TEST(AsyncSocketTest, BufferTest) {
2304 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2305 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2307 socket->connect(&ccb, server.getAddress(), 30, option);
2309 char buf[100 * 1024];
2310 memset(buf, 'c', sizeof(buf));
2313 socket->setBufferCallback(&bcb);
2314 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2317 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2318 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
2320 ASSERT_TRUE(bcb.hasBuffered());
2321 ASSERT_TRUE(bcb.hasBufferCleared());
2324 server.verifyConnection(buf, sizeof(buf));
2326 ASSERT_TRUE(socket->isClosedBySelf());
2327 ASSERT_FALSE(socket->isClosedByPeer());
2330 TEST(AsyncSocketTest, BufferCallbackKill) {
2333 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2334 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2336 socket->connect(&ccb, server.getAddress(), 30, option);
2339 char buf[100 * 1024];
2340 memset(buf, 'c', sizeof(buf));
2342 socket->setBufferCallback(&bcb);
2344 wcb.successCallback = [&] {
2345 ASSERT_TRUE(socket.unique());
2349 // This will trigger AsyncSocket::handleWrite,
2350 // which calls WriteCallback::writeSuccess,
2351 // which calls wcb.successCallback above,
2352 // which tries to delete socket
2353 // Then, the socket will also try to use this BufferCallback
2354 // And that should crash us, if there is no DestructorGuard on the stack
2355 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2358 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2362 TEST(AsyncSocketTest, ConnectTFO) {
2363 // Start listening on a local port
2364 TestServer server(true);
2366 // Connect using a AsyncSocket
2368 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2369 socket->enableTFO();
2371 socket->connect(&cb, server.getAddress(), 30);
2373 std::array<uint8_t, 128> buf;
2374 memset(buf.data(), 'a', buf.size());
2376 std::array<uint8_t, 3> readBuf;
2377 auto sendBuf = IOBuf::copyBuffer("hey");
2380 auto acceptedSocket = server.accept();
2381 acceptedSocket->write(buf.data(), buf.size());
2382 acceptedSocket->flush();
2383 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2384 acceptedSocket->close();
2389 CHECK_EQ(cb.state, STATE_SUCCEEDED);
2390 EXPECT_LE(0, socket->getConnectTime().count());
2391 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2392 EXPECT_TRUE(socket->getTFOAttempted());
2394 // Should trigger the connect
2395 WriteCallback write;
2397 socket->writeChain(&write, sendBuf->clone());
2398 socket->setReadCB(&rcb);
2403 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2404 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2405 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2406 ASSERT_EQ(1, rcb.buffers.size());
2407 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2408 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2409 EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
2413 * Test connecting to a server that isn't listening
2415 TEST(AsyncSocketTest, ConnectRefusedTFO) {
2418 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2420 socket->enableTFO();
2422 // Hopefully nothing is actually listening on this address
2423 folly::SocketAddress addr("::1", 65535);
2425 socket->connect(&cb, addr, 30);
2429 WriteCallback write1;
2430 // Trigger the connect if TFO attempt is supported.
2431 socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2433 WriteCallback write2;
2434 socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2437 if (!socket->getTFOFinished()) {
2438 EXPECT_EQ(STATE_FAILED, write1.state);
2439 EXPECT_FALSE(socket->getTFOFinished());
2441 EXPECT_EQ(STATE_SUCCEEDED, write1.state);
2442 EXPECT_TRUE(socket->getTFOFinished());
2445 EXPECT_EQ(STATE_FAILED, write2.state);
2447 EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2448 EXPECT_LE(0, socket->getConnectTime().count());
2449 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2450 EXPECT_TRUE(socket->getTFOAttempted());
2454 * Test calling closeNow() immediately after connecting.
2456 TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
2457 TestServer server(true);
2461 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2462 socket->enableTFO();
2465 socket->connect(&ccb, server.getAddress(), 30);
2468 std::array<char, 128> buf;
2469 memset(buf.data(), 'a', buf.size());
2474 // Loop, although there shouldn't be anything to do.
2477 EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
2478 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2480 ASSERT_TRUE(socket->isClosedBySelf());
2481 ASSERT_FALSE(socket->isClosedByPeer());
2485 * Test calling close() immediately after connect()
2487 TEST(AsyncSocketTest, ConnectAndCloseTFO) {
2488 TestServer server(true);
2490 // Connect using a AsyncSocket
2492 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2493 socket->enableTFO();
2496 socket->connect(&ccb, server.getAddress(), 30);
2500 // Loop, although there shouldn't be anything to do.
2503 // Make sure the connection was aborted
2504 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2505 EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
2507 ASSERT_TRUE(socket->isClosedBySelf());
2508 ASSERT_FALSE(socket->isClosedByPeer());
2511 class MockAsyncTFOSocket : public AsyncSocket {
2513 using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
2515 explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
2517 MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
2520 TEST(AsyncSocketTest, TestTFOUnsupported) {
2521 TestServer server(true);
2523 // Connect using a AsyncSocket
2525 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2526 socket->enableTFO();
2529 socket->connect(&ccb, server.getAddress(), 30);
2530 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2533 socket->setReadCB(&rcb);
2535 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2536 .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2537 WriteCallback write;
2538 auto sendBuf = IOBuf::copyBuffer("hey");
2539 socket->writeChain(&write, sendBuf->clone());
2540 EXPECT_EQ(STATE_WAITING, write.state);
2542 std::array<uint8_t, 128> buf;
2543 memset(buf.data(), 'a', buf.size());
2545 std::array<uint8_t, 3> readBuf;
2548 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2549 acceptedSocket->write(buf.data(), buf.size());
2550 acceptedSocket->flush();
2551 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2552 acceptedSocket->close();
2558 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2559 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2561 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2562 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2563 ASSERT_EQ(1, rcb.buffers.size());
2564 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2565 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2568 TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
2569 // Try connecting to server that won't respond.
2571 // This depends somewhat on the network where this test is run.
2572 // Hopefully this IP will be routable but unresponsive.
2573 // (Alternatively, we could try listening on a local raw socket, but that
2574 // normally requires root privileges.)
2575 auto host = SocketAddressTestHelper::isIPv6Enabled()
2576 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2577 : SocketAddressTestHelper::isIPv4Enabled()
2578 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2580 SocketAddress addr(host, 65535);
2582 // Connect using a AsyncSocket
2584 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2585 socket->enableTFO();
2588 // Set a very small timeout
2589 socket->connect(&ccb, addr, 1);
2590 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2593 socket->setReadCB(&rcb);
2595 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2596 .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2597 WriteCallback write;
2598 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2602 EXPECT_EQ(STATE_FAILED, write.state);
2605 TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
2606 TestServer server(true);
2608 // Connect using a AsyncSocket
2610 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2611 socket->enableTFO();
2614 socket->connect(&ccb, server.getAddress(), 30);
2615 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2618 socket->setReadCB(&rcb);
2620 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2621 .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2622 sockaddr_storage addr;
2623 auto len = server.getAddress().getAddress(&addr);
2624 return connect(fd, (const struct sockaddr*)&addr, len);
2626 WriteCallback write;
2627 auto sendBuf = IOBuf::copyBuffer("hey");
2628 socket->writeChain(&write, sendBuf->clone());
2629 EXPECT_EQ(STATE_WAITING, write.state);
2631 std::array<uint8_t, 128> buf;
2632 memset(buf.data(), 'a', buf.size());
2634 std::array<uint8_t, 3> readBuf;
2637 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2638 acceptedSocket->write(buf.data(), buf.size());
2639 acceptedSocket->flush();
2640 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2641 acceptedSocket->close();
2647 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2649 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2650 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2652 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2653 ASSERT_EQ(1, rcb.buffers.size());
2654 ASSERT_EQ(buf.size(), rcb.buffers[0].length);
2655 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2658 TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
2659 // Try connecting to server that won't respond.
2661 // This depends somewhat on the network where this test is run.
2662 // Hopefully this IP will be routable but unresponsive.
2663 // (Alternatively, we could try listening on a local raw socket, but that
2664 // normally requires root privileges.)
2665 auto host = SocketAddressTestHelper::isIPv6Enabled()
2666 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2667 : SocketAddressTestHelper::isIPv4Enabled()
2668 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2670 SocketAddress addr(host, 65535);
2672 // Connect using a AsyncSocket
2674 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2675 socket->enableTFO();
2678 // Set a very small timeout
2679 socket->connect(&ccb, addr, 1);
2680 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2683 socket->setReadCB(&rcb);
2685 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2686 .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2687 sockaddr_storage addr2;
2688 auto len = addr.getAddress(&addr2);
2689 return connect(fd, (const struct sockaddr*)&addr2, len);
2691 WriteCallback write;
2692 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2696 EXPECT_EQ(STATE_FAILED, write.state);
2699 TEST(AsyncSocketTest, TestTFOEagain) {
2700 TestServer server(true);
2702 // Connect using a AsyncSocket
2704 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2705 socket->enableTFO();
2708 socket->connect(&ccb, server.getAddress(), 30);
2710 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2711 .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
2712 WriteCallback write;
2713 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2717 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2718 EXPECT_EQ(STATE_FAILED, write.state);
2721 // Sending a large amount of data in the first write which will
2722 // definitely not fit into MSS.
2723 TEST(AsyncSocketTest, ConnectTFOWithBigData) {
2724 // Start listening on a local port
2725 TestServer server(true);
2727 // Connect using a AsyncSocket
2729 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2730 socket->enableTFO();
2732 socket->connect(&cb, server.getAddress(), 30);
2734 std::array<uint8_t, 128> buf;
2735 memset(buf.data(), 'a', buf.size());
2737 constexpr size_t len = 10 * 1024;
2738 auto sendBuf = IOBuf::create(len);
2739 sendBuf->append(len);
2740 std::array<uint8_t, len> readBuf;
2743 auto acceptedSocket = server.accept();
2744 acceptedSocket->write(buf.data(), buf.size());
2745 acceptedSocket->flush();
2746 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2747 acceptedSocket->close();
2752 CHECK_EQ(cb.state, STATE_SUCCEEDED);
2753 EXPECT_LE(0, socket->getConnectTime().count());
2754 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2755 EXPECT_TRUE(socket->getTFOAttempted());
2757 // Should trigger the connect
2758 WriteCallback write;
2760 socket->writeChain(&write, sendBuf->clone());
2761 socket->setReadCB(&rcb);
2766 EXPECT_EQ(socket->getTFOSucceeded(), socket->getTFOFinished());
2767 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2768 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2769 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2770 ASSERT_EQ(1, rcb.buffers.size());
2771 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2772 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));