2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/ExceptionWrapper.h>
17 #include <folly/RWSpinLock.h>
18 #include <folly/Random.h>
19 #include <folly/SocketAddress.h>
20 #include <folly/io/async/AsyncServerSocket.h>
21 #include <folly/io/async/AsyncSocket.h>
22 #include <folly/io/async/AsyncTimeout.h>
23 #include <folly/io/async/EventBase.h>
25 #include <folly/experimental/TestUtil.h>
26 #include <folly/io/IOBuf.h>
27 #include <folly/io/async/test/AsyncSocketTest.h>
28 #include <folly/io/async/test/Util.h>
29 #include <folly/portability/GMock.h>
30 #include <folly/portability/GTest.h>
31 #include <folly/portability/Sockets.h>
32 #include <folly/portability/Unistd.h>
33 #include <folly/test/SocketAddressTestHelper.h>
35 #include <boost/scoped_array.hpp>
37 #include <sys/types.h>
41 using namespace boost;
48 using std::unique_ptr;
49 using std::chrono::milliseconds;
50 using boost::scoped_array;
52 using namespace folly;
53 using namespace testing;
55 namespace fsp = folly::portability::sockets;
// Timer-driven helper: an AsyncTimeout registered on the socket's EventBase
// that performs a deferred writeChain() on the socket when the timeout fires.
57 class DelayedWrite: public AsyncTimeout {
// socket: connection to write on (held in the shared_ptr member socket_).
// bufs:   IOBuf chain to send; moved into bufs_.
// wcb:    write callback later passed to writeChain().
// cork:   selects WriteFlags::CORK vs. WriteFlags::NONE for the write.
// lastWrite: stored in lastWrite_ (defaults to false).
59 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
60 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
61 bool cork, bool lastWrite = false):
62 AsyncTimeout(socket->getEventBase()),
64 bufs_(std::move(bufs)),
67 lastWrite_(lastWrite) {}
// Timeout handler: sends the stored chain, then half-closes the write side.
// NOTE(review): the shutdownWrite() call is presumably guarded by lastWrite_
// -- confirm against the full source.
70 void timeoutExpired() noexcept override {
71 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
72 socket_->writeChain(wcb_, std::move(bufs_), flags);
74 socket_->shutdownWrite();
78 std::shared_ptr<AsyncSocket> socket_;
79 unique_ptr<IOBuf> bufs_;
80 AsyncTransportWrapper::WriteCallback* wcb_;
85 ///////////////////////////////////////////////////////////////////////////
87 ///////////////////////////////////////////////////////////////////////////
90 * Test connecting to a server
// Connects to server.getAddress() with a 30 ms connect timeout and checks
// that the connect callback ends in STATE_SUCCEEDED, that the measured
// connect time is non-negative, and that the configured timeout is reported.
92 TEST(AsyncSocketTest, Connect) {
93 // Start listening on a local port
96 // Connect using an AsyncSocket
98 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
100 socket->connect(&cb, server.getAddress(), 30);
104 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
105 EXPECT_LE(0, socket->getConnectTime().count());
106 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
// Test parameter for AsyncSocketConnectTest. The DISABLED and ENABLED
// enumerators are the ones referenced by getTestingValues() and the TEST_P
// bodies. NOTE(review): "TFO" presumably means TCP Fast Open -- confirm.
109 enum class TFOState {
// Value-parameterized gtest fixture; the TEST_P cases read their TFOState
// through GetParam().
114 class AsyncSocketConnectTest : public ::testing::TestWithParam<TFOState> {};
// Builds the list of TFOState values used to instantiate
// AsyncSocketConnectTest. DISABLED is added first, then ENABLED.
// NOTE(review): adding ENABLED may be conditional on platform support --
// confirm against the full source.
116 std::vector<TFOState> getTestingValues() {
117 std::vector<TFOState> vals;
118 vals.emplace_back(TFOState::DISABLED);
121 vals.emplace_back(TFOState::ENABLED);
// Instantiates every AsyncSocketConnectTest TEST_P once per value returned by
// getTestingValues().
126 INSTANTIATE_TEST_CASE_P(
128 AsyncSocketConnectTest,
129 ::testing::ValuesIn(getTestingValues()));
132 * Test connecting to a server that isn't listening
// Connects to 127.0.0.1:65535 with a 30 ms timeout and expects the connect
// callback to fail with AsyncSocketException::NOT_OPEN. Also checks the
// connect time is non-negative and the timeout is reported back unchanged.
134 TEST(AsyncSocketTest, ConnectRefused) {
137 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
139 // Hopefully nothing is actually listening on this address
140 folly::SocketAddress addr("127.0.0.1", 65535);
142 socket->connect(&cb, addr, 30);
146 EXPECT_EQ(STATE_FAILED, cb.state);
147 EXPECT_EQ(AsyncSocketException::NOT_OPEN, cb.exception.getType());
148 EXPECT_LE(0, socket->getConnectTime().count());
149 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
153 * Test connection timeout
// Connects to port 65535 of a public DNS address with a 1 ms timeout and
// expects the connect callback to fail with TIMED_OUT. Afterwards the peer
// address must still be retrievable and equal to the requested address.
155 TEST(AsyncSocketTest, ConnectTimeout) {
158 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
160 // Try connecting to server that won't respond.
162 // This depends somewhat on the network where this test is run.
163 // Hopefully this IP will be routable but unresponsive.
164 // (Alternatively, we could try listening on a local raw socket, but that
165 // normally requires root privileges.)
// Address selection: prefer the IPv6 address when IPv6 is enabled, otherwise
// the IPv4 address when IPv4 is enabled.
167 SocketAddressTestHelper::isIPv6Enabled() ?
168 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
169 SocketAddressTestHelper::isIPv4Enabled() ?
170 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
172 SocketAddress addr(host, 65535);
174 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
178 ASSERT_EQ(cb.state, STATE_FAILED);
179 ASSERT_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
181 // Verify that we can still get the peer address after a timeout.
182 // Use case is if the client was created from a client pool, and we want
183 // to log which peer failed.
184 folly::SocketAddress peer;
185 socket->getPeerAddress(&peer);
186 ASSERT_EQ(peer, addr);
187 EXPECT_LE(0, socket->getConnectTime().count());
188 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
192 * Test writing immediately after connecting, without waiting for connect
// Parameterized on TFOState. Issues connect() followed directly by write()
// of a buffer filled with 'a', then expects both the connect and write
// callbacks to succeed, the server to receive exactly that buffer, and the
// socket to report closed-by-self (not closed-by-peer).
195 TEST_P(AsyncSocketConnectTest, ConnectAndWrite) {
200 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
202 if (GetParam() == TFOState::ENABLED) {
207 socket->connect(&ccb, server.getAddress(), 30);
211 memset(buf, 'a', sizeof(buf));
213 socket->write(&wcb, buf, sizeof(buf));
215 // Loop. We don't bother accepting on the server socket yet.
216 // The kernel should be able to buffer the write request so it can succeed.
219 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
220 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
222 // Make sure the server got a connection and received the data
224 server.verifyConnection(buf, sizeof(buf));
226 ASSERT_TRUE(socket->isClosedBySelf());
227 ASSERT_FALSE(socket->isClosedByPeer());
228 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
232 * Test connecting using a nullptr connect callback.
// Parameterized on TFOState. Calls connect() with a nullptr connect callback
// and uses a subsequent write (with a real write callback) to prove the
// socket is usable: the write must succeed and the server must see the data.
234 TEST_P(AsyncSocketConnectTest, ConnectNullCallback) {
239 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
240 if (GetParam() == TFOState::ENABLED) {
244 socket->connect(nullptr, server.getAddress(), 30);
246 // write some data, just so we have some way of verifying
247 // that the socket works correctly after connecting
249 memset(buf, 'a', sizeof(buf));
251 socket->write(&wcb, buf, sizeof(buf));
255 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
257 // Make sure the server got a connection and received the data
259 server.verifyConnection(buf, sizeof(buf));
261 ASSERT_TRUE(socket->isClosedBySelf());
262 ASSERT_FALSE(socket->isClosedByPeer());
266 * Test calling both write() and close() immediately after connecting, without
267 * waiting for connect to finish.
269 * This exercises the STATE_CONNECTING_CLOSING code.
// Parameterized on TFOState. Queues a write right after connect() and
// expects both the connect and the write callbacks to succeed, the server to
// receive the buffer, and the socket to report closed-by-self.
271 TEST_P(AsyncSocketConnectTest, ConnectWriteAndClose) {
276 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
277 if (GetParam() == TFOState::ENABLED) {
281 socket->connect(&ccb, server.getAddress(), 30);
285 memset(buf, 'a', sizeof(buf));
287 socket->write(&wcb, buf, sizeof(buf));
292 // Loop. We don't bother accepting on the server socket yet.
293 // The kernel should be able to buffer the write request so it can succeed.
296 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
297 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
299 // Make sure the server got a connection and received the data
300 server.verifyConnection(buf, sizeof(buf));
302 ASSERT_TRUE(socket->isClosedBySelf());
303 ASSERT_FALSE(socket->isClosedByPeer());
307 * Test calling close() immediately after connect()
// Starts a connect and expects the connect callback to end in STATE_FAILED
// with the socket reporting closed-by-self. If connect() already succeeded
// synchronously, the close-while-connecting path cannot be exercised and the
// test only logs that fact.
309 TEST(AsyncSocketTest, ConnectAndClose) {
312 // Connect using an AsyncSocket
314 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
316 socket->connect(&ccb, server.getAddress(), 30);
318 // Hopefully the connect didn't succeed immediately.
319 // If it did, we can't exercise the close-while-connecting code path.
320 if (ccb.state == STATE_SUCCEEDED) {
321 LOG(INFO) << "connect() succeeded immediately; aborting test "
322 "of close-during-connect behavior";
328 // Loop, although there shouldn't be anything to do.
331 // Make sure the connection was aborted
332 ASSERT_EQ(ccb.state, STATE_FAILED);
334 ASSERT_TRUE(socket->isClosedBySelf());
335 ASSERT_FALSE(socket->isClosedByPeer());
339 * Test calling closeNow() immediately after connect()
341 * This should be identical to the normal close behavior.
// closeNow() counterpart of ConnectAndClose: the connect callback must end in
// STATE_FAILED and the socket must report closed-by-self. Logs and bails out
// if connect() completed synchronously.
343 TEST(AsyncSocketTest, ConnectAndCloseNow) {
346 // Connect using an AsyncSocket
348 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
350 socket->connect(&ccb, server.getAddress(), 30);
352 // Hopefully the connect didn't succeed immediately.
353 // If it did, we can't exercise the close-while-connecting code path.
354 if (ccb.state == STATE_SUCCEEDED) {
355 LOG(INFO) << "connect() succeeded immediately; aborting test "
356 "of closeNow()-during-connect behavior";
362 // Loop, although there shouldn't be anything to do.
365 // Make sure the connection was aborted
366 ASSERT_EQ(ccb.state, STATE_FAILED);
368 ASSERT_TRUE(socket->isClosedBySelf());
369 ASSERT_FALSE(socket->isClosedByPeer());
373 * Test calling both write() and closeNow() immediately after connecting,
374 * without waiting for connect to finish.
376 * This should abort the pending write.
// Queues a write while the connect is still pending and expects both the
// connect callback and the write callback to end in STATE_FAILED, with the
// socket reporting closed-by-self. Logs and bails out if connect() completed
// synchronously.
378 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
383 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
385 socket->connect(&ccb, server.getAddress(), 30);
387 // Hopefully the connect didn't succeed immediately.
388 // If it did, we can't exercise the close-while-connecting code path.
389 if (ccb.state == STATE_SUCCEEDED) {
390 LOG(INFO) << "connect() succeeded immediately; aborting test "
391 "of write-during-connect behavior";
397 memset(buf, 'a', sizeof(buf));
399 socket->write(&wcb, buf, sizeof(buf));
404 // Loop, although there shouldn't be anything to do.
407 ASSERT_EQ(ccb.state, STATE_FAILED);
408 ASSERT_EQ(wcb.state, STATE_FAILED);
410 ASSERT_TRUE(socket->isClosedBySelf());
411 ASSERT_FALSE(socket->isClosedByPeer());
415 * Test installing a read callback immediately, before connect() finishes.
// Parameterized on TFOState. Installs a read callback right after connect(),
// has the accepted (server-side) socket send a buffer of 'a' bytes and close,
// and expects the read callback to hold exactly one buffer with that content.
417 TEST_P(AsyncSocketConnectTest, ConnectAndRead) {
422 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
423 if (GetParam() == TFOState::ENABLED) {
428 socket->connect(&ccb, server.getAddress(), 30);
431 socket->setReadCB(&rcb);
433 if (GetParam() == TFOState::ENABLED) {
434 // Trigger a connection
435 socket->writeChain(nullptr, IOBuf::copyBuffer("hey"));
438 // Even though we haven't looped yet, we should be able to accept
439 // the connection and send data to it.
440 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
442 memset(buf, 'a', sizeof(buf));
443 acceptedSocket->write(buf, sizeof(buf));
444 acceptedSocket->flush();
445 acceptedSocket->close();
447 // Loop, although there shouldn't be anything to do.
450 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
451 ASSERT_EQ(rcb.buffers.size(), 1);
452 ASSERT_EQ(rcb.buffers[0].length, sizeof(buf));
453 ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
455 ASSERT_FALSE(socket->isClosedBySelf());
456 ASSERT_FALSE(socket->isClosedByPeer());
460 * Test installing a read callback and then closing immediately before the
461 * connect attempt finishes.
// Installs a read callback while the connect is pending and expects: the
// connect callback fails, the read callback received no buffers but ends in
// STATE_SUCCEEDED (EOF), and the socket reports closed-by-self. Logs and
// bails out if connect() completed synchronously.
463 TEST(AsyncSocketTest, ConnectReadAndClose) {
468 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
470 socket->connect(&ccb, server.getAddress(), 30);
472 // Hopefully the connect didn't succeed immediately.
473 // If it did, we can't exercise the close-while-connecting code path.
474 if (ccb.state == STATE_SUCCEEDED) {
475 LOG(INFO) << "connect() succeeded immediately; aborting test "
476 "of read-during-connect behavior";
481 socket->setReadCB(&rcb);
486 // Loop, although there shouldn't be anything to do.
489 ASSERT_EQ(ccb.state, STATE_FAILED); // the connect attempt was aborted
490 ASSERT_EQ(rcb.buffers.size(), 0);
491 ASSERT_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
493 ASSERT_TRUE(socket->isClosedBySelf());
494 ASSERT_FALSE(socket->isClosedByPeer());
498 * Test both writing and installing a read callback immediately,
499 * before connect() finishes.
// Parameterized on TFOState. Right after connect() the client writes buf1
// ('a' bytes) and installs a read callback; the accepted socket sends buf2
// ('b' bytes) and half-closes its write side. Verifies each side received the
// other's data and that the client reports closed-by-peer, not closed-by-self.
501 TEST_P(AsyncSocketConnectTest, ConnectWriteAndRead) {
506 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
507 if (GetParam() == TFOState::ENABLED) {
511 socket->connect(&ccb, server.getAddress(), 30);
515 memset(buf1, 'a', sizeof(buf1));
517 socket->write(&wcb, buf1, sizeof(buf1));
519 // set a read callback
521 socket->setReadCB(&rcb);
523 // Even though we haven't looped yet, we should be able to accept
524 // the connection and send data to it.
525 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
527 memset(buf2, 'b', sizeof(buf2));
528 acceptedSocket->write(buf2, sizeof(buf2));
529 acceptedSocket->flush();
531 // shut down the write half of acceptedSocket, so that the AsyncSocket
532 // will stop reading and we can break out of the event loop.
533 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
538 // Make sure the connect succeeded
539 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
541 // Make sure the AsyncSocket read the data written by the accepted socket
542 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
543 ASSERT_EQ(rcb.buffers.size(), 1);
544 ASSERT_EQ(rcb.buffers[0].length, sizeof(buf2));
545 ASSERT_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
547 // Close the AsyncSocket so we'll see EOF on acceptedSocket
550 // Make sure the accepted socket saw the data written by the AsyncSocket
551 uint8_t readbuf[sizeof(buf1)];
552 acceptedSocket->readAll(readbuf, sizeof(readbuf));
553 ASSERT_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
// A further read returning 0 bytes means the accepted socket saw EOF.
554 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
555 ASSERT_EQ(bytesRead, 0);
557 ASSERT_FALSE(socket->isClosedBySelf());
558 ASSERT_TRUE(socket->isClosedByPeer());
562 * Test writing to the socket then shutting down writes before the connect
// Queues a write and shutdownWrite() while the connect is pending. Expects
// the connect and write to succeed, the accepted socket to read the written
// data followed by EOF, and the client to still be able to read the data the
// accepted socket sent before it closed. Logs and bails out if connect()
// completed synchronously.
565 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
570 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
572 socket->connect(&ccb, server.getAddress(), 30);
574 // Hopefully the connect didn't succeed immediately.
575 // If it did, we can't exercise the write-while-connecting code path.
576 if (ccb.state == STATE_SUCCEEDED) {
577 LOG(INFO) << "connect() succeeded immediately; skipping test";
581 // Ask to write some data
583 memset(wbuf, 'a', sizeof(wbuf));
585 socket->write(&wcb, wbuf, sizeof(wbuf));
// NOTE(review): shutdownWrite() is called twice back-to-back here; the second
// call looks redundant -- confirm whether it is intentional.
586 socket->shutdownWrite();
589 socket->shutdownWrite();
591 // Even though we haven't looped yet, we should be able to accept
593 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
595 // Since the connection is still in progress, there should be no data to
596 // read yet. Verify that the accepted socket is not readable.
597 struct pollfd fds[1];
598 fds[0].fd = acceptedSocket->getSocketFD();
599 fds[0].events = POLLIN;
// Zero timeout: poll() returns immediately instead of waiting for readability.
601 int rc = poll(fds, 1, 0);
604 // Write data to the accepted socket
605 uint8_t acceptedWbuf[192];
606 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
607 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
608 acceptedSocket->flush();
613 // The loop should have completed the connection, written the queued data,
614 // and shutdown writes on the socket.
616 // Check that the connection was completed successfully and that the write
617 // callback succeeded.
618 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
619 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
621 // Check that we can read the data that was written to the socket, and that
622 // we see an EOF, since its socket was half-shutdown.
623 uint8_t readbuf[sizeof(wbuf)];
624 acceptedSocket->readAll(readbuf, sizeof(readbuf));
625 ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
626 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
627 ASSERT_EQ(bytesRead, 0);
629 // Close the accepted socket. This will cause it to see EOF
630 // and uninstall the read callback when we loop next.
631 acceptedSocket->close();
633 // Install a read callback, then loop again.
635 socket->setReadCB(&rcb);
638 // This loop should have read the data and seen the EOF
639 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
640 ASSERT_EQ(rcb.buffers.size(), 1);
641 ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
642 ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
643 acceptedWbuf, sizeof(acceptedWbuf)), 0);
645 ASSERT_FALSE(socket->isClosedBySelf());
646 ASSERT_FALSE(socket->isClosedByPeer());
650 * Test reading, writing, and shutting down writes before the connect attempt
// Installs a read callback, queues a write and calls shutdownWrite() while
// the connect is pending. Expects connect, read and write callbacks to all
// succeed, both directions of data to arrive intact, and the client to end up
// closed-by-peer. Logs and bails out if connect() completed synchronously.
653 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
658 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
660 socket->connect(&ccb, server.getAddress(), 30);
662 // Hopefully the connect didn't succeed immediately.
663 // If it did, we can't exercise the write-while-connecting code path.
664 if (ccb.state == STATE_SUCCEEDED) {
665 LOG(INFO) << "connect() succeeded immediately; skipping test";
669 // Install a read callback
671 socket->setReadCB(&rcb);
673 // Ask to write some data
675 memset(wbuf, 'a', sizeof(wbuf));
677 socket->write(&wcb, wbuf, sizeof(wbuf));
680 socket->shutdownWrite();
682 // Even though we haven't looped yet, we should be able to accept
684 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
686 // Since the connection is still in progress, there should be no data to
687 // read yet. Verify that the accepted socket is not readable.
688 struct pollfd fds[1];
689 fds[0].fd = acceptedSocket->getSocketFD();
690 fds[0].events = POLLIN;
// Zero timeout: poll() returns immediately instead of waiting for readability.
692 int rc = poll(fds, 1, 0);
695 // Write data to the accepted socket
696 uint8_t acceptedWbuf[192];
697 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
698 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
699 acceptedSocket->flush();
700 // Shutdown writes to the accepted socket. This will cause it to see EOF
701 // and uninstall the read callback.
702 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
707 // The loop should have completed the connection, written the queued data,
708 // shutdown writes on the socket, read the data we wrote to it, and see the
711 // Check that the connection was completed successfully and that the read
712 // and write callbacks were invoked as expected.
713 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
714 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
715 ASSERT_EQ(rcb.buffers.size(), 1);
716 ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
717 ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
718 acceptedWbuf, sizeof(acceptedWbuf)), 0);
719 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
721 // Check that we can read the data that was written to the socket, and that
722 // we see an EOF, since its socket was half-shutdown.
723 uint8_t readbuf[sizeof(wbuf)];
724 acceptedSocket->readAll(readbuf, sizeof(readbuf));
725 ASSERT_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
726 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
727 ASSERT_EQ(bytesRead, 0);
729 // Fully close both sockets
730 acceptedSocket->close();
733 ASSERT_FALSE(socket->isClosedBySelf());
734 ASSERT_TRUE(socket->isClosedByPeer());
738 * Test reading, writing, and calling shutdownWriteNow() before the
739 * connect attempt finishes.
// Like ConnectReadWriteAndShutdownWrite, but uses shutdownWriteNow(): the
// queued write must fail immediately with zero bytes written, the connect and
// read must still succeed, and the accepted socket must see EOF without
// receiving any data. Logs and bails out if connect() completed synchronously.
741 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
746 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
748 socket->connect(&ccb, server.getAddress(), 30);
750 // Hopefully the connect didn't succeed immediately.
751 // If it did, we can't exercise the write-while-connecting code path.
752 if (ccb.state == STATE_SUCCEEDED) {
753 LOG(INFO) << "connect() succeeded immediately; skipping test";
757 // Install a read callback
759 socket->setReadCB(&rcb);
761 // Ask to write some data
763 memset(wbuf, 'a', sizeof(wbuf));
765 socket->write(&wcb, wbuf, sizeof(wbuf));
767 // Shutdown writes immediately.
768 // This should immediately discard the data that we just tried to write.
769 socket->shutdownWriteNow();
771 // Verify that writeError() was invoked on the write callback.
772 ASSERT_EQ(wcb.state, STATE_FAILED);
773 ASSERT_EQ(wcb.bytesWritten, 0);
775 // Even though we haven't looped yet, we should be able to accept
777 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
779 // Since the connection is still in progress, there should be no data to
780 // read yet. Verify that the accepted socket is not readable.
781 struct pollfd fds[1];
782 fds[0].fd = acceptedSocket->getSocketFD();
783 fds[0].events = POLLIN;
// Zero timeout: poll() returns immediately instead of waiting for readability.
785 int rc = poll(fds, 1, 0);
788 // Write data to the accepted socket
789 uint8_t acceptedWbuf[192];
790 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
791 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
792 acceptedSocket->flush();
793 // Shutdown writes to the accepted socket. This will cause it to see EOF
794 // and uninstall the read callback.
795 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
800 // The loop should have completed the connection, written the queued data,
801 // shutdown writes on the socket, read the data we wrote to it, and see the
804 // Check that the connection was completed successfully and that the read
805 // callback was invoked as expected.
806 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
807 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
808 ASSERT_EQ(rcb.buffers.size(), 1);
809 ASSERT_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
810 ASSERT_EQ(memcmp(rcb.buffers[0].buffer,
811 acceptedWbuf, sizeof(acceptedWbuf)), 0);
813 // Since we used shutdownWriteNow(), it should have discarded all pending
814 // write data. Verify we see an immediate EOF when reading from the accepted
816 uint8_t readbuf[sizeof(wbuf)];
817 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
818 ASSERT_EQ(bytesRead, 0);
820 // Fully close both sockets
821 acceptedSocket->close();
824 ASSERT_FALSE(socket->isClosedBySelf());
825 ASSERT_TRUE(socket->isClosedByPeer());
828 // Helper function for use in testConnectOptWrite()
829 // Temporarily disable the read callback
//
// socket: the socket whose read callback is removed and later restored.
// rcb:    the callback to reinstall.
830 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
831 // Uninstall the read callback
832 socket->setReadCB(nullptr);
833 // Schedule the read callback to be reinstalled through the EventBase's
// runInLoop() queue (no explicit delay is passed here). The bound call
// captures the raw socket and rcb pointers, so both must still be alive
// when it runs.
834 socket->getEventBase()->runInLoop(
835 std::bind(&AsyncSocket::setReadCB, socket, rcb));
839 * Test connect+write, then have the connect callback perform another write.
841 * This tests interaction of the optimistic writing after connect with
842 * additional write attempts that occur in the connect callback.
// Shared driver for the ConnectCallbackWrite cases.
//
// size1: number of bytes ('a') written immediately after connect().
// size2: number of bytes ('b') written from the connect success callback.
// close: when true, the caller is testing close() before connect completes
//        (see the call sites in ConnectCallbackWrite).
//
// The accepting side reads everything and the received byte stream is checked
// to be buf1 followed by buf2, size1 + size2 bytes in total.
844 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
847 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
851 socket->connect(&ccb, server.getAddress(), 30);
853 // Hopefully the connect didn't succeed immediately.
854 // If it did, we can't exercise the optimistic write code path.
855 if (ccb.state == STATE_SUCCEEDED) {
856 LOG(INFO) << "connect() succeeded immediately; aborting test "
857 "of optimistic write behavior";
861 // Tell the connect callback to perform a write when the connect succeeds
863 scoped_array<char> buf2(new char[size2]);
864 memset(buf2.get(), 'b', size2);
866 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
867 // Tell the second write callback to close the connection when it is done
868 wcb2.successCallback = [&] { socket->closeNow(); };
871 // Schedule one write() immediately, before the connect finishes
872 scoped_array<char> buf1(new char[size1]);
873 memset(buf1.get(), 'a', size1);
876 socket->write(&wcb1, buf1.get(), size1);
880 // immediately perform a close, before connect() completes
884 // Start reading from the other endpoint after 10ms.
885 // If we're using large buffers, we have to read so that the writes don't
887 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
// Each time data arrives, the read callback is removed and re-queued via
// tmpDisableReads().
889 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
890 acceptedSocket.get(), &rcb);
891 socket->getEventBase()->tryRunAfterDelay(
892 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
// NOTE(review): the next comment appears stale -- the connection is accepted
// via server.acceptAsync() above.
895 // Loop. We don't bother accepting on the server socket yet.
896 // The kernel should be able to buffer the write request so it can succeed.
899 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
901 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
904 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
909 // Make sure the read callback received all of the data
// Walk the received buffers in arrival order; [start, end) is the range of
// stream offsets covered by the current buffer. Offsets below size1 are
// compared against buf1, offsets from size1 up to size1 + size2 against buf2.
910 size_t bytesRead = 0;
911 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
912 it != rcb.buffers.end();
914 size_t start = bytesRead;
915 bytesRead += it->length;
916 size_t end = bytesRead;
918 size_t cmpLen = min(size1, end) - start;
919 ASSERT_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
921 if (end > size1 && end <= size1 + size2) {
925 if (start >= size1) {
927 buf2Offset = start - size1;
928 cmpLen = end - start;
930 itOffset = size1 - start;
932 cmpLen = end - size1;
934 ASSERT_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
939 ASSERT_EQ(bytesRead, size1 + size2);
// Runs testConnectOptWrite() over combinations of small and 32 MiB writes,
// including cases with no pre-connect write (size1 == 0) and cases that pass
// close == true with no connect-callback write (size2 == 0).
942 TEST(AsyncSocketTest, ConnectCallbackWrite) {
943 // Test using small writes that should both succeed immediately
944 testConnectOptWrite(100, 200);
946 // Test using a large buffer in the connect callback, that should block
947 const size_t largeSize = 32 * 1024 * 1024;
948 testConnectOptWrite(100, largeSize);
950 // Test using a large initial write
951 testConnectOptWrite(largeSize, 100);
953 // Test using two large buffers
954 testConnectOptWrite(largeSize, largeSize);
956 // Test a small write in the connect callback,
957 // but no immediate write before connect completes
958 testConnectOptWrite(0, 64);
960 // Test a large write in the connect callback,
961 // but no immediate write before connect completes
962 testConnectOptWrite(0, largeSize);
964 // Test connect, a small write, then immediately call close() before connect
966 testConnectOptWrite(211, 0, true);
968 // Test connect, a large immediate write (that will block), then immediately
969 // call close() before connect completes
970 testConnectOptWrite(largeSize, 0, true);
973 ///////////////////////////////////////////////////////////////////////////
974 // write() related tests
975 ///////////////////////////////////////////////////////////////////////////
978 * Test writing using a nullptr callback
// Writes a buffer of 'a' bytes with a nullptr write callback on an already
// connected socket and verifies through the server that the data arrived.
// The socket must end up closed-by-self, not closed-by-peer.
980 TEST(AsyncSocketTest, WriteNullCallback) {
985 std::shared_ptr<AsyncSocket> socket =
986 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
987 evb.loop(); // loop until the socket is connected
989 // write() with a nullptr callback
991 memset(buf, 'a', sizeof(buf));
992 socket->write(nullptr, buf, sizeof(buf));
994 evb.loop(); // loop until the data is sent
996 // Make sure the server got a connection and received the data
998 server.verifyConnection(buf, sizeof(buf));
1000 ASSERT_TRUE(socket->isClosedBySelf());
1001 ASSERT_FALSE(socket->isClosedByPeer());
1005 * Test writing with a send timeout
// Sets a 200 ms send timeout, writes 32 MiB with nobody reading on the other
// end, and expects the write callback to fail with TIMED_OUT. The elapsed
// time is checked against the timeout with a 160 ms tolerance.
1007 TEST(AsyncSocketTest, WriteTimeout) {
1012 std::shared_ptr<AsyncSocket> socket =
1013 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1014 evb.loop(); // loop until the socket is connected
1016 // write() a large chunk of data, with no-one on the other end reading.
1017 // Tricky: the kernel caches the connection metrics for recently-used
1018 // routes (see tcp_no_metrics_save) so a freshly opened connection can
1019 // have a send buffer size bigger than wmem_default. This makes the test
1020 // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
1021 size_t writeLength = 32 * 1024 * 1024;
1022 uint32_t timeout = 200;
1023 socket->setSendTimeout(timeout);
1024 scoped_array<char> buf(new char[writeLength]);
1025 memset(buf.get(), 'a', writeLength);
1027 socket->write(&wcb, buf.get(), writeLength);
1033 // Make sure the write attempt timed out as requested
1034 ASSERT_EQ(wcb.state, STATE_FAILED);
1035 ASSERT_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
1037 // Check that the write timed out within a reasonable period of time.
1038 // We don't check for exactly the specified timeout, since AsyncSocket only
1039 // times out when it hasn't made progress for that period of time.
1041 // On linux, the first write sends a few hundred kb of data, then blocks for
1042 // writability, and then unblocks again after 40ms and is able to write
1043 // another smaller chunk of data before blocking permanently. Therefore it
// doesn't
1044 // time out until 40ms + timeout.
1046 // I haven't fully verified the cause of this, but I believe it probably
1047 // occurs because the receiving end delays sending an ack for up to 40ms.
1048 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
1049 // the ack, it can send some more data. However, after that point the
1050 // receiver's kernel buffer is full. This 40ms delay happens even with
1051 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
1052 // kernel may be automatically disabling TCP_QUICKACK after receiving some
1055 // For now, we simply check that the timeout occurred within 160ms of
1056 // the requested value.
1057 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1061 * Test writing to a socket that the remote endpoint has closed
// Writes 32 MiB to a connection whose accepted side has already been closed
// and expects the write callback to fail with INTERNAL_ERROR. Neither the
// closed-by-self nor the closed-by-peer flag is expected to be set.
1063 TEST(AsyncSocketTest, WritePipeError) {
1068 std::shared_ptr<AsyncSocket> socket =
1069 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1070 socket->setSendTimeout(1000);
1071 evb.loop(); // loop until the socket is connected
1073 // accept and immediately close the socket
1074 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1075 acceptedSocket->close();
1077 // write() a large chunk of data
1078 size_t writeLength = 32 * 1024 * 1024;
1079 scoped_array<char> buf(new char[writeLength]);
1080 memset(buf.get(), 'a', writeLength);
1082 socket->write(&wcb, buf.get(), writeLength);
1086 // Make sure the write failed.
1087 // It would be nice if AsyncSocketException could convey the errno value,
1088 // so that we could check for EPIPE
1089 ASSERT_EQ(wcb.state, STATE_FAILED);
1090 ASSERT_EQ(wcb.exception.getType(),
1091 AsyncSocketException::INTERNAL_ERROR);
1093 ASSERT_FALSE(socket->isClosedBySelf());
1094 ASSERT_FALSE(socket->isClosedByPeer());
1098 * Test that bytes written is correctly computed in case of write failure
// Checks the bytesWritten value reported to the write callback when a write
// fails part-way. Both ends use 8 KiB socket buffers; the sender runs on its
// own EventBase thread and writes 45 KiB, while this thread reads 20 KiB and
// then resets the connection. bytesWritten must lie between the bytes read
// and the bytes sent, and is expected to be 32 KiB or 40 KiB.
1100 TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) {
1101 // Send and receive buffer sizes for the sockets.
1102 const int sockBufSize = 8 * 1024;
1104 TestServer server(false, sockBufSize);
1106 AsyncSocket::OptionMap options{
1107 {{SOL_SOCKET, SO_SNDBUF}, sockBufSize},
1108 {{SOL_SOCKET, SO_RCVBUF}, sockBufSize},
1109 {{IPPROTO_TCP, TCP_NODELAY}, 1},
1112 // The current thread will be used by the receiver - use a separate thread
1114 EventBase senderEvb;
1115 std::thread senderThread([&]() { senderEvb.loopForever(); });
1118 std::shared_ptr<AsyncSocket> socket;
// Socket creation and connect are run on the sender's EventBase thread; this
// thread blocks until the lambda has finished.
1120 senderEvb.runInEventBaseThreadAndWait([&]() {
1121 socket = AsyncSocket::newSocket(&senderEvb);
1122 socket->connect(&ccb, server.getAddress(), 30, options);
1125 // accept the socket on the server side
1126 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1128 // Send a big (45KB) write so that it is partially written. The first write
1129 // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading
1130 // just under 24KB would cause 3-4 writes for the total of 32-40KB in the
1131 // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all
1132 // bytes are written when the socket is reset. Having at least 3 writes
1133 // ensures that the total size (45KB) would be exceeded in case of overcounting
1134 // based on the initial write size of 16KB.
1135 constexpr size_t sendSize = 45 * 1024;
1136 auto const sendBuf = std::vector<char>(sendSize, 'a');
1140 senderEvb.runInEventBaseThreadAndWait(
1141 [&]() { socket->write(&wcb, sendBuf.data(), sendSize); });
1143 // Reading 20KB would cause three additional writes of 8KB, but less
1144 // than 45KB total, so the socket is reset before all bytes are written.
1145 constexpr size_t recvSize = 20 * 1024;
1146 uint8_t recvBuf[recvSize];
1147 int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf));
1149 acceptedSocket->closeWithReset();
// Stop the sender loop and join before inspecting wcb on this thread.
1151 senderEvb.terminateLoopSoon();
1152 senderThread.join();
1154 LOG(INFO) << "Bytes written: " << wcb.bytesWritten;
1156 ASSERT_EQ(STATE_FAILED, wcb.state);
1157 ASSERT_GE(wcb.bytesWritten, bytesRead);
1158 ASSERT_LE(wcb.bytesWritten, sendSize);
1159 ASSERT_EQ(recvSize, bytesRead);
// NOTE(review): bare ASSERT(...) differs from the ASSERT_* gtest macros used
// everywhere else in this file -- confirm where it is defined, or whether
// ASSERT_TRUE was intended.
1160 ASSERT(32 * 1024 == wcb.bytesWritten || 40 * 1024 == wcb.bytesWritten);
1164 * Test writing a mix of simple buffers and IOBufs
1166 TEST(AsyncSocketTest, WriteIOBuf) {
// Writes a plain buffer, a single-element IOBuf chain and a two-element IOBuf
// chain back to back, then checks that the reader received one buffer holding
// all four payloads in submission order.
// NOTE(review): `evb`, `server`, `ccb`, `rcb`, `wcb`, `wcb2` and `wcb3` are
// declared on lines not visible in this excerpt.
1171 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1173 socket->connect(&ccb, server.getAddress(), 30);
1175 // Accept the connection
1176 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1178 acceptedSocket->setReadCB(&rcb);
1180 // Check if EOR tracking flag can be set and reset.
1181 EXPECT_FALSE(socket->isEorTrackingEnabled());
1182 socket->setEorTracking(true);
1183 EXPECT_TRUE(socket->isEorTrackingEnabled());
1184 socket->setEorTracking(false);
1185 EXPECT_FALSE(socket->isEorTrackingEnabled());
1187 // Write a simple buffer to the socket
1188 constexpr size_t simpleBufLength = 5;
1189 char simpleBuf[simpleBufLength];
1190 memset(simpleBuf, 'a', simpleBufLength);
1192 socket->write(&wcb, simpleBuf, simpleBufLength);
1194 // Write a single-element IOBuf chain
1195 size_t buf1Length = 7;
1196 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1197 memset(buf1->writableData(), 'b', buf1Length);
1198 buf1->append(buf1Length);
// Keep a clone for the comparison below; buf1 itself is moved into writeChain().
1199 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1201 socket->writeChain(&wcb2, std::move(buf1));
1203 // Write a multiple-element IOBuf chain
1204 size_t buf2Length = 11;
1205 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1206 memset(buf2->writableData(), 'c', buf2Length);
1207 buf2->append(buf2Length);
1208 size_t buf3Length = 13;
1209 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1210 memset(buf3->writableData(), 'd', buf3Length);
1211 buf3->append(buf3Length);
1212 buf2->appendChain(std::move(buf3));
// Clone the chain and coalesce the clone so it can be compared with a single
// memcmp() below.
1213 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1214 buf2Copy->coalesce();
1216 socket->writeChain(&wcb3, std::move(buf2));
1217 socket->shutdownWrite();
1219 // Let the reads and writes run to completion
1222 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1223 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1224 ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1226 // Make sure the reader got the right data in the right order
1227 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
1228 ASSERT_EQ(rcb.buffers.size(), 1);
1229 ASSERT_EQ(rcb.buffers[0].length,
1230 simpleBufLength + buf1Length + buf2Length + buf3Length);
// Compare each payload at its expected offset within the received buffer.
1232 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1234 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1235 buf1Copy->data(), buf1Copy->length()), 0);
1237 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1238 buf2Copy->data(), buf2Copy->length()), 0);
1240 acceptedSocket->close();
1243 ASSERT_TRUE(socket->isClosedBySelf());
1244 ASSERT_FALSE(socket->isClosedByPeer());
1247 TEST(AsyncSocketTest, WriteIOBufCorked) {
// Issues one immediate write and two DelayedWrite timers, the first of which
// passes cork=true, and checks that the reader saw two buffers: the first
// write alone, then the second and third writes combined.
// NOTE(review): `evb`, `server`, `ccb`, `rcb` and `wcb1`..`wcb3` are declared
// on lines not visible in this excerpt.
1252 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1254 socket->connect(&ccb, server.getAddress(), 30);
1256 // Accept the connection
1257 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1259 acceptedSocket->setReadCB(&rcb);
1261 // Do three writes, 100ms apart, with the "cork" flag set
1262 // on the second write. The reader should see the first write
1263 // arrive by itself, followed by the second and third writes
1264 // arriving together.
// NOTE(review): the timeouts scheduled below are 100 and 140, so the second
// and third writes are 40 apart rather than 100; confirm which is intended.
1265 size_t buf1Length = 5;
1266 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1267 memset(buf1->writableData(), 'a', buf1Length);
1268 buf1->append(buf1Length);
1269 size_t buf2Length = 7;
1270 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1271 memset(buf2->writableData(), 'b', buf2Length);
1272 buf2->append(buf2Length);
1273 size_t buf3Length = 11;
1274 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1275 memset(buf3->writableData(), 'c', buf3Length);
1276 buf3->append(buf3Length);
1278 socket->writeChain(&wcb1, std::move(buf1));
// Second write: cork=true.
1280 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1281 write2.scheduleTimeout(100);
// Third write: cork=false, lastWrite=true.
1283 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1284 write3.scheduleTimeout(140);
1287 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
1288 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1289 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
// NOTE(review): throwing from the test body bypasses the ASSERT_EQ on the
// line after this check; presumably done to surface the exception - confirm.
1290 if (wcb3.state != STATE_SUCCEEDED) {
1291 throw(wcb3.exception);
1293 ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1295 // Make sure the reader got the data with the right grouping
1296 ASSERT_EQ(rcb.state, STATE_SUCCEEDED);
1297 ASSERT_EQ(rcb.buffers.size(), 2);
1298 ASSERT_EQ(rcb.buffers[0].length, buf1Length);
1299 ASSERT_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1301 acceptedSocket->close();
1304 ASSERT_TRUE(socket->isClosedBySelf());
1305 ASSERT_FALSE(socket->isClosedByPeer());
1309 * Test performing a zero-length write
1311 TEST(AsyncSocketTest, ZeroLengthWrite) {
// Issues four writes, two of them zero-length, and checks that every write
// callback succeeds and that rcb.verifyData() accepts the received data
// against the full len1 + len2 byte buffer.
// NOTE(review): `server`, `evb`, `rcb` and `wcb1`..`wcb4` are declared on
// lines not visible in this excerpt.
1316 std::shared_ptr<AsyncSocket> socket =
1317 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1318 evb.loop(); // loop until the socket is connected
1320 auto acceptedSocket = server.acceptAsync(&evb);
1322 acceptedSocket->setReadCB(&rcb);
1324 size_t len1 = 1024*1024;
1325 size_t len2 = 1024*1024;
1326 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// Fill the first len1 bytes with 'a' and the following len2 bytes with 'b'.
// The second fill has to start at offset len1: starting at offset 0 would
// overwrite the 'a' region and leave the second half of the buffer
// uninitialized even though it is written to the socket below.
1327 memset(buf.get(), 'a', len1);
1328 memset(buf.get() + len1, 'b', len2);
// Interleave zero-length writes with the two real payload writes.
1334 socket->write(&wcb1, buf.get(), 0);
1335 socket->write(&wcb2, buf.get(), len1);
1336 socket->write(&wcb3, buf.get() + len1, 0);
1337 socket->write(&wcb4, buf.get() + len1, len2);
1340 evb.loop(); // loop until the data is sent
1342 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1343 ASSERT_EQ(wcb2.state, STATE_SUCCEEDED);
1344 ASSERT_EQ(wcb3.state, STATE_SUCCEEDED);
1345 ASSERT_EQ(wcb4.state, STATE_SUCCEEDED);
1346 rcb.verifyData(buf.get(), len1 + len2);
1348 ASSERT_TRUE(socket->isClosedBySelf());
1349 ASSERT_FALSE(socket->isClosedByPeer());
1352 TEST(AsyncSocketTest, ZeroLengthWritev) {
// Submits a four-entry iovec through a single writev() call and checks that
// the write callback succeeds and that rcb.verifyData() accepts the received
// data against the full len1 + len2 byte buffer.
// NOTE(review): `server`, `evb`, `rcb` and `wcb` are declared on lines not
// visible in this excerpt, as are the iov_len assignments for iov[1] and
// iov[3] (presumably zero, given the test name - confirm).
1357 std::shared_ptr<AsyncSocket> socket =
1358 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1359 evb.loop(); // loop until the socket is connected
1361 auto acceptedSocket = server.acceptAsync(&evb);
1363 acceptedSocket->setReadCB(&rcb);
1365 size_t len1 = 1024*1024;
1366 size_t len2 = 1024*1024;
1367 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// Fill the first len1 bytes with 'a' and the following len2 bytes with 'b'.
// The second fill has to start at offset len1: starting at offset 0 would
// overwrite the 'a' region and leave the second half of the buffer
// uninitialized even though iov[2] sends it below.
1368 memset(buf.get(), 'a', len1);
1369 memset(buf.get() + len1, 'b', len2);
1372 constexpr size_t iovCount = 4;
1373 struct iovec iov[iovCount];
1374 iov[0].iov_base = buf.get();
1375 iov[0].iov_len = len1;
1376 iov[1].iov_base = buf.get() + len1;
1378 iov[2].iov_base = buf.get() + len1;
1379 iov[2].iov_len = len2;
1380 iov[3].iov_base = buf.get() + len1 + len2;
1383 socket->writev(&wcb, iov, iovCount);
1385 evb.loop(); // loop until the data is sent
1387 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1388 rcb.verifyData(buf.get(), len1 + len2);
1390 ASSERT_TRUE(socket->isClosedBySelf());
1391 ASSERT_FALSE(socket->isClosedByPeer());
1394 ///////////////////////////////////////////////////////////////////////////
1395 // close() related tests
1396 ///////////////////////////////////////////////////////////////////////////
1399 * Test calling close() with pending writes when the socket is already closing.
1401 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
// Queues writes until five of them have not succeeded immediately, gives each
// of those an error callback that calls close() on the socket, and then
// checks that every pending write callback ended in STATE_FAILED.
// NOTE(review): `evb`, `server`, `ccb` and `buf` are declared on lines not
// visible in this excerpt, as is the closeNow() call referenced below.
1406 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1408 socket->connect(&ccb, server.getAddress(), 30);
1410 // accept the socket on the server side
1411 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1413 // Loop to ensure the connect has completed
1416 // Make sure we are connected
1417 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
1419 // Schedule pending writes, until several write attempts have blocked
1421 memset(buf, 'a', sizeof(buf));
1422 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1423 WriteCallbackVector writeCallbacks;
1425 writeCallbacks.reserve(5);
1426 while (writeCallbacks.size() < 5) {
1427 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1429 socket->write(wcb.get(), buf, sizeof(buf));
1430 if (wcb->state == STATE_SUCCEEDED) {
1431 // Succeeded immediately. Keep performing more writes
1435 // This write is blocked.
1436 // Have the write callback call close() when writeError() is invoked
1437 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
// Only blocked writes are retained; they are the ones checked below.
1438 writeCallbacks.push_back(wcb);
1441 // Call closeNow() to immediately fail the pending writes
1444 // Make sure writeError() was invoked on all of the pending write callbacks
1445 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1446 it != writeCallbacks.end();
1448 ASSERT_EQ((*it)->state, STATE_FAILED);
1451 ASSERT_TRUE(socket->isClosedBySelf());
1452 ASSERT_FALSE(socket->isClosedByPeer());
1455 ///////////////////////////////////////////////////////////////////////////
1456 // ImmediateRead related tests
1457 ///////////////////////////////////////////////////////////////////////////
1459 /* AsyncSocket subclass used to verify that immediate read works */
1460 class AsyncSocketImmediateRead : public folly::AsyncSocket {
// Set to true whenever checkForImmediateRead() is invoked; tests reset it to
// false before the operation they want to observe.
1462 bool immediateReadCalled = false;
1463 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
// Records that the hook ran and then calls AsyncSocket::handleRead().
1465 void checkForImmediateRead() noexcept override {
1466 immediateReadCalled = true;
1467 AsyncSocket::handleRead();
1471 TEST(AsyncSocket, ConnectReadImmediateRead) {
// The client writes expectedDataSz bytes, the server echoes them back once it
// has read them all and then closes, and the test checks that the client
// received the echo and that checkForImmediateRead() was invoked.
// NOTE(review): `server`, `evb` and `wcb1` are declared on lines not visible
// in this excerpt.
1474 const size_t maxBufferSz = 100;
1475 const size_t maxReadsPerEvent = 1;
// Three times the read buffer size, with one read allowed per event.
1476 const size_t expectedDataSz = maxBufferSz * 3;
1477 char expectedData[expectedDataSz];
1478 memset(expectedData, 'j', expectedDataSz);
1481 ReadCallback rcb(maxBufferSz);
1482 AsyncSocketImmediateRead socket(&evb);
1483 socket.connect(nullptr, server.getAddress(), 30);
1485 evb.loop(); // loop until the socket is connected
1487 socket.setReadCB(&rcb);
1488 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Reset the flag so that only calls made after this point are observed.
1489 socket.immediateReadCalled = false;
1491 auto acceptedSocket = server.acceptAsync(&evb);
1493 ReadCallback rcbServer;
1494 WriteCallback wcbServer;
1495 rcbServer.dataAvailableCallback = [&]() {
1496 if (rcbServer.dataRead() == expectedDataSz) {
1497 // write back all data read
1498 rcbServer.verifyData(expectedData, expectedDataSz);
1499 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1500 acceptedSocket->close();
1503 acceptedSocket->setReadCB(&rcbServer);
1507 socket.write(&wcb1, expectedData, expectedDataSz);
1509 ASSERT_EQ(wcb1.state, STATE_SUCCEEDED);
1510 rcb.verifyData(expectedData, expectedDataSz);
1511 ASSERT_EQ(socket.immediateReadCalled, true);
1513 ASSERT_FALSE(socket.isClosedBySelf());
1514 ASSERT_FALSE(socket.isClosedByPeer());
1517 TEST(AsyncSocket, ConnectReadUninstallRead) {
// Same echo setup as ConnectReadImmediateRead, except that the client's read
// callback uninstalls itself when data arrives. The test then expects only
// maxBufferSz bytes to have been read and checkForImmediateRead() not to have
// been invoked.
// NOTE(review): `server`, `evb` and `wcb` are declared on lines not visible
// in this excerpt.
1520 const size_t maxBufferSz = 100;
1521 const size_t maxReadsPerEvent = 1;
1522 const size_t expectedDataSz = maxBufferSz * 3;
1523 char expectedData[expectedDataSz];
1524 memset(expectedData, 'k', expectedDataSz);
1527 ReadCallback rcb(maxBufferSz);
1528 AsyncSocketImmediateRead socket(&evb);
1529 socket.connect(nullptr, server.getAddress(), 30);
1531 evb.loop(); // loop until the socket is connected
1533 socket.setReadCB(&rcb);
1534 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Reset the flag so that only calls made after this point are observed.
1535 socket.immediateReadCalled = false;
1537 auto acceptedSocket = server.acceptAsync(&evb);
1539 ReadCallback rcbServer;
1540 WriteCallback wcbServer;
1541 rcbServer.dataAvailableCallback = [&]() {
1542 if (rcbServer.dataRead() == expectedDataSz) {
1543 // write back all data read
1544 rcbServer.verifyData(expectedData, expectedDataSz);
1545 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1546 acceptedSocket->close();
1549 acceptedSocket->setReadCB(&rcbServer);
1551 rcb.dataAvailableCallback = [&]() {
1552 // we read data and reset readCB
1553 socket.setReadCB(nullptr);
1558 socket.write(&wcb, expectedData, expectedDataSz);
1560 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
1562 /* we should've only read maxBufferSz data since readCallback_
1563 * was reset in dataAvailableCallback */
1564 ASSERT_EQ(rcb.dataRead(), maxBufferSz);
1565 ASSERT_EQ(socket.immediateReadCalled, false);
1567 ASSERT_FALSE(socket.isClosedBySelf());
1568 ASSERT_FALSE(socket.isClosedByPeer());
1572 // - Test connect() and have the connect callback set the read callback
1573 // - Test connect() and have the connect callback unset the read callback
1574 // - Test reading/writing/closing/destroying the socket in the connect callback
1575 // - Test reading/writing/closing/destroying the socket in the read callback
1576 // - Test reading/writing/closing/destroying the socket in the write callback
1577 // - Test one-way shutdown behavior
1578 // - Test changing the EventBase
1580 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1581 // in connectSuccess(), readDataAvailable(), writeSuccess()
1584 ///////////////////////////////////////////////////////////////////////////
1585 // AsyncServerSocket tests
1586 ///////////////////////////////////////////////////////////////////////////
1589 * Helper ConnectionEventCallback class for the test code.
1590 * It maintains counters protected by a spin lock.
1592 class TestConnectionEventCallback :
1593 public AsyncServerSocket::ConnectionEventCallback {
// Every callback below takes a write hold on spinLock_ before touching its
// counter; every getter takes a read hold before returning it.
1595 virtual void onConnectionAccepted(
1596 const int /* socket */,
1597 const SocketAddress& /* addr */) noexcept override {
1598 folly::RWSpinLock::WriteHolder holder(spinLock_);
1599 connectionAccepted_++;
1602 virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1603 folly::RWSpinLock::WriteHolder holder(spinLock_);
1604 connectionAcceptedError_++;
1607 virtual void onConnectionDropped(
1608 const int /* socket */,
1609 const SocketAddress& /* addr */) noexcept override {
1610 folly::RWSpinLock::WriteHolder holder(spinLock_);
1611 connectionDropped_++;
1614 virtual void onConnectionEnqueuedForAcceptorCallback(
1615 const int /* socket */,
1616 const SocketAddress& /* addr */) noexcept override {
1617 folly::RWSpinLock::WriteHolder holder(spinLock_);
1618 connectionEnqueuedForAcceptCallback_++;
1621 virtual void onConnectionDequeuedByAcceptorCallback(
1622 const int /* socket */,
1623 const SocketAddress& /* addr */) noexcept override {
1624 folly::RWSpinLock::WriteHolder holder(spinLock_);
1625 connectionDequeuedByAcceptCallback_++;
// NOTE(review): the bodies of the three backoff callbacks are not visible in
// this excerpt beyond taking the lock; presumably each increments its
// matching backoff counter - confirm.
1628 virtual void onBackoffStarted() noexcept override {
1629 folly::RWSpinLock::WriteHolder holder(spinLock_);
1633 virtual void onBackoffEnded() noexcept override {
1634 folly::RWSpinLock::WriteHolder holder(spinLock_);
1638 virtual void onBackoffError() noexcept override {
1639 folly::RWSpinLock::WriteHolder holder(spinLock_);
// Counter accessors.
1643 unsigned int getConnectionAccepted() const {
1644 folly::RWSpinLock::ReadHolder holder(spinLock_);
1645 return connectionAccepted_;
1648 unsigned int getConnectionAcceptedError() const {
1649 folly::RWSpinLock::ReadHolder holder(spinLock_);
1650 return connectionAcceptedError_;
1653 unsigned int getConnectionDropped() const {
1654 folly::RWSpinLock::ReadHolder holder(spinLock_);
1655 return connectionDropped_;
1658 unsigned int getConnectionEnqueuedForAcceptCallback() const {
1659 folly::RWSpinLock::ReadHolder holder(spinLock_);
1660 return connectionEnqueuedForAcceptCallback_;
1663 unsigned int getConnectionDequeuedByAcceptCallback() const {
1664 folly::RWSpinLock::ReadHolder holder(spinLock_);
1665 return connectionDequeuedByAcceptCallback_;
1668 unsigned int getBackoffStarted() const {
1669 folly::RWSpinLock::ReadHolder holder(spinLock_);
1670 return backoffStarted_;
1673 unsigned int getBackoffEnded() const {
1674 folly::RWSpinLock::ReadHolder holder(spinLock_);
1675 return backoffEnded_;
1678 unsigned int getBackoffError() const {
1679 folly::RWSpinLock::ReadHolder holder(spinLock_);
1680 return backoffError_;
// mutable so that the const getters can take a read hold.
1684 mutable folly::RWSpinLock spinLock_;
// All counters start at zero.
1685 unsigned int connectionAccepted_{0};
1686 unsigned int connectionAcceptedError_{0};
1687 unsigned int connectionDropped_{0};
1688 unsigned int connectionEnqueuedForAcceptCallback_{0};
1689 unsigned int connectionDequeuedByAcceptCallback_{0};
1690 unsigned int backoffStarted_{0};
1691 unsigned int backoffEnded_{0};
1692 unsigned int backoffError_{0};
1696 * Helper AcceptCallback class for the test code
1697 * It records the callbacks that were invoked, and also supports calling
1698 * generic std::function objects in each callback.
1700 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
// Each AcceptCallback override appends an EventInfo to events_ and then calls
// the matching user-supplied std::function, if one was set.
// NOTE(review): the EventType enum, the EventInfo struct header and most
// constructor initializers are on lines not visible in this excerpt.
// EventInfo for an accepted connection.
1709 EventInfo(int fd, const folly::SocketAddress& addr)
1710 : type(TYPE_ACCEPT),
// EventInfo built from an error message.
1714 explicit EventInfo(const std::string& msg)
// EventInfo built from just an event type.
1719 explicit EventInfo(EventType et)
1726 int fd; // valid for TYPE_ACCEPT
1727 folly::SocketAddress address; // valid for TYPE_ACCEPT
1728 string errorMsg; // valid for TYPE_ERROR
1730 typedef std::deque<EventInfo> EventList;
1732 TestAcceptCallback()
1733 : connectionAcceptedFn_(),
// Accessor for the recorded events (body not visible in this excerpt).
1738 std::deque<EventInfo>* getEvents() {
// Setters for the optional per-callback hooks; each stores a copy of fn.
1742 void setConnectionAcceptedFn(
1743 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1744 connectionAcceptedFn_ = fn;
1746 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1747 acceptErrorFn_ = fn;
1749 void setAcceptStartedFn(const std::function<void()>& fn) {
1750 acceptStartedFn_ = fn;
1752 void setAcceptStoppedFn(const std::function<void()>& fn) {
1753 acceptStoppedFn_ = fn;
// Records an accept event for fd/clientAddr, then invokes the hook if set.
1756 void connectionAccepted(
1757 int fd, const folly::SocketAddress& clientAddr) noexcept override {
1758 events_.emplace_back(fd, clientAddr);
1760 if (connectionAcceptedFn_) {
1761 connectionAcceptedFn_(fd, clientAddr);
// Records an error event carrying ex.what().
1764 void acceptError(const std::exception& ex) noexcept override {
1765 events_.emplace_back(ex.what());
1767 if (acceptErrorFn_) {
// Records a TYPE_START event.
1771 void acceptStarted() noexcept override {
1772 events_.emplace_back(TYPE_START);
1774 if (acceptStartedFn_) {
// Records a TYPE_STOP event.
1778 void acceptStopped() noexcept override {
1779 events_.emplace_back(TYPE_STOP);
1781 if (acceptStoppedFn_) {
// Optional hooks; an empty std::function means no hook is called.
1787 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1788 std::function<void(const std::exception&)> acceptErrorFn_;
1789 std::function<void()> acceptStartedFn_;
1790 std::function<void()> acceptStoppedFn_;
// Events in the order the callbacks were invoked.
1792 std::deque<EventInfo> events_;
1797 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1799 TEST(AsyncSocketTest, ServerAcceptOptions) {
// Accepts one connection through an AsyncServerSocket and inspects the
// accepted file descriptor's O_NONBLOCK flag and TCP_NODELAY option.
1800 EventBase eventBase;
1802 // Create a server socket
1803 std::shared_ptr<AsyncServerSocket> serverSocket(
1804 AsyncServerSocket::newSocket(&eventBase));
// bind(0) is followed by getAddress() to learn the address actually bound.
1805 serverSocket->bind(0);
1806 serverSocket->listen(16);
1807 folly::SocketAddress serverAddress;
1808 serverSocket->getAddress(&serverAddress);
1810 // Add a callback to accept one connection then stop the loop
1811 TestAcceptCallback acceptCallback;
1812 acceptCallback.setConnectionAcceptedFn(
1813 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1814 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1816 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1817 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
1819 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
1820 serverSocket->startAccepting();
1822 // Connect to the server socket
1823 std::shared_ptr<AsyncSocket> socket(
1824 AsyncSocket::newSocket(&eventBase, serverAddress));
1828 // Verify that the server accepted a connection
// Expected sequence: start, accept, stop.
1829 ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
1830 ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
1831 TestAcceptCallback::TYPE_START);
1832 ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
1833 TestAcceptCallback::TYPE_ACCEPT);
1834 ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
1835 TestAcceptCallback::TYPE_STOP);
// The fd recorded with the accept event is the one inspected below.
1836 int fd = acceptCallback.getEvents()->at(1).fd;
1838 // The accepted connection should already be in non-blocking mode
1839 int flags = fcntl(fd, F_GETFL, 0);
1840 ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1843 // The accepted connection should already have TCP_NODELAY set
1845 socklen_t valueLength = sizeof(value);
1846 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1848 ASSERT_EQ(value, 1);
1853 * Test AsyncServerSocket::removeAcceptCallback()
1855 TEST(AsyncSocketTest, RemoveAcceptCallback) {
// Registers seven accept callbacks whose handlers open further connections
// and remove other callbacks (or themselves), then checks the exact event
// sequence each callback recorded.
// NOTE(review): `cb2Count` and `cb6Count` are declared and updated on lines
// not visible in this excerpt.
1856 // Create a new AsyncServerSocket
1857 EventBase eventBase;
1858 std::shared_ptr<AsyncServerSocket> serverSocket(
1859 AsyncServerSocket::newSocket(&eventBase));
1860 serverSocket->bind(0);
1861 serverSocket->listen(16);
1862 folly::SocketAddress serverAddress;
1863 serverSocket->getAddress(&serverAddress);
1865 // Add several accept callbacks
1866 TestAcceptCallback cb1;
1867 TestAcceptCallback cb2;
1868 TestAcceptCallback cb3;
1869 TestAcceptCallback cb4;
1870 TestAcceptCallback cb5;
1871 TestAcceptCallback cb6;
1872 TestAcceptCallback cb7;
1874 // Test having callbacks remove other callbacks before them on the list,
1875 // after them on the list, or removing themselves.
1877 // Have callback 2 remove callback 3 and callback 5 the first time it is
// The trailing comments on the newSocket() calls below name the callback
// expected to receive that connection and what it removes.
1880 cb1.setConnectionAcceptedFn([&](int /* fd */,
1881 const folly::SocketAddress& /* addr */) {
1882 std::shared_ptr<AsyncSocket> sock2(
1883 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1885 cb3.setConnectionAcceptedFn(
1886 [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1887 cb4.setConnectionAcceptedFn(
1888 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1889 std::shared_ptr<AsyncSocket> sock3(
1890 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1892 cb5.setConnectionAcceptedFn(
1893 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1894 std::shared_ptr<AsyncSocket> sock5(
1895 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1898 cb2.setConnectionAcceptedFn(
1899 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1900 if (cb2Count == 0) {
1901 serverSocket->removeAcceptCallback(&cb3, nullptr);
1902 serverSocket->removeAcceptCallback(&cb5, nullptr);
1906 // Have callback 6 remove callback 4 the first time it is called,
1907 // and destroy the server socket the second time it is called
1909 cb6.setConnectionAcceptedFn(
1910 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1911 if (cb6Count == 0) {
1912 serverSocket->removeAcceptCallback(&cb4, nullptr);
1913 std::shared_ptr<AsyncSocket> sock6(
1914 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1915 std::shared_ptr<AsyncSocket> sock7(
1916 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1917 std::shared_ptr<AsyncSocket> sock8(
1918 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1921 serverSocket.reset();
1925 // Have callback 7 remove itself
1926 cb7.setConnectionAcceptedFn(
1927 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1928 serverSocket->removeAcceptCallback(&cb7, nullptr);
// Registration order matters: the expectations below depend on it.
1931 serverSocket->addAcceptCallback(&cb1, &eventBase);
1932 serverSocket->addAcceptCallback(&cb2, &eventBase);
1933 serverSocket->addAcceptCallback(&cb3, &eventBase);
1934 serverSocket->addAcceptCallback(&cb4, &eventBase);
1935 serverSocket->addAcceptCallback(&cb5, &eventBase);
1936 serverSocket->addAcceptCallback(&cb6, &eventBase);
1937 serverSocket->addAcceptCallback(&cb7, &eventBase);
1938 serverSocket->startAccepting();
1940 // Make several connections to the socket
1941 std::shared_ptr<AsyncSocket> sock1(
1942 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1943 std::shared_ptr<AsyncSocket> sock4(
1944 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1946 // Loop until we are stopped
1949 // Check to make sure that the expected callbacks were invoked.
1951 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1952 // the AcceptCallbacks in round-robin fashion, in the order that they were
1953 // added. The code is implemented this way right now, but the API doesn't
1954 // explicitly require it be done this way. If we change the code not to be
1955 // exactly round robin in the future, we can simplify the test checks here.
1956 // (We'll also need to update the termination code, since we expect cb6 to
1957 // get called twice to terminate the loop.)
// cb1: start, two accepts, stop.
1958 ASSERT_EQ(cb1.getEvents()->size(), 4);
1959 ASSERT_EQ(cb1.getEvents()->at(0).type,
1960 TestAcceptCallback::TYPE_START);
1961 ASSERT_EQ(cb1.getEvents()->at(1).type,
1962 TestAcceptCallback::TYPE_ACCEPT);
1963 ASSERT_EQ(cb1.getEvents()->at(2).type,
1964 TestAcceptCallback::TYPE_ACCEPT);
1965 ASSERT_EQ(cb1.getEvents()->at(3).type,
1966 TestAcceptCallback::TYPE_STOP);
// cb2: start, two accepts, stop.
1968 ASSERT_EQ(cb2.getEvents()->size(), 4);
1969 ASSERT_EQ(cb2.getEvents()->at(0).type,
1970 TestAcceptCallback::TYPE_START);
1971 ASSERT_EQ(cb2.getEvents()->at(1).type,
1972 TestAcceptCallback::TYPE_ACCEPT);
1973 ASSERT_EQ(cb2.getEvents()->at(2).type,
1974 TestAcceptCallback::TYPE_ACCEPT);
1975 ASSERT_EQ(cb2.getEvents()->at(3).type,
1976 TestAcceptCallback::TYPE_STOP);
// cb3: start and stop only (no accepts).
1978 ASSERT_EQ(cb3.getEvents()->size(), 2);
1979 ASSERT_EQ(cb3.getEvents()->at(0).type,
1980 TestAcceptCallback::TYPE_START);
1981 ASSERT_EQ(cb3.getEvents()->at(1).type,
1982 TestAcceptCallback::TYPE_STOP);
// cb4: start, one accept, stop.
1984 ASSERT_EQ(cb4.getEvents()->size(), 3);
1985 ASSERT_EQ(cb4.getEvents()->at(0).type,
1986 TestAcceptCallback::TYPE_START);
1987 ASSERT_EQ(cb4.getEvents()->at(1).type,
1988 TestAcceptCallback::TYPE_ACCEPT);
1989 ASSERT_EQ(cb4.getEvents()->at(2).type,
1990 TestAcceptCallback::TYPE_STOP);
// cb5: start and stop only (no accepts).
1992 ASSERT_EQ(cb5.getEvents()->size(), 2);
1993 ASSERT_EQ(cb5.getEvents()->at(0).type,
1994 TestAcceptCallback::TYPE_START);
1995 ASSERT_EQ(cb5.getEvents()->at(1).type,
1996 TestAcceptCallback::TYPE_STOP);
// cb6: start, two accepts, stop.
1998 ASSERT_EQ(cb6.getEvents()->size(), 4);
1999 ASSERT_EQ(cb6.getEvents()->at(0).type,
2000 TestAcceptCallback::TYPE_START);
2001 ASSERT_EQ(cb6.getEvents()->at(1).type,
2002 TestAcceptCallback::TYPE_ACCEPT);
2003 ASSERT_EQ(cb6.getEvents()->at(2).type,
2004 TestAcceptCallback::TYPE_ACCEPT);
2005 ASSERT_EQ(cb6.getEvents()->at(3).type,
2006 TestAcceptCallback::TYPE_STOP);
// cb7: start, one accept, stop.
2008 ASSERT_EQ(cb7.getEvents()->size(), 3);
2009 ASSERT_EQ(cb7.getEvents()->at(0).type,
2010 TestAcceptCallback::TYPE_START);
2011 ASSERT_EQ(cb7.getEvents()->at(1).type,
2012 TestAcceptCallback::TYPE_ACCEPT);
2013 ASSERT_EQ(cb7.getEvents()->at(2).type,
2014 TestAcceptCallback::TYPE_STOP);
2018 * Test AsyncServerSocket accept callbacks when the EventBase loop runs in another thread
2020 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
// Registers a single accept callback from this thread, runs the loop in a
// second thread, and checks that acceptStarted fires on a different thread
// from this one and that the later callbacks fire on that same thread.
2021 // Create a new AsyncServerSocket
2022 EventBase eventBase;
2023 std::shared_ptr<AsyncServerSocket> serverSocket(
2024 AsyncServerSocket::newSocket(&eventBase));
2025 serverSocket->bind(0);
2026 serverSocket->listen(16);
2027 folly::SocketAddress serverAddress;
2028 serverSocket->getAddress(&serverAddress);
2030 // Add a single accept callback
2031 TestAcceptCallback cb1;
// Starts as this thread's id; acceptStarted overwrites it with the id of the
// thread it runs on, which the other two hooks compare against.
2032 auto thread_id = std::this_thread::get_id();
2033 cb1.setAcceptStartedFn([&](){
2034 CHECK_NE(thread_id, std::this_thread::get_id());
2035 thread_id = std::this_thread::get_id();
2037 cb1.setConnectionAcceptedFn(
2038 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2039 ASSERT_EQ(thread_id, std::this_thread::get_id());
2040 serverSocket->removeAcceptCallback(&cb1, &eventBase);
2042 cb1.setAcceptStoppedFn([&](){
2043 ASSERT_EQ(thread_id, std::this_thread::get_id());
2046 // Register the callback and start accepting
2047 serverSocket->addAcceptCallback(&cb1, &eventBase);
2048 serverSocket->startAccepting();
2050 // Make one connection to the socket
2051 std::shared_ptr<AsyncSocket> sock1(
2052 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
2054 // Loop in another thread
2055 auto other = std::thread([&](){
2060 // Check to make sure that the expected callbacks were invoked.
2062 // NOTE: This code depends on the AsyncServerSocket operating calling all of
2063 // the AcceptCallbacks in round-robin fashion, in the order that they were
2064 // added. The code is implemented this way right now, but the API doesn't
2065 // explicitly require it be done this way. If we change the code not to be
2066 // exactly round robin in the future, we can simplify the test checks here.
2067 // (We'll also need to update the termination code, since we expect cb6 to
2068 // get called twice to terminate the loop.)
// NOTE(review): the note above mentions cb6, which does not exist in this
// test; it appears to have been copied from RemoveAcceptCallback.
2069 ASSERT_EQ(cb1.getEvents()->size(), 3);
2070 ASSERT_EQ(cb1.getEvents()->at(0).type,
2071 TestAcceptCallback::TYPE_START);
2072 ASSERT_EQ(cb1.getEvents()->at(1).type,
2073 TestAcceptCallback::TYPE_ACCEPT);
2074 ASSERT_EQ(cb1.getEvents()->at(2).type,
2075 TestAcceptCallback::TYPE_STOP);
2079 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
// Shared helper: registers a one-shot accept callback on serverSocket,
// connects a client to the socket's own address, and asserts that the
// callback recorded start, accept and stop events in that order.
// serverSocket is borrowed, not owned; its EventBase is used for both the
// callback and the client socket.
2080 EventBase* eventBase = serverSocket->getEventBase();
2083 // Add a callback to accept one connection then stop accepting
2084 TestAcceptCallback acceptCallback;
2085 acceptCallback.setConnectionAcceptedFn(
2086 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2087 serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
2089 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2090 serverSocket->removeAcceptCallback(&acceptCallback, eventBase);
2092 serverSocket->addAcceptCallback(&acceptCallback, eventBase);
2093 serverSocket->startAccepting();
2095 // Connect to the server socket
2096 folly::SocketAddress serverAddress;
2097 serverSocket->getAddress(&serverAddress);
2098 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
2100 // Loop to process all events
2103 // Verify that the server accepted a connection
2104 ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
2105 ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
2106 TestAcceptCallback::TYPE_START);
2107 ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
2108 TestAcceptCallback::TYPE_ACCEPT);
2109 ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
2110 TestAcceptCallback::TYPE_STOP);
2113 /* Verify that we don't leak sockets if we are destroyed()
2114 * and there are still writes pending
2116 * If destroy() only calls close() instead of closeNow(),
2117 * it would shutdown(writes) on the socket, but it would
2118 * never be close()'d, and the socket would leak
2120 TEST(AsyncSocketTest, DestroyCloseTest) {
// Starts a 5,000,000-byte write on the accepted socket, drops the last
// reference to that socket, and then checks that reading from its saved file
// descriptor fails with EBADF.
// NOTE(review): `clientEB`, `serverEB`, `server`, `ccb`, `rcb` and `wcb` are
// declared on lines not visible in this excerpt.
2126 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
2128 socket->connect(&ccb, server.getAddress(), 30);
2130 // Accept the connection
2131 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2133 acceptedSocket->setReadCB(&rcb);
2135 // Write a large buffer to the socket that is larger than kernel buffer
2136 size_t simpleBufLength = 5000000;
// NOTE(review): raw new[] with no matching delete[] visible in this excerpt;
// confirm the buffer is released on a line not shown here.
2137 char* simpleBuf = new char[simpleBufLength];
2138 memset(simpleBuf, 'a', simpleBufLength);
2141 // Let the reads and writes run to completion
// Save the fd before the socket object is released.
2142 int fd = acceptedSocket->getFd();
2144 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2146 acceptedSocket.reset();
2148 // Test that server socket was closed
2149 folly::test::msvcSuppressAbortOnInvalidParams([&] {
2150 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2152 ASSERT_EQ(errno, EBADF);
2158 * Test AsyncServerSocket::useExistingSocket()
2160 TEST(AsyncSocketTest, ServerExistingSocket) {
// Hands a manually created socket to AsyncServerSocket::useExistingSocket()
// at three stages - unbound, bound, and bound + listening - and runs
// serverSocketSanityTest() on the result each time.
// NOTE(review): the scope braces separating the three cases, the port
// assignments and the tails of the bind() assertions are on lines not
// visible in this excerpt.
2161 EventBase eventBase;
2163 // Test creating a socket, and letting AsyncServerSocket bind and listen
2165 // Manually create a socket
2166 int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2169 // Create a server socket
2170 AsyncServerSocket::UniquePtr serverSocket(
2171 new AsyncServerSocket(&eventBase));
2172 serverSocket->useExistingSocket(fd);
// The address read back from the still-unbound socket is passed to bind().
2173 folly::SocketAddress address;
2174 serverSocket->getAddress(&address);
2176 serverSocket->bind(address);
2177 serverSocket->listen(16);
2179 // Make sure the socket works
2180 serverSocketSanityTest(serverSocket.get());
2183 // Test creating a socket and binding manually,
2184 // then letting AsyncServerSocket listen
2186 // Manually create a socket
2187 int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2190 struct sockaddr_in addr;
2191 addr.sin_family = AF_INET;
2193 addr.sin_addr.s_addr = INADDR_ANY;
2194 ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2196 // Look up the address that we bound to
2197 folly::SocketAddress boundAddress;
2198 boundAddress.setFromLocalAddress(fd);
2200 // Create a server socket
2201 AsyncServerSocket::UniquePtr serverSocket(
2202 new AsyncServerSocket(&eventBase));
2203 serverSocket->useExistingSocket(fd);
2204 serverSocket->listen(16);
2206 // Make sure AsyncServerSocket reports the same address that we bound to
2207 folly::SocketAddress serverSocketAddress;
2208 serverSocket->getAddress(&serverSocketAddress);
2209 ASSERT_EQ(boundAddress, serverSocketAddress);
2211 // Make sure the socket works
2212 serverSocketSanityTest(serverSocket.get());
2215 // Test creating a socket, binding and listening manually,
2216 // then giving it to AsyncServerSocket
2218 // Manually create a socket
2219 int fd = fsp::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2222 struct sockaddr_in addr;
2223 addr.sin_family = AF_INET;
2225 addr.sin_addr.s_addr = INADDR_ANY;
2226 ASSERT_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2228 // Look up the address that we bound to
2229 folly::SocketAddress boundAddress;
2230 boundAddress.setFromLocalAddress(fd);
// listen() is called directly on the fd here rather than through
// AsyncServerSocket.
2232 ASSERT_EQ(listen(fd, 16), 0);
2234 // Create a server socket
2235 AsyncServerSocket::UniquePtr serverSocket(
2236 new AsyncServerSocket(&eventBase));
2237 serverSocket->useExistingSocket(fd);
2239 // Make sure AsyncServerSocket reports the same address that we bound to
2240 folly::SocketAddress serverSocketAddress;
2241 serverSocket->getAddress(&serverSocketAddress);
2242 ASSERT_EQ(boundAddress, serverSocketAddress);
2244 // Make sure the socket works
2245 serverSocketSanityTest(serverSocket.get());
// Binds an AsyncServerSocket to a path-based address (setFromPath), connects
// one AsyncSocket client to it, and checks that the accept callback recorded
// exactly three events (START, ACCEPT, STOP) and that the accepted fd has
// O_NONBLOCK set.
2249 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2250 EventBase eventBase;
2252 // Create a server socket
2253 std::shared_ptr<AsyncServerSocket> serverSocket(
2254 AsyncServerSocket::newSocket(&eventBase));
// Append "/anonymous" plus a random 64-bit number to `path`;
// NOTE(review): presumably to keep the socket path unique per run.
2256 path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2257 folly::SocketAddress serverAddress;
2258 serverAddress.setFromPath(path);
2259 serverSocket->bind(serverAddress);
2260 serverSocket->listen(16);
2262 // Add a callback to accept one connection then stop the loop
2263 TestAcceptCallback acceptCallback;
// The callback unregisters itself on the first accepted connection...
2264 acceptCallback.setConnectionAcceptedFn(
2265 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2266 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
// ...and likewise on an accept error.
2268 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2269 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2271 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2272 serverSocket->startAccepting();
2274 // Connect to the server socket
2275 std::shared_ptr<AsyncSocket> socket(
2276 AsyncSocket::newSocket(&eventBase, serverAddress));
2280 // Verify that the server accepted a connection
2281 ASSERT_EQ(acceptCallback.getEvents()->size(), 3);
2282 ASSERT_EQ(acceptCallback.getEvents()->at(0).type,
2283 TestAcceptCallback::TYPE_START);
2284 ASSERT_EQ(acceptCallback.getEvents()->at(1).type,
2285 TestAcceptCallback::TYPE_ACCEPT);
2286 ASSERT_EQ(acceptCallback.getEvents()->at(2).type,
2287 TestAcceptCallback::TYPE_STOP);
// The ACCEPT event (index 1) carries the accepted descriptor.
2288 int fd = acceptCallback.getEvents()->at(1).fd;
2290 // The accepted connection should already be in non-blocking mode
2291 int flags = fcntl(fd, F_GETFL, 0);
2292 ASSERT_EQ(flags & O_NONBLOCK, O_NONBLOCK);
// Accepts a single connection with the accept callback registered against
// &eventBase and checks the counters exposed by TestConnectionEventCallback:
// accepted == 1, enqueued-for-accept-callback == 1, dequeued-by-accept-callback
// == 1, and zero for accept errors, drops and all backoff counters.
2295 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2296 EventBase eventBase;
2297 TestConnectionEventCallback connectionEventCallback;
2299 // Create a server socket
2300 std::shared_ptr<AsyncServerSocket> serverSocket(
2301 AsyncServerSocket::newSocket(&eventBase));
2302 serverSocket->setConnectionEventCallback(&connectionEventCallback);
// bind(0), then read the resulting address back via getAddress() below so the
// client knows where to connect.
2303 serverSocket->bind(0);
2304 serverSocket->listen(16);
2305 folly::SocketAddress serverAddress;
2306 serverSocket->getAddress(&serverAddress);
2308 // Add a callback to accept one connection then stop the loop
2309 TestAcceptCallback acceptCallback;
// NOTE(review): the callback is added with &eventBase (below) but removed
// with nullptr in both handlers — confirm this mismatch is intended.
2310 acceptCallback.setConnectionAcceptedFn(
2311 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2312 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2314 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2315 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2317 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2318 serverSocket->startAccepting();
2320 // Connect to the server socket
2321 std::shared_ptr<AsyncSocket> socket(
2322 AsyncSocket::newSocket(&eventBase, serverAddress));
2326 // Validate the connection event counters
2327 ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2328 ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2329 ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2331 connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2332 ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2333 ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2334 ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2335 ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
// Same single-connection scenario as ConnectionEventCallbackDefault, but the
// accept callback is registered with a nullptr EventBase. The test expects
// the accept-started and accept-stopped hooks to have fired, one connection
// to be accepted, and the enqueued/dequeued counters to stay at 0.
// NOTE(review): going by the test name, nullptr presumably means "run the
// callback in the server socket's primary EventBase" — confirm against
// AsyncServerSocket::addAcceptCallback.
2338 TEST(AsyncSocketTest, CallbackInPrimaryEventBase) {
2339 EventBase eventBase;
2340 TestConnectionEventCallback connectionEventCallback;
2342 // Create a server socket
2343 std::shared_ptr<AsyncServerSocket> serverSocket(
2344 AsyncServerSocket::newSocket(&eventBase));
2345 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2346 serverSocket->bind(0);
2347 serverSocket->listen(16);
2348 folly::SocketAddress serverAddress;
2349 serverSocket->getAddress(&serverAddress);
2351 // Add a callback to accept one connection then stop the loop
2352 TestAcceptCallback acceptCallback;
// The callback unregisters itself on the first accepted connection or on an
// accept error.
2353 acceptCallback.setConnectionAcceptedFn(
2354 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2355 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2357 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2358 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
// Flags flipped by the accept-started / accept-stopped hooks; both are
// asserted to be true further down.
2360 bool acceptStartedFlag{false};
2361 acceptCallback.setAcceptStartedFn([&acceptStartedFlag](){
2362 acceptStartedFlag = true;
2364 bool acceptStoppedFlag{false};
2365 acceptCallback.setAcceptStoppedFn([&acceptStoppedFlag](){
2366 acceptStoppedFlag = true;
2368 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2369 serverSocket->startAccepting();
2371 // Connect to the server socket
2372 std::shared_ptr<AsyncSocket> socket(
2373 AsyncSocket::newSocket(&eventBase, serverAddress));
2377 ASSERT_TRUE(acceptStartedFlag);
2378 ASSERT_TRUE(acceptStoppedFlag);
2379 // Validate the connection event counters
2380 ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2381 ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2382 ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2384 connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 0);
2385 ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 0);
2386 ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2387 ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2388 ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2394 * Test AsyncServerSocket::getNumPendingMessagesInQueue()
// Opens four client connections against one AsyncServerSocket and, inside the
// accept callback, asserts that getNumPendingMessagesInQueue() equals
// 4 - count, where `count` is the accepted-connection counter mentioned in
// the comment below.
2396 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2397 EventBase eventBase;
2399 // Counter of how many connections have been accepted
2402 // Create a server socket
2403 auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2404 serverSocket->bind(0);
2405 serverSocket->listen(16);
2406 folly::SocketAddress serverAddress;
2407 serverSocket->getAddress(&serverAddress);
2409 // Add a callback to accept connections
2410 TestAcceptCallback acceptCallback;
2411 acceptCallback.setConnectionAcceptedFn(
2412 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
// Pending messages are expected to shrink as `count` grows.
2414 ASSERT_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2417 // all messages are processed, remove accept callback
2418 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2421 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2422 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2424 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2425 serverSocket->startAccepting();
2427 // Connect to the server socket, 4 clients, there are 4 connections
2428 auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2429 auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2430 auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2431 auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2437 * Test AsyncTransport::BufferCallback
// Writes 100 KiB of 'c' through a socket whose SO_SNDBUF option is set to 128
// and checks that the installed BufferCallback saw both a "buffered" and a
// "buffer cleared" notification, that connect and write succeeded, that the
// server received the same bytes, and that the socket ends up closed by
// itself rather than by the peer.
2439 TEST(AsyncSocketTest, BufferTest) {
// Tiny send buffer; NOTE(review): presumably chosen so the 100 KiB write
// cannot complete in one go and has to be buffered.
2443 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2444 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2446 socket->connect(&ccb, server.getAddress(), 30, option);
2448 char buf[100 * 1024];
2449 memset(buf, 'c', sizeof(buf));
2452 socket->setBufferCallback(&bcb);
2453 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2456 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2457 ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
2459 ASSERT_TRUE(bcb.hasBuffered());
2460 ASSERT_TRUE(bcb.hasBufferCleared());
// The server side must have received exactly the bytes in `buf`.
2463 server.verifyConnection(buf, sizeof(buf));
2465 ASSERT_TRUE(socket->isClosedBySelf());
2466 ASSERT_FALSE(socket->isClosedByPeer());
// Same setup as BufferTest (SO_SNDBUF of 128, 100 KiB of 'c', a
// BufferCallback installed), but the write-success callback is used to tear
// the socket down from inside the write path. Per the comments below, this
// would crash if AsyncSocket did not keep a DestructorGuard on the stack.
2469 TEST(AsyncSocketTest, BufferCallbackKill) {
2472 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2473 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2475 socket->connect(&ccb, server.getAddress(), 30, option);
2478 char buf[100 * 1024];
2479 memset(buf, 'c', sizeof(buf));
2481 socket->setBufferCallback(&bcb);
// The success callback first checks that this shared_ptr is the only owner
// of the socket.
2483 wcb.successCallback = [&] {
2484 ASSERT_TRUE(socket.unique());
2488 // This will trigger AsyncSocket::handleWrite,
2489 // which calls WriteCallback::writeSuccess,
2490 // which calls wcb.successCallback above,
2491 // which tries to delete socket
2492 // Then, the socket will also try to use this BufferCallback
2493 // And that should crash us, if there is no DestructorGuard on the stack
2494 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2497 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
// Client-side TCP Fast Open happy path. TFO is enabled on the socket before
// connect(); the connect callback is asserted to be in STATE_SUCCEEDED before
// the client writes anything, and the first writeChain() is what is expected
// to trigger the actual connect. The peer writes 128 bytes of 'a' and reads
// the 3-byte "hey" payload; both directions are verified at the end.
2501 TEST(AsyncSocketTest, ConnectTFO) {
2502 // Start listening on a local port
// NOTE(review): the `true` argument presumably enables TFO on the listening
// side — confirm against TestServer.
2503 TestServer server(true);
2505 // Connect using an AsyncSocket
2507 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2508 socket->enableTFO();
2510 socket->connect(&cb, server.getAddress(), 30);
// `buf` is what the peer sends to the client; `readBuf` receives what the
// peer reads from the client ("hey", 3 bytes).
2512 std::array<uint8_t, 128> buf;
2513 memset(buf.data(), 'a', buf.size());
2515 std::array<uint8_t, 3> readBuf;
2516 auto sendBuf = IOBuf::copyBuffer("hey");
// Peer side: accept, send `buf`, read the client's payload, close.
2519 auto acceptedSocket = server.accept();
2520 acceptedSocket->write(buf.data(), buf.size());
2521 acceptedSocket->flush();
2522 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2523 acceptedSocket->close();
2528 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2529 EXPECT_LE(0, socket->getConnectTime().count());
2530 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2531 EXPECT_TRUE(socket->getTFOAttempted());
2533 // Should trigger the connect
2534 WriteCallback write;
2536 socket->writeChain(&write, sendBuf->clone());
2537 socket->setReadCB(&rcb);
2542 EXPECT_EQ(STATE_SUCCEEDED, write.state);
// The peer must have received exactly the bytes of "hey".
2543 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2544 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
// The client must have received `buf` as one single read buffer.
2545 ASSERT_EQ(1, rcb.buffers.size());
2546 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2547 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
// TFO may or may not have been used; the two flags only have to agree.
2548 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
// Variant of ConnectTFO in which the read callback is installed right after
// connect(), i.e. before the first write triggers the actual TFO connect.
// The expectations are the same: connect and write succeed, the peer receives
// "hey", and the client receives the peer's 128 bytes as one read buffer.
2551 TEST(AsyncSocketTest, ConnectTFOSupplyEarlyReadCB) {
2552 // Start listening on a local port
2553 TestServer server(true);
2555 // Connect using an AsyncSocket
2557 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2558 socket->enableTFO();
2560 socket->connect(&cb, server.getAddress(), 30);
// Read callback supplied early, before any data has been written.
2562 socket->setReadCB(&rcb);
2564 std::array<uint8_t, 128> buf;
2565 memset(buf.data(), 'a', buf.size());
2567 std::array<uint8_t, 3> readBuf;
2568 auto sendBuf = IOBuf::copyBuffer("hey");
// Peer side: accept, send `buf`, read the client's payload, close.
2571 auto acceptedSocket = server.accept();
2572 acceptedSocket->write(buf.data(), buf.size());
2573 acceptedSocket->flush();
2574 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2575 acceptedSocket->close();
2580 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2581 EXPECT_LE(0, socket->getConnectTime().count());
2582 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2583 EXPECT_TRUE(socket->getTFOAttempted());
2585 // Should trigger the connect
2586 WriteCallback write;
2587 socket->writeChain(&write, sendBuf->clone());
2592 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2593 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2594 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2595 ASSERT_EQ(1, rcb.buffers.size());
2596 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2597 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
// TFO may or may not have been used; the two flags only have to agree.
2598 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
2602 * Test connecting to a server that isn't listening
// TFO variant of connecting to an address where nothing should be listening
// ("::1", port 65535). The connect callback is still expected to end up in
// STATE_SUCCEEDED; the failure is expected to surface on the writes instead.
// write2 must fail, while the expectation for write1 depends on
// getTFOFinished().
2604 TEST(AsyncSocketTest, ConnectRefusedImmediatelyTFO) {
2607 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2609 socket->enableTFO();
2611 // Hopefully nothing is actually listening on this address
2612 folly::SocketAddress addr("::1", 65535);
2614 socket->connect(&cb, addr, 30);
2618 WriteCallback write1;
2619 // Trigger the connect if TFO attempt is supported.
2620 socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2621 WriteCallback write2;
2622 socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
// When the TFO attempt did not finish, write1 is expected to have failed.
2625 if (!socket->getTFOFinished()) {
2626 EXPECT_EQ(STATE_FAILED, write1.state);
2628 EXPECT_EQ(STATE_SUCCEEDED, write1.state);
2629 EXPECT_FALSE(socket->getTFOSucceded());
2632 EXPECT_EQ(STATE_FAILED, write2.state);
2634 EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2635 EXPECT_LE(0, socket->getConnectTime().count());
2636 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2637 EXPECT_TRUE(socket->getTFOAttempted());
2641 * Test calling closeNow() immediately after connecting.
// TFO variant of the closeNow()-right-after-connect scenario. The test
// expects the connect callback to be in STATE_SUCCEEDED and the socket to
// report that it was closed by itself and not by the peer.
2643 TEST(AsyncSocketTest, ConnectWriteAndCloseNowTFO) {
2644 TestServer server(true);
2648 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2649 socket->enableTFO();
2652 socket->connect(&ccb, server.getAddress(), 30);
// 128 bytes of 'a'.
2655 std::array<char, 128> buf;
2656 memset(buf.data(), 'a', buf.size());
2661 // Loop, although there shouldn't be anything to do.
2664 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2666 ASSERT_TRUE(socket->isClosedBySelf());
2667 ASSERT_FALSE(socket->isClosedByPeer());
2671 * Test calling close() immediately after connect()
// TFO variant of the close()-right-after-connect() scenario. The test expects
// the connect callback to be in STATE_SUCCEEDED and the socket to report that
// it was closed by itself and not by the peer.
2673 TEST(AsyncSocketTest, ConnectAndCloseTFO) {
2674 TestServer server(true);
2676 // Connect using an AsyncSocket
2678 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2679 socket->enableTFO();
2682 socket->connect(&ccb, server.getAddress(), 30);
2686 // Loop, although there shouldn't be anything to do.
2689 // Make sure the connection was aborted
// NOTE(review): despite the wording above, the assertion expects
// STATE_SUCCEEDED. The other TFO tests in this file also see the connect
// callback succeed before any data is exchanged, so this looks intentional —
// confirm, and consider rewording the comment above.
2690 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2692 ASSERT_TRUE(socket->isClosedBySelf());
2693 ASSERT_FALSE(socket->isClosedByPeer());
// AsyncSocket subclass that declares tfoSendMsg() as a gmock method, so a
// test can script what the TFO send returns (see the EXPECT_CALLs on
// tfoSendMsg in the tests below).
// NOTE(review): presumably this overrides a virtual tfoSendMsg() in
// AsyncSocket — confirm.
2696 class MockAsyncTFOSocket : public AsyncSocket {
// Owning pointer type using the `Destructor` deleter.
2698 using UniquePtr = std::unique_ptr<MockAsyncTFOSocket, Destructor>;
2700 explicit MockAsyncTFOSocket(EventBase* evb) : AsyncSocket(evb) {}
// Mocked send hook: (fd, msg, msg_flags) -> ssize_t.
2702 MOCK_METHOD3(tfoSendMsg, ssize_t(int fd, struct msghdr* msg, int msg_flags));
// Simulates a platform without TFO support: the mocked tfoSendMsg() fails
// once with errno EOPNOTSUPP. The write is expected to be left in
// STATE_WAITING right after writeChain() and to have succeeded by the end of
// the test, with the payload delivered in both directions.
2705 TEST(AsyncSocketTest, TestTFOUnsupported) {
2706 TestServer server(true);
2708 // Connect using an AsyncSocket
2710 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2711 socket->enableTFO();
2714 socket->connect(&ccb, server.getAddress(), 30);
// With TFO enabled the connect callback is already successful here, before
// any data has been written.
2715 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2718 socket->setReadCB(&rcb);
// Script the TFO send: fail exactly once with EOPNOTSUPP.
2720 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2721 .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2722 WriteCallback write;
2723 auto sendBuf = IOBuf::copyBuffer("hey");
2724 socket->writeChain(&write, sendBuf->clone());
// The write has neither succeeded nor failed yet.
2725 EXPECT_EQ(STATE_WAITING, write.state);
2727 std::array<uint8_t, 128> buf;
2728 memset(buf.data(), 'a', buf.size());
2730 std::array<uint8_t, 3> readBuf;
// Peer side: accept, send `buf`, read the client's payload, close.
2733 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2734 acceptedSocket->write(buf.data(), buf.size());
2735 acceptedSocket->flush();
2736 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2737 acceptedSocket->close();
2743 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2744 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2746 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2747 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2748 ASSERT_EQ(1, rcb.buffers.size());
2749 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
2750 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
2751 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
// Connection-refused scenario where the mocked tfoSendMsg() performs a plain
// connect() to 127.0.0.1:65535 (an address nothing is expected to listen on)
// instead of a TFO send. Both queued writes are expected to fail and
// getTFOSucceded() to be false, while the connect callback itself is still
// expected to be in STATE_SUCCEEDED.
2754 TEST(AsyncSocketTest, ConnectRefusedDelayedTFO) {
2757 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2758 socket->enableTFO();
2760 // Hopefully this fails
2761 folly::SocketAddress fakeAddr("127.0.0.1", 65535);
// Replace the TFO send with a regular connect() on the same fd and log the
// result.
2762 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2763 .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2764 sockaddr_storage addr;
2765 auto len = fakeAddr.getAddress(&addr);
2766 int ret = connect(fd, (const struct sockaddr*)&addr, len);
2767 LOG(INFO) << "connecting the socket " << fd << " : " << ret << " : "
2772 // Hopefully nothing is actually listening on this address
2774 socket->connect(&cb, fakeAddr, 30);
2776 WriteCallback write1;
2777 // Trigger the connect if TFO attempt is supported.
2778 socket->writeChain(&write1, IOBuf::copyBuffer("hey"));
2780 if (socket->getTFOFinished()) {
2781 // This test is useless now.
2784 WriteCallback write2;
2785 // Trigger the connect if TFO attempt is supported.
2786 socket->writeChain(&write2, IOBuf::copyBuffer("hey"));
2789 EXPECT_EQ(STATE_FAILED, write1.state);
2790 EXPECT_EQ(STATE_FAILED, write2.state);
2791 EXPECT_FALSE(socket->getTFOSucceded());
2793 EXPECT_EQ(STATE_SUCCEEDED, cb.state);
2794 EXPECT_LE(0, socket->getConnectTime().count());
2795 EXPECT_EQ(std::chrono::milliseconds(30), socket->getConnectTimeout());
2796 EXPECT_TRUE(socket->getTFOAttempted());
// Combines "TFO unsupported" with a connect timeout: the mocked tfoSendMsg()
// fails once with EOPNOTSUPP, the target is a host that is hoped to be
// routable but unresponsive (port 65535), and the connect timeout is 1 ms.
// The connect callback is in STATE_SUCCEEDED right after connect(), and the
// write is expected to end in STATE_FAILED.
2799 TEST(AsyncSocketTest, TestTFOUnsupportedTimeout) {
2800 // Try connecting to server that won't respond.
2802 // This depends somewhat on the network where this test is run.
2803 // Hopefully this IP will be routable but unresponsive.
2804 // (Alternatively, we could try listening on a local raw socket, but that
2805 // normally requires root privileges.)
// Prefer the IPv6 test address when IPv6 is enabled, otherwise the IPv4 one
// when IPv4 is enabled.
2806 auto host = SocketAddressTestHelper::isIPv6Enabled()
2807 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2808 : SocketAddressTestHelper::isIPv4Enabled()
2809 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2811 SocketAddress addr(host, 65535);
2813 // Connect using an AsyncSocket
2815 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2816 socket->enableTFO();
2819 // Set a very small timeout
2820 socket->connect(&ccb, addr, 1);
2821 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2824 socket->setReadCB(&rcb);
// Script the TFO send: fail exactly once with EOPNOTSUPP.
2826 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2827 .WillOnce(SetErrnoAndReturn(EOPNOTSUPP, -1));
2828 WriteCallback write;
2829 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2833 EXPECT_EQ(STATE_FAILED, write.state);
// Fallback path: the mocked tfoSendMsg() performs a plain connect() to the
// test server and returns connect()'s result instead of sending data. The
// write is expected to be in STATE_WAITING right after writeChain() and to
// have succeeded by the end, with the payload delivered in both directions.
2836 TEST(AsyncSocketTest, TestTFOFallbackToConnect) {
2837 TestServer server(true);
2839 // Connect using an AsyncSocket
2841 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2842 socket->enableTFO();
2845 socket->connect(&ccb, server.getAddress(), 30);
2846 ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
2849 socket->setReadCB(&rcb);
// Replace the TFO send with a regular connect() on the same fd.
2851 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2852 .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2853 sockaddr_storage addr;
2854 auto len = server.getAddress().getAddress(&addr);
2855 return connect(fd, (const struct sockaddr*)&addr, len);
2857 WriteCallback write;
2858 auto sendBuf = IOBuf::copyBuffer("hey");
2859 socket->writeChain(&write, sendBuf->clone());
// The write has neither succeeded nor failed yet.
2860 EXPECT_EQ(STATE_WAITING, write.state);
2862 std::array<uint8_t, 128> buf;
2863 memset(buf.data(), 'a', buf.size());
2865 std::array<uint8_t, 3> readBuf;
// Peer side: accept, send `buf`, read the client's payload, close.
2868 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
2869 acceptedSocket->write(buf.data(), buf.size());
2870 acceptedSocket->flush();
2871 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2872 acceptedSocket->close();
2878 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2880 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2881 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2883 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
2884 ASSERT_EQ(1, rcb.buffers.size());
2885 ASSERT_EQ(buf.size(), rcb.buffers[0].length);
2886 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
// Combines the connect() fallback with a connect timeout: the mocked
// tfoSendMsg() performs a plain connect() to a host that is hoped to be
// routable but unresponsive (port 65535), and the connect timeout is 1 ms.
// The connect callback is in STATE_SUCCEEDED right after connect(), and the
// write is expected to end in STATE_FAILED.
2889 TEST(AsyncSocketTest, TestTFOFallbackTimeout) {
2890 // Try connecting to server that won't respond.
2892 // This depends somewhat on the network where this test is run.
2893 // Hopefully this IP will be routable but unresponsive.
2894 // (Alternatively, we could try listening on a local raw socket, but that
2895 // normally requires root privileges.)
// Prefer the IPv6 test address when IPv6 is enabled, otherwise the IPv4 one
// when IPv4 is enabled.
2896 auto host = SocketAddressTestHelper::isIPv6Enabled()
2897 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6
2898 : SocketAddressTestHelper::isIPv4Enabled()
2899 ? SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4
2901 SocketAddress addr(host, 65535);
2903 // Connect using an AsyncSocket
2905 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2906 socket->enableTFO();
2909 // Set a very small timeout
2910 socket->connect(&ccb, addr, 1);
2911 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2914 socket->setReadCB(&rcb);
// Replace the TFO send with a regular connect() to the unresponsive address.
2916 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2917 .WillOnce(Invoke([&](int fd, struct msghdr*, int) {
2918 sockaddr_storage addr2;
2919 auto len = addr.getAddress(&addr2);
2920 return connect(fd, (const struct sockaddr*)&addr2, len);
2922 WriteCallback write;
2923 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2927 EXPECT_EQ(STATE_FAILED, write.state);
// The mocked tfoSendMsg() fails once with errno EAGAIN. The connect callback
// is expected to be in STATE_SUCCEEDED and the write in STATE_FAILED.
2930 TEST(AsyncSocketTest, TestTFOEagain) {
2931 TestServer server(true);
2933 // Connect using an AsyncSocket
2935 auto socket = MockAsyncTFOSocket::UniquePtr(new MockAsyncTFOSocket(&evb));
2936 socket->enableTFO();
2939 socket->connect(&ccb, server.getAddress(), 30);
// Script the TFO send: fail exactly once with EAGAIN.
2941 EXPECT_CALL(*socket, tfoSendMsg(_, _, _))
2942 .WillOnce(SetErrnoAndReturn(EAGAIN, -1));
2943 WriteCallback write;
2944 socket->writeChain(&write, IOBuf::copyBuffer("hey"));
2948 EXPECT_EQ(STATE_SUCCEEDED, ccb.state);
2949 EXPECT_EQ(STATE_FAILED, write.state);
2952 // Sends a large amount of data (10 KiB) in the first write, which will
2953 // definitely not fit into the MSS.
// Apart from the payload size the flow matches ConnectTFO: the peer writes
// 128 bytes of 'a' and reads the full 10 KiB payload; both directions are
// verified at the end.
2954 TEST(AsyncSocketTest, ConnectTFOWithBigData) {
2955 // Start listening on a local port
2956 TestServer server(true);
2958 // Connect using an AsyncSocket
2960 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2961 socket->enableTFO();
2963 socket->connect(&cb, server.getAddress(), 30);
2965 std::array<uint8_t, 128> buf;
2966 memset(buf.data(), 'a', buf.size());
// 10 KiB client payload; append(len) marks all `len` bytes of the IOBuf as
// used, and `readBuf` on the peer side is sized to match.
2968 constexpr size_t len = 10 * 1024;
2969 auto sendBuf = IOBuf::create(len);
2970 sendBuf->append(len);
2971 std::array<uint8_t, len> readBuf;
// Peer side: accept, send `buf`, read the client's payload, close.
2974 auto acceptedSocket = server.accept();
2975 acceptedSocket->write(buf.data(), buf.size());
2976 acceptedSocket->flush();
2977 acceptedSocket->readAll(readBuf.data(), readBuf.size());
2978 acceptedSocket->close();
2983 ASSERT_EQ(cb.state, STATE_SUCCEEDED);
2984 EXPECT_LE(0, socket->getConnectTime().count());
2985 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
2986 EXPECT_TRUE(socket->getTFOAttempted());
2988 // Should trigger the connect
2989 WriteCallback write;
2991 socket->writeChain(&write, sendBuf->clone());
2992 socket->setReadCB(&rcb);
2997 EXPECT_EQ(STATE_SUCCEEDED, write.state);
2998 EXPECT_EQ(0, memcmp(readBuf.data(), sendBuf->data(), readBuf.size()));
2999 EXPECT_EQ(STATE_SUCCEEDED, rcb.state);
3000 ASSERT_EQ(1, rcb.buffers.size());
3001 ASSERT_EQ(sizeof(buf), rcb.buffers[0].length);
3002 EXPECT_EQ(0, memcmp(rcb.buffers[0].buffer, buf.data(), buf.size()));
// TFO may or may not have been used; the two flags only have to agree.
3003 EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
// gmock implementation of AsyncSocket::EvbChangeCallback; both notifications
// (evbAttached / evbDetached) take the affected AsyncSocket* and are mocked
// so a test can set expectations on them.
3006 class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
3008 MOCK_METHOD1(evbAttached, void(AsyncSocket*));
3009 MOCK_METHOD1(evbDetached, void(AsyncSocket*));
// Installs a MockEvbChangeCallback on a socket, then detaches the socket from
// its EventBase and attaches it again; the mock expects evbDetached() and
// evbAttached() to each be called exactly once with this socket.
3012 TEST(AsyncSocketTest, EvbCallbacks) {
3013 auto cb = folly::make_unique<MockEvbChangeCallback>();
3015 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
// Expectations are set before ownership of the mock moves into the socket.
3018 EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
3019 EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
3021 socket->setEvbChangedCallback(std::move(cb));
3022 socket->detachEventBase();
3023 socket->attachEventBase(&evb);