2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23 #include <folly/io/async/test/AsyncSocketTest.h>
24 #include <folly/io/async/test/Util.h>
25 #include <folly/test/SocketAddressTestHelper.h>
27 #include <gtest/gtest.h>
28 #include <boost/scoped_array.hpp>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/tcp.h>
38 using namespace boost;
45 using std::unique_ptr;
46 using std::chrono::milliseconds;
47 using boost::scoped_array;
49 using namespace folly;
// AsyncTimeout subclass used by the corked-write test: when the timeout
// expires it writes the stored IOBuf chain to the socket, passing
// WriteFlags::CORK when cork_ is set.
51 class DelayedWrite: public AsyncTimeout {
53 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
54 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
55 bool cork, bool lastWrite = false):
// The AsyncTimeout base is constructed with the socket's own EventBase.
56 AsyncTimeout(socket->getEventBase()),
// Takes ownership of the buffer chain handed in by the caller.
58 bufs_(std::move(bufs)),
61 lastWrite_(lastWrite) {}
// AsyncTimeout hook: performs the deferred write.
64 void timeoutExpired() noexcept override {
65 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
// bufs_ is moved into the write call here.
66 socket_->writeChain(wcb_, std::move(bufs_), flags);
// NOTE(review): presumably guarded by lastWrite_ so that only the final
// delayed write half-closes the socket -- confirm.
68 socket_->shutdownWrite();
72 std::shared_ptr<AsyncSocket> socket_;
73 unique_ptr<IOBuf> bufs_;
74 AsyncTransportWrapper::WriteCallback* wcb_;
79 ///////////////////////////////////////////////////////////////////////////
81 ///////////////////////////////////////////////////////////////////////////
84 * Test connecting to a server
// Connects an AsyncSocket to the test server's address and expects the
// connect callback to report success.
86 TEST(AsyncSocketTest, Connect) {
87 // Start listening on a local port
90 // Connect using an AsyncSocket
92 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
94 socket->connect(&cb, server.getAddress(), 30);
98 CHECK_EQ(cb.state, STATE_SUCCEEDED);
// The reported connect time must not be negative.
99 EXPECT_LE(0, socket->getConnectTime().count());
103 * Test connecting to a server that isn't listening
// Connects to 127.0.0.1:65535, where no listener is expected, and checks that
// the connect callback fails with AsyncSocketException::NOT_OPEN.
105 TEST(AsyncSocketTest, ConnectRefused) {
108 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
110 // Hopefully nothing is actually listening on this address
111 folly::SocketAddress addr("127.0.0.1", 65535);
113 socket->connect(&cb, addr, 30);
117 CHECK_EQ(cb.state, STATE_FAILED);
118 CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
// The connect time must be non-negative even though the attempt failed.
119 EXPECT_LE(0, socket->getConnectTime().count());
123 * Test connection timeout
// Connects to an address that is expected not to answer, using a timeout
// argument of 1, and checks that the attempt fails with TIMED_OUT and that
// the peer address is still retrievable afterwards.
125 TEST(AsyncSocketTest, ConnectTimeout) {
128 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
130 // Try connecting to server that won't respond.
132 // This depends somewhat on the network where this test is run.
133 // Hopefully this IP will be routable but unresponsive.
134 // (Alternatively, we could try listening on a local raw socket, but that
135 // normally requires root privileges.)
// Host selection: use the IPv6 address when IPv6 is enabled, otherwise the
// IPv4 address when IPv4 is enabled.
137 SocketAddressTestHelper::isIPv6Enabled() ?
138 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
139 SocketAddressTestHelper::isIPv4Enabled() ?
140 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
142 SocketAddress addr(host, 65535);
144 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
148 CHECK_EQ(cb.state, STATE_FAILED);
149 CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
151 // Verify that we can still get the peer address after a timeout.
152 // Use case is if the client was created from a client pool, and we want
153 // to log which peer failed.
154 folly::SocketAddress peer;
155 socket->getPeerAddress(&peer);
156 CHECK_EQ(peer, addr);
// The connect time must be non-negative even though the attempt timed out.
157 EXPECT_LE(0, socket->getConnectTime().count());
161 * Test writing immediately after connecting, without waiting for connect to finish.
// Queues a write right after calling connect(), then checks that both the
// connect and write callbacks succeed and that the server sees the data.
164 TEST(AsyncSocketTest, ConnectAndWrite) {
169 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
171 socket->connect(&ccb, server.getAddress(), 30);
// Fill the payload with 'a' bytes and hand it to the socket.
175 memset(buf, 'a', sizeof(buf));
177 socket->write(&wcb, buf, sizeof(buf));
179 // Loop. We don't bother accepting on the server socket yet.
180 // The kernel should be able to buffer the write request so it can succeed.
183 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
184 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
186 // Make sure the server got a connection and received the data
188 server.verifyConnection(buf, sizeof(buf));
// The socket must report that it was closed locally, not by the peer.
190 ASSERT_TRUE(socket->isClosedBySelf());
191 ASSERT_FALSE(socket->isClosedByPeer());
195 * Test connecting using a nullptr connect callback.
// Calls connect() with a nullptr connect callback and uses a follow-up write
// to confirm that the socket still works.
197 TEST(AsyncSocketTest, ConnectNullCallback) {
202 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
203 socket->connect(nullptr, server.getAddress(), 30);
205 // write some data, just so we have some way of verifying
206 // that the socket works correctly after connecting
208 memset(buf, 'a', sizeof(buf));
210 socket->write(&wcb, buf, sizeof(buf));
214 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
216 // Make sure the server got a connection and received the data
218 server.verifyConnection(buf, sizeof(buf));
// The socket must report that it was closed locally, not by the peer.
220 ASSERT_TRUE(socket->isClosedBySelf());
221 ASSERT_FALSE(socket->isClosedByPeer());
225 * Test calling both write() and close() immediately after connecting, without
226 * waiting for connect to finish.
228 * This exercises the STATE_CONNECTING_CLOSING code.
// Queues a write immediately after connect() and expects both the connect
// and the write to succeed, with the data arriving at the server.
230 TEST(AsyncSocketTest, ConnectWriteAndClose) {
235 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
237 socket->connect(&ccb, server.getAddress(), 30);
// Fill the payload with 'a' bytes and hand it to the socket.
241 memset(buf, 'a', sizeof(buf));
243 socket->write(&wcb, buf, sizeof(buf));
248 // Loop. We don't bother accepting on the server socket yet.
249 // The kernel should be able to buffer the write request so it can succeed.
252 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
253 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
255 // Make sure the server got a connection and received the data
256 server.verifyConnection(buf, sizeof(buf));
// The socket must report that it was closed locally, not by the peer.
258 ASSERT_TRUE(socket->isClosedBySelf());
259 ASSERT_FALSE(socket->isClosedByPeer());
263 * Test calling close() immediately after connect()
// Exercises closing while the connect is still pending: the connect callback
// is expected to end in STATE_FAILED and the socket to report a local close.
265 TEST(AsyncSocketTest, ConnectAndClose) {
268 // Connect using an AsyncSocket
270 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
272 socket->connect(&ccb, server.getAddress(), 30);
274 // Hopefully the connect didn't succeed immediately.
275 // If it did, we can't exercise the close-while-connecting code path.
276 if (ccb.state == STATE_SUCCEEDED) {
277 LOG(INFO) << "connect() succeeded immediately; aborting test "
278 "of close-during-connect behavior";
284 // Loop, although there shouldn't be anything to do.
287 // Make sure the connection was aborted
288 CHECK_EQ(ccb.state, STATE_FAILED);
// The socket must report that it was closed locally, not by the peer.
290 ASSERT_TRUE(socket->isClosedBySelf());
291 ASSERT_FALSE(socket->isClosedByPeer());
295 * Test calling closeNow() immediately after connect()
297 * This should be identical to the normal close behavior.
// Same scenario as the close-during-connect test, but for closeNow(): the
// connect callback is expected to end in STATE_FAILED.
299 TEST(AsyncSocketTest, ConnectAndCloseNow) {
302 // Connect using an AsyncSocket
304 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
306 socket->connect(&ccb, server.getAddress(), 30);
308 // Hopefully the connect didn't succeed immediately.
309 // If it did, we can't exercise the close-while-connecting code path.
310 if (ccb.state == STATE_SUCCEEDED) {
311 LOG(INFO) << "connect() succeeded immediately; aborting test "
312 "of closeNow()-during-connect behavior";
318 // Loop, although there shouldn't be anything to do.
321 // Make sure the connection was aborted
322 CHECK_EQ(ccb.state, STATE_FAILED);
// The socket must report that it was closed locally, not by the peer.
324 ASSERT_TRUE(socket->isClosedBySelf());
325 ASSERT_FALSE(socket->isClosedByPeer());
329 * Test calling both write() and closeNow() immediately after connecting,
330 * without waiting for connect to finish.
332 * This should abort the pending write.
// Queues a write while the connect is pending and expects both the connect
// callback and the write callback to end in STATE_FAILED.
334 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
339 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
341 socket->connect(&ccb, server.getAddress(), 30);
343 // Hopefully the connect didn't succeed immediately.
344 // If it did, we can't exercise the close-while-connecting code path.
345 if (ccb.state == STATE_SUCCEEDED) {
346 LOG(INFO) << "connect() succeeded immediately; aborting test "
347 "of write-during-connect behavior";
// Fill the payload with 'a' bytes and hand it to the socket.
353 memset(buf, 'a', sizeof(buf));
355 socket->write(&wcb, buf, sizeof(buf));
360 // Loop, although there shouldn't be anything to do.
// Both the connect and the pending write must have been aborted.
363 CHECK_EQ(ccb.state, STATE_FAILED);
364 CHECK_EQ(wcb.state, STATE_FAILED);
// The socket must report that it was closed locally, not by the peer.
366 ASSERT_TRUE(socket->isClosedBySelf());
367 ASSERT_FALSE(socket->isClosedByPeer());
371 * Test installing a read callback immediately, before connect() finishes.
// Installs a read callback right after connect(), has the accepted server
// socket send one buffer and close, and checks that the read callback
// received exactly that buffer.
373 TEST(AsyncSocketTest, ConnectAndRead) {
378 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
380 socket->connect(&ccb, server.getAddress(), 30);
383 socket->setReadCB(&rcb);
385 // Even though we haven't looped yet, we should be able to accept
386 // the connection and send data to it.
387 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
// Send a buffer of 'a' bytes from the server side, then close that side.
389 memset(buf, 'a', sizeof(buf));
390 acceptedSocket->write(buf, sizeof(buf));
391 acceptedSocket->flush();
392 acceptedSocket->close();
394 // Loop, although there shouldn't be anything to do.
397 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
398 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
// Exactly one buffer is expected, with the same length and contents as buf.
399 CHECK_EQ(rcb.buffers.size(), 1);
400 CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
401 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
// Neither close flag is expected to be set at this point.
403 ASSERT_FALSE(socket->isClosedBySelf());
404 ASSERT_FALSE(socket->isClosedByPeer());
408 * Test installing a read callback and then closing immediately before the
409 * connect attempt finishes.
// Installs a read callback while the connect is pending and expects the
// connect callback to fail, with the read callback seeing EOF and no data.
411 TEST(AsyncSocketTest, ConnectReadAndClose) {
416 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
418 socket->connect(&ccb, server.getAddress(), 30);
420 // Hopefully the connect didn't succeed immediately.
421 // If it did, we can't exercise the close-while-connecting code path.
422 if (ccb.state == STATE_SUCCEEDED) {
423 LOG(INFO) << "connect() succeeded immediately; aborting test "
424 "of read-during-connect behavior";
429 socket->setReadCB(&rcb);
434 // Loop, although there shouldn't be anything to do.
437 CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the connect attempt
438 CHECK_EQ(rcb.buffers.size(), 0);
439 CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
// The socket must report that it was closed locally, not by the peer.
441 ASSERT_TRUE(socket->isClosedBySelf());
442 ASSERT_FALSE(socket->isClosedByPeer());
446 * Test both writing and installing a read callback immediately,
447 * before connect() finishes.
// Queues a write and installs a read callback right after connect(), then
// checks that data flows correctly in both directions and that the peer's
// shutdown is observed by the AsyncSocket.
449 TEST(AsyncSocketTest, ConnectWriteAndRead) {
454 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
456 socket->connect(&ccb, server.getAddress(), 30);
// Outgoing payload: buf1 filled with 'a' bytes.
460 memset(buf1, 'a', sizeof(buf1));
462 socket->write(&wcb, buf1, sizeof(buf1));
464 // set a read callback
466 socket->setReadCB(&rcb);
468 // Even though we haven't looped yet, we should be able to accept
469 // the connection and send data to it.
470 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
// Incoming payload: buf2 filled with 'b' bytes, sent from the server side.
472 memset(buf2, 'b', sizeof(buf2));
473 acceptedSocket->write(buf2, sizeof(buf2));
474 acceptedSocket->flush();
476 // shut down the write half of acceptedSocket, so that the AsyncSocket
477 // will stop reading and we can break out of the event loop.
478 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
483 // Make sure the connect succeeded
484 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
486 // Make sure the AsyncSocket read the data written by the accepted socket
487 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
488 CHECK_EQ(rcb.buffers.size(), 1);
489 CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
490 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
492 // Close the AsyncSocket so we'll see EOF on acceptedSocket
495 // Make sure the accepted socket saw the data written by the AsyncSocket
496 uint8_t readbuf[sizeof(buf1)];
497 acceptedSocket->readAll(readbuf, sizeof(readbuf));
498 CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
// A further read returning 0 bytes indicates EOF on the accepted socket.
499 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
500 CHECK_EQ(bytesRead, 0);
// Here the peer-closed flag is expected, not the self-closed one.
502 ASSERT_FALSE(socket->isClosedBySelf());
503 ASSERT_TRUE(socket->isClosedByPeer());
507 * Test writing to the socket then shutting down writes before the connect attempt finishes.
// Queues a write and half-closes the write side while the connect is still
// pending, then checks that the queued data and EOF reach the accepted
// socket and that data sent back by the peer can still be read.
510 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
515 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
517 socket->connect(&ccb, server.getAddress(), 30);
519 // Hopefully the connect didn't succeed immediately.
520 // If it did, we can't exercise the write-while-connecting code path.
521 if (ccb.state == STATE_SUCCEEDED) {
522 LOG(INFO) << "connect() succeeded immediately; skipping test";
526 // Ask to write some data
528 memset(wbuf, 'a', sizeof(wbuf));
530 socket->write(&wcb, wbuf, sizeof(wbuf));
// NOTE(review): shutdownWrite() is invoked twice in a row here; the second
// call looks redundant -- confirm whether one of them can be dropped.
531 socket->shutdownWrite();
534 socket->shutdownWrite();
536 // Even though we haven't looped yet, we should be able to accept
// the connection.
538 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
540 // Since the connection is still in progress, there should be no data to
541 // read yet. Verify that the accepted socket is not readable.
542 struct pollfd fds[1];
543 fds[0].fd = acceptedSocket->getSocketFD();
544 fds[0].events = POLLIN;
// Zero timeout: poll() returns immediately with the current readiness.
546 int rc = poll(fds, 1, 0);
549 // Write data to the accepted socket
550 uint8_t acceptedWbuf[192];
551 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
552 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
553 acceptedSocket->flush();
558 // The loop should have completed the connection, written the queued data,
559 // and shutdown writes on the socket.
561 // Check that the connection was completed successfully and that the write
562 // callback succeeded.
563 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
564 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
566 // Check that we can read the data that was written to the socket, and that
567 // we see an EOF, since its socket was half-shutdown.
568 uint8_t readbuf[sizeof(wbuf)];
569 acceptedSocket->readAll(readbuf, sizeof(readbuf));
570 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
571 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
572 CHECK_EQ(bytesRead, 0);
574 // Close the accepted socket. This will cause it to see EOF
575 // and uninstall the read callback when we loop next.
576 acceptedSocket->close();
578 // Install a read callback, then loop again.
580 socket->setReadCB(&rcb);
583 // This loop should have read the data and seen the EOF
584 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
585 CHECK_EQ(rcb.buffers.size(), 1);
586 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
587 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
588 acceptedWbuf, sizeof(acceptedWbuf)), 0);
// Neither close flag is expected to be set at this point.
590 ASSERT_FALSE(socket->isClosedBySelf());
591 ASSERT_FALSE(socket->isClosedByPeer());
595 * Test reading, writing, and shutting down writes before the connect attempt finishes.
// Installs a read callback, queues a write and half-closes the write side
// while the connect is pending, then checks that data flows correctly in both
// directions and that each side sees the other's EOF.
598 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
603 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
605 socket->connect(&ccb, server.getAddress(), 30);
607 // Hopefully the connect didn't succeed immediately.
608 // If it did, we can't exercise the write-while-connecting code path.
609 if (ccb.state == STATE_SUCCEEDED) {
610 LOG(INFO) << "connect() succeeded immediately; skipping test";
614 // Install a read callback
616 socket->setReadCB(&rcb);
618 // Ask to write some data
620 memset(wbuf, 'a', sizeof(wbuf));
622 socket->write(&wcb, wbuf, sizeof(wbuf));
// Half-close the write side while the write is still queued.
625 socket->shutdownWrite();
627 // Even though we haven't looped yet, we should be able to accept
// the connection.
629 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
631 // Since the connection is still in progress, there should be no data to
632 // read yet. Verify that the accepted socket is not readable.
633 struct pollfd fds[1];
634 fds[0].fd = acceptedSocket->getSocketFD();
635 fds[0].events = POLLIN;
// Zero timeout: poll() returns immediately with the current readiness.
637 int rc = poll(fds, 1, 0);
640 // Write data to the accepted socket
641 uint8_t acceptedWbuf[192];
642 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
643 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
644 acceptedSocket->flush();
645 // Shutdown writes to the accepted socket. This will cause it to see EOF
646 // and uninstall the read callback.
647 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
652 // The loop should have completed the connection, written the queued data,
653 // shutdown writes on the socket, read the data we wrote to it, and seen the
// EOF.
656 // Check that the connection was completed successfully and that the read
657 // and write callbacks were invoked as expected.
658 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
659 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
660 CHECK_EQ(rcb.buffers.size(), 1);
661 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
662 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
663 acceptedWbuf, sizeof(acceptedWbuf)), 0);
664 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
666 // Check that we can read the data that was written to the socket, and that
667 // we see an EOF, since its socket was half-shutdown.
668 uint8_t readbuf[sizeof(wbuf)];
669 acceptedSocket->readAll(readbuf, sizeof(readbuf));
670 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
671 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
672 CHECK_EQ(bytesRead, 0);
674 // Fully close both sockets
675 acceptedSocket->close();
// Here the peer-closed flag is expected, not the self-closed one.
678 ASSERT_FALSE(socket->isClosedBySelf());
679 ASSERT_TRUE(socket->isClosedByPeer());
683 * Test reading, writing, and calling shutdownWriteNow() before the
684 * connect attempt finishes.
// Like the shutdownWrite() variant, but uses shutdownWriteNow(): the queued
// write must fail immediately with zero bytes written, the peer must see an
// immediate EOF, and data sent by the peer must still be readable.
686 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
691 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
693 socket->connect(&ccb, server.getAddress(), 30);
695 // Hopefully the connect didn't succeed immediately.
696 // If it did, we can't exercise the write-while-connecting code path.
697 if (ccb.state == STATE_SUCCEEDED) {
698 LOG(INFO) << "connect() succeeded immediately; skipping test";
702 // Install a read callback
704 socket->setReadCB(&rcb);
706 // Ask to write some data
708 memset(wbuf, 'a', sizeof(wbuf));
710 socket->write(&wcb, wbuf, sizeof(wbuf));
712 // Shutdown writes immediately.
713 // This should immediately discard the data that we just tried to write.
714 socket->shutdownWriteNow();
716 // Verify that writeError() was invoked on the write callback.
717 CHECK_EQ(wcb.state, STATE_FAILED);
718 CHECK_EQ(wcb.bytesWritten, 0);
720 // Even though we haven't looped yet, we should be able to accept
// the connection.
722 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
724 // Since the connection is still in progress, there should be no data to
725 // read yet. Verify that the accepted socket is not readable.
726 struct pollfd fds[1];
727 fds[0].fd = acceptedSocket->getSocketFD();
728 fds[0].events = POLLIN;
// Zero timeout: poll() returns immediately with the current readiness.
730 int rc = poll(fds, 1, 0);
733 // Write data to the accepted socket
734 uint8_t acceptedWbuf[192];
735 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
736 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
737 acceptedSocket->flush();
738 // Shutdown writes to the accepted socket. This will cause it to see EOF
739 // and uninstall the read callback.
740 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
745 // The loop should have completed the connection, written the queued data,
746 // shutdown writes on the socket, read the data we wrote to it, and seen the
// EOF.
749 // Check that the connection was completed successfully and that the read
750 // callback was invoked as expected.
751 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
752 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
753 CHECK_EQ(rcb.buffers.size(), 1);
754 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
755 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
756 acceptedWbuf, sizeof(acceptedWbuf)), 0);
758 // Since we used shutdownWriteNow(), it should have discarded all pending
759 // write data. Verify we see an immediate EOF when reading from the accepted
// socket.
761 uint8_t readbuf[sizeof(wbuf)];
762 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
763 CHECK_EQ(bytesRead, 0);
765 // Fully close both sockets
766 acceptedSocket->close();
// Here the peer-closed flag is expected, not the self-closed one.
769 ASSERT_FALSE(socket->isClosedBySelf());
770 ASSERT_TRUE(socket->isClosedByPeer());
773 // Helper function for use in testConnectOptWrite()
774 // Temporarily disable the read callback
// Clears the socket's read callback and asks the socket's EventBase to
// install `rcb` again through runInLoop().
775 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
776 // Uninstall the read callback
777 socket->setReadCB(nullptr);
778 // Schedule the read callback to be reinstalled from the event loop.
// NOTE(review): this comment used to say "after 1ms", but runInLoop() is
// called without any delay argument here -- confirm the intended timing.
779 socket->getEventBase()->runInLoop(
780 std::bind(&AsyncSocket::setReadCB, socket, rcb));
784 * Test connect+write, then have the connect callback perform another write.
786 * This tests interaction of the optimistic writing after connect with
787 * additional write attempts that occur in the connect callback.
// Shared driver for the connect-callback write test.
//   size1 - byte count of the write issued right after connect(), filled
//           with 'a'.
//   size2 - byte count of the write issued from the connect success
//           callback, filled with 'b'.
//   close - NOTE(review): presumably requests an immediate close before the
//           connect completes (see the comment near the first write) --
//           confirm.
// Afterwards the data collected by the reader is compared against buf1
// followed by buf2, and the total must equal size1 + size2.
789 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
792 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
796 socket->connect(&ccb, server.getAddress(), 30);
798 // Hopefully the connect didn't succeed immediately.
799 // If it did, we can't exercise the optimistic write code path.
800 if (ccb.state == STATE_SUCCEEDED) {
801 LOG(INFO) << "connect() succeeded immediately; aborting test "
802 "of optimistic write behavior";
806 // Tell the connect callback to perform a write when the connect succeeds
808 scoped_array<char> buf2(new char[size2]);
809 memset(buf2.get(), 'b', size2);
811 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
812 // Tell the second write callback to close the connection when it is done
813 wcb2.successCallback = [&] { socket->closeNow(); };
816 // Schedule one write() immediately, before the connect finishes
817 scoped_array<char> buf1(new char[size1]);
818 memset(buf1.get(), 'a', size1);
821 socket->write(&wcb1, buf1.get(), size1);
825 // immediately perform a close, before connect() completes
829 // Start reading from the other endpoint after 10ms.
830 // If we're using large buffers, we have to read so that the writes don't
// block.
832 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
// Each time data arrives, tmpDisableReads() is run on the accepted socket.
834 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
835 acceptedSocket.get(), &rcb);
836 socket->getEventBase()->tryRunAfterDelay(
837 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
// NOTE(review): the next two comment lines look stale -- the connection was
// already accepted above through acceptAsync().
840 // Loop. We don't bother accepting on the server socket yet.
841 // The kernel should be able to buffer the write request so it can succeed.
844 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
846 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
849 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
854 // Make sure the read callback received all of the data
855 size_t bytesRead = 0;
856 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
857 it != rcb.buffers.end();
// [start, end) is the byte range of the overall stream held by this buffer.
859 size_t start = bytesRead;
860 bytesRead += it->length;
861 size_t end = bytesRead;
// Compare the portion of this buffer that falls inside buf1.
863 size_t cmpLen = min(size1, end) - start;
864 CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
// Compare the portion of this buffer that falls inside buf2.
866 if (end > size1 && end <= size1 + size2) {
870 if (start >= size1) {
872 buf2Offset = start - size1;
873 cmpLen = end - start;
875 itOffset = size1 - start;
877 cmpLen = end - size1;
879 CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
// The reader must have received exactly the bytes of both writes.
884 CHECK_EQ(bytesRead, size1 + size2);
// Runs testConnectOptWrite() over combinations of small, large (8 MiB) and
// zero-length writes, with and without the close flag.
887 TEST(AsyncSocketTest, ConnectCallbackWrite) {
888 // Test using small writes that should both succeed immediately
889 testConnectOptWrite(100, 200);
891 // Test using a large buffer in the connect callback, that should block
892 const size_t largeSize = 8*1024*1024;
893 testConnectOptWrite(100, largeSize);
895 // Test using a large initial write
896 testConnectOptWrite(largeSize, 100);
898 // Test using two large buffers
899 testConnectOptWrite(largeSize, largeSize);
901 // Test a small write in the connect callback,
902 // but no immediate write before connect completes
903 testConnectOptWrite(0, 64);
905 // Test a large write in the connect callback,
906 // but no immediate write before connect completes
907 testConnectOptWrite(0, largeSize);
909 // Test connect, a small write, then immediately call close() before connect
// completes
911 testConnectOptWrite(211, 0, true);
913 // Test connect, a large immediate write (that will block), then immediately
914 // call close() before connect completes
915 testConnectOptWrite(largeSize, 0, true);
918 ///////////////////////////////////////////////////////////////////////////
919 // write() related tests
920 ///////////////////////////////////////////////////////////////////////////
923 * Test writing using a nullptr callback
// Writes with a nullptr write callback and checks that the server still
// receives the data.
925 TEST(AsyncSocketTest, WriteNullCallback) {
930 std::shared_ptr<AsyncSocket> socket =
931 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
932 evb.loop(); // loop until the socket is connected
934 // write() with a nullptr callback
936 memset(buf, 'a', sizeof(buf));
937 socket->write(nullptr, buf, sizeof(buf));
939 evb.loop(); // loop until the data is sent
941 // Make sure the server got a connection and received the data
943 server.verifyConnection(buf, sizeof(buf));
// The socket must report that it was closed locally, not by the peer.
945 ASSERT_TRUE(socket->isClosedBySelf());
946 ASSERT_FALSE(socket->isClosedByPeer());
950 * Test writing with a send timeout
// Writes 8 MiB with nobody reading on the other end and a send timeout of
// 200, then checks that the write fails with TIMED_OUT within the allowed
// tolerance.
952 TEST(AsyncSocketTest, WriteTimeout) {
957 std::shared_ptr<AsyncSocket> socket =
958 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
959 evb.loop(); // loop until the socket is connected
961 // write() a large chunk of data, with no-one on the other end reading
962 size_t writeLength = 8*1024*1024;
963 uint32_t timeout = 200;
964 socket->setSendTimeout(timeout);
965 scoped_array<char> buf(new char[writeLength]);
966 memset(buf.get(), 'a', writeLength);
968 socket->write(&wcb, buf.get(), writeLength);
974 // Make sure the write attempt timed out as requested
975 CHECK_EQ(wcb.state, STATE_FAILED);
976 CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
978 // Check that the write timed out within a reasonable period of time.
979 // We don't check for exactly the specified timeout, since AsyncSocket only
980 // times out when it hasn't made progress for that period of time.
982 // On linux, the first write sends a few hundred kb of data, then blocks for
983 // writability, and then unblocks again after 40ms and is able to write
984 // another smaller chunk of data before blocking permanently. Therefore it
985 // doesn't time out until 40ms + timeout.
987 // I haven't fully verified the cause of this, but I believe it probably
988 // occurs because the receiving end delays sending an ack for up to 40ms.
989 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
990 // the ack, it can send some more data. However, after that point the
991 // receiver's kernel buffer is full. This 40ms delay happens even with
992 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
993 // kernel may be automatically disabling TCP_QUICKACK after receiving some
// data.
996 // For now, we simply check that the timeout occurred within 160ms of
997 // the requested value.
998 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1002 * Test writing to a socket that the remote endpoint has closed
// Writes 8 MiB to a connection whose accepted side has already been dropped
// and expects the write callback to fail with INTERNAL_ERROR.
1004 TEST(AsyncSocketTest, WritePipeError) {
1009 std::shared_ptr<AsyncSocket> socket =
1010 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1011 socket->setSendTimeout(1000);
1012 evb.loop(); // loop until the socket is connected
1014 // accept and immediately close the socket
1015 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
// Dropping the only shared_ptr reference releases the accepted socket.
1016 acceptedSocket.reset();
1018 // write() a large chunk of data
1019 size_t writeLength = 8*1024*1024;
1020 scoped_array<char> buf(new char[writeLength]);
1021 memset(buf.get(), 'a', writeLength);
1023 socket->write(&wcb, buf.get(), writeLength);
1027 // Make sure the write failed.
1028 // It would be nice if AsyncSocketException could convey the errno value,
1029 // so that we could check for EPIPE
1030 CHECK_EQ(wcb.state, STATE_FAILED);
1031 CHECK_EQ(wcb.exception.getType(),
1032 AsyncSocketException::INTERNAL_ERROR);
// Neither close flag is expected to be set after the failed write.
1034 ASSERT_FALSE(socket->isClosedBySelf());
1035 ASSERT_FALSE(socket->isClosedByPeer());
1039 * Test writing a mix of simple buffers and IOBufs
// Writes a plain buffer, a single-element IOBuf chain and a two-element
// IOBuf chain, then checks that the reader received all of the bytes in one
// buffer, in the order they were written.
1041 TEST(AsyncSocketTest, WriteIOBuf) {
1046 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1048 socket->connect(&ccb, server.getAddress(), 30);
1050 // Accept the connection
1051 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1053 acceptedSocket->setReadCB(&rcb);
1055 // Write a simple buffer to the socket
// NOTE(review): simpleBufLength is not const, so the array bound below is
// not a constant expression (a variable-length array, which is a compiler
// extension in C++) -- consider declaring the length const.
1056 size_t simpleBufLength = 5;
1057 char simpleBuf[simpleBufLength];
1058 memset(simpleBuf, 'a', simpleBufLength);
1060 socket->write(&wcb, simpleBuf, simpleBufLength);
1062 // Write a single-element IOBuf chain
1063 size_t buf1Length = 7;
1064 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1065 memset(buf1->writableData(), 'b', buf1Length);
1066 buf1->append(buf1Length);
// Keep a clone for the comparison below, since buf1 itself is moved away.
1067 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1069 socket->writeChain(&wcb2, std::move(buf1));
1071 // Write a multiple-element IOBuf chain
1072 size_t buf2Length = 11;
1073 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1074 memset(buf2->writableData(), 'c', buf2Length);
1075 buf2->append(buf2Length);
1076 size_t buf3Length = 13;
1077 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1078 memset(buf3->writableData(), 'd', buf3Length);
1079 buf3->append(buf3Length);
1080 buf2->appendChain(std::move(buf3));
// Clone and coalesce the chain so it can be compared with a single memcmp.
1081 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1082 buf2Copy->coalesce();
1084 socket->writeChain(&wcb3, std::move(buf2));
1085 socket->shutdownWrite();
1087 // Let the reads and writes run to completion
1090 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1091 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1092 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1094 // Make sure the reader got the right data in the right order
1095 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1096 CHECK_EQ(rcb.buffers.size(), 1);
1097 CHECK_EQ(rcb.buffers[0].length,
1098 simpleBufLength + buf1Length + buf2Length + buf3Length);
1100 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1102 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1103 buf1Copy->data(), buf1Copy->length()), 0);
1105 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1106 buf2Copy->data(), buf2Copy->length()), 0);
1108 acceptedSocket->close();
// The socket must report that it was closed locally, not by the peer.
1111 ASSERT_TRUE(socket->isClosedBySelf());
1112 ASSERT_FALSE(socket->isClosedByPeer());
// Sends three IOBuf writes, the second one corked, and checks that the
// reader sees two buffers: the first write alone, then the second and third
// writes together.
1115 TEST(AsyncSocketTest, WriteIOBufCorked) {
1120 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1122 socket->connect(&ccb, server.getAddress(), 30);
1124 // Accept the connection
1125 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1127 acceptedSocket->setReadCB(&rcb);
1129 // Do three writes, 100ms apart, with the "cork" flag set
1130 // on the second write. The reader should see the first write
1131 // arrive by itself, followed by the second and third writes
1132 // arriving together.
1133 size_t buf1Length = 5;
1134 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1135 memset(buf1->writableData(), 'a', buf1Length);
1136 buf1->append(buf1Length);
1137 size_t buf2Length = 7;
1138 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1139 memset(buf2->writableData(), 'b', buf2Length);
1140 buf2->append(buf2Length);
1141 size_t buf3Length = 11;
1142 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1143 memset(buf3->writableData(), 'c', buf3Length);
1144 buf3->append(buf3Length);
// First write goes out right away, without any flags.
1146 socket->writeChain(&wcb1, std::move(buf1));
// Second write: corked, scheduled with a timeout of 100.
1148 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1149 write2.scheduleTimeout(100);
// Third write: not corked, flagged as the last write, scheduled at 200.
1151 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1152 write3.scheduleTimeout(200);
1155 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1156 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1157 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
// Rethrow the recorded exception so the failure cause is surfaced.
1158 if (wcb3.state != STATE_SUCCEEDED) {
1159 throw(wcb3.exception);
1161 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1163 // Make sure the reader got the data with the right grouping
1164 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1165 CHECK_EQ(rcb.buffers.size(), 2);
1166 CHECK_EQ(rcb.buffers[0].length, buf1Length);
1167 CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1169 acceptedSocket->close();
// The socket must report that it was closed locally, not by the peer.
1172 ASSERT_TRUE(socket->isClosedBySelf());
1173 ASSERT_FALSE(socket->isClosedByPeer());
1177 * Test performing a zero-length write
// Interleaves two zero-length writes with two 1 MiB writes and checks that all
// four write callbacks succeed and that the reader receives exactly the bytes
// of the two non-empty writes, in order.
1179 TEST(AsyncSocketTest, ZeroLengthWrite) {
1184 std::shared_ptr<AsyncSocket> socket =
1185 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1186 evb.loop(); // loop until the socket is connected
1188 auto acceptedSocket = server.acceptAsync(&evb);
1190 acceptedSocket->setReadCB(&rcb);
1192 size_t len1 = 1024*1024;
1193 size_t len2 = 1024*1024;
1194 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// Fill the first half with 'a' and the second half with 'b'. The second
// memset has to start at buf + len1: starting at buf would overwrite the
// first half and leave the second half uninitialized before it is sent.
1195 memset(buf.get(), 'a', len1);
1196 memset(buf.get() + len1, 'b', len2);
// wcb1 and wcb3 are the zero-length writes; wcb2 and wcb4 carry the data.
1202 socket->write(&wcb1, buf.get(), 0);
1203 socket->write(&wcb2, buf.get(), len1);
1204 socket->write(&wcb3, buf.get() + len1, 0);
1205 socket->write(&wcb4, buf.get() + len1, len2);
1208 evb.loop(); // loop until the data is sent
// Every write, including the empty ones, must report success.
1210 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1211 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1212 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1213 CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
// The reader must have received the whole buffer, unchanged.
1214 rcb.verifyData(buf.get(), len1 + len2);
1216 ASSERT_TRUE(socket->isClosedBySelf());
1217 ASSERT_FALSE(socket->isClosedByPeer());
// Same scenario as the zero-length write() test, but issued as a single
// writev() over a four-entry iovec array, with one write callback.
1220 TEST(AsyncSocketTest, ZeroLengthWritev) {
1225 std::shared_ptr<AsyncSocket> socket =
1226 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1227 evb.loop(); // loop until the socket is connected
1229 auto acceptedSocket = server.acceptAsync(&evb);
1231 acceptedSocket->setReadCB(&rcb);
1233 size_t len1 = 1024*1024;
1234 size_t len2 = 1024*1024;
1235 std::unique_ptr<char[]> buf(new char[len1 + len2]);
// Fill the first half with 'a' and the second half with 'b'. The second
// memset has to start at buf + len1: starting at buf would overwrite the
// first half and leave the second half uninitialized before it is sent.
1236 memset(buf.get(), 'a', len1);
1237 memset(buf.get() + len1, 'b', len2);
// const makes the array bound a constant expression, so iov is a standard
// fixed-size array rather than a compiler-extension variable-length array.
1240 const size_t iovCount = 4;
1241 struct iovec iov[iovCount];
// iov[0] and iov[2] carry the two halves of the buffer.
// NOTE(review): iov[1] and iov[3] are presumably the zero-length entries
// under test - confirm their iov_len is set to 0.
1242 iov[0].iov_base = buf.get();
1243 iov[0].iov_len = len1;
1244 iov[1].iov_base = buf.get() + len1;
1246 iov[2].iov_base = buf.get() + len1;
1247 iov[2].iov_len = len2;
1248 iov[3].iov_base = buf.get() + len1 + len2;
1251 socket->writev(&wcb, iov, iovCount);
1253 evb.loop(); // loop until the data is sent
1255 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
// The reader must have received the whole buffer, unchanged.
1256 rcb.verifyData(buf.get(), len1 + len2);
1258 ASSERT_TRUE(socket->isClosedBySelf());
1259 ASSERT_FALSE(socket->isClosedByPeer());
1262 ///////////////////////////////////////////////////////////////////////////
1263 // close() related tests
1264 ///////////////////////////////////////////////////////////////////////////
1267 * Test calling close() with pending writes when the socket is already closing.
// Queues writes until five of them are blocked, arranges for each blocked
// write's error callback to call close() on the socket, and then checks that
// every one of those callbacks ends up in STATE_FAILED.
1269 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1274 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1276 socket->connect(&ccb, server.getAddress(), 30);
1278 // accept the socket on the server side
1279 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1281 // Loop to ensure the connect has completed
1284 // Make sure we are connected
1285 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1287 // Schedule pending writes, until several write attempts have blocked
1289 memset(buf, 'a', sizeof(buf));
1290 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1291 WriteCallbackVector writeCallbacks;
// Only blocked writes are collected; the loop stops once five are pending.
1293 writeCallbacks.reserve(5);
1294 while (writeCallbacks.size() < 5) {
1295 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1297 socket->write(wcb.get(), buf, sizeof(buf));
1298 if (wcb->state == STATE_SUCCEEDED) {
1299 // Succeeded immediately. Keep performing more writes
1303 // This write is blocked.
1304 // Have the write callback call close() when writeError() is invoked
// The bound call re-enters close() on the same socket from inside the
// error callback, which is the closing-while-closing case under test.
1305 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1306 writeCallbacks.push_back(wcb);
1309 // Call closeNow() to immediately fail the pending writes
1312 // Make sure writeError() was invoked on all of the pending write callbacks
1313 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1314 it != writeCallbacks.end();
1316 CHECK_EQ((*it)->state, STATE_FAILED);
// The close was initiated locally, not by the peer.
1319 ASSERT_TRUE(socket->isClosedBySelf());
1320 ASSERT_FALSE(socket->isClosedByPeer());
1323 ///////////////////////////////////////////////////////////////////////////
1324 // ImmediateRead related tests
1325 ///////////////////////////////////////////////////////////////////////////
1327 /* AsyncSocket subclass used to verify that immediate read works */
1328 class AsyncSocketImmediateRead : public folly::AsyncSocket {
// Set to true whenever checkForImmediateRead() runs; tests reset it to false
// and inspect it afterwards.
1330 bool immediateReadCalled = false;
1331 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
// Records that the hook was invoked, then performs the read by calling the
// base class handleRead() directly.
1333 void checkForImmediateRead() noexcept override {
1334 immediateReadCalled = true;
1335 AsyncSocket::handleRead();
// Sends a payload three times the size of the client read buffer to the
// server, has the server echo it back, and checks that the client received
// all of it and that checkForImmediateRead() was invoked along the way.
1339 TEST(AsyncSocket, ConnectReadImmediateRead) {
// The payload is 3x the read buffer and the socket is limited to one read
// per event, so the echoed data cannot be consumed in a single read.
1342 const size_t maxBufferSz = 100;
1343 const size_t maxReadsPerEvent = 1;
1344 const size_t expectedDataSz = maxBufferSz * 3;
1345 char expectedData[expectedDataSz];
1346 memset(expectedData, 'j', expectedDataSz);
1349 ReadCallback rcb(maxBufferSz);
1350 AsyncSocketImmediateRead socket(&evb);
1351 socket.connect(nullptr, server.getAddress(), 30);
1353 evb.loop(); // loop until the socket is connected
1355 socket.setReadCB(&rcb);
1356 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Clear the flag so that only calls made from this point on are observed.
1357 socket.immediateReadCalled = false;
1359 auto acceptedSocket = server.acceptAsync(&evb);
1361 ReadCallback rcbServer;
1362 WriteCallback wcbServer;
// Server side: once the full payload has arrived, verify it, echo it back
// to the client and close the accepted socket.
1363 rcbServer.dataAvailableCallback = [&]() {
1364 if (rcbServer.dataRead() == expectedDataSz) {
1365 // write back all data read
1366 rcbServer.verifyData(expectedData, expectedDataSz);
1367 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1368 acceptedSocket->close();
1371 acceptedSocket->setReadCB(&rcbServer);
1375 socket.write(&wcb1, expectedData, expectedDataSz);
1377 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
// The client must have read the entire echo and used the immediate-read hook.
1378 rcb.verifyData(expectedData, expectedDataSz);
1379 CHECK_EQ(socket.immediateReadCalled, true);
1381 ASSERT_FALSE(socket.isClosedBySelf());
1382 ASSERT_FALSE(socket.isClosedByPeer());
// Same echo setup as the immediate-read test, except that the client read
// callback uninstalls itself on the first data-available notification. The
// client is then expected to have read only one buffer of data and the
// immediate-read hook must not have been invoked.
1385 TEST(AsyncSocket, ConnectReadUninstallRead) {
1388 const size_t maxBufferSz = 100;
1389 const size_t maxReadsPerEvent = 1;
1390 const size_t expectedDataSz = maxBufferSz * 3;
1391 char expectedData[expectedDataSz];
1392 memset(expectedData, 'k', expectedDataSz);
1395 ReadCallback rcb(maxBufferSz);
1396 AsyncSocketImmediateRead socket(&evb);
1397 socket.connect(nullptr, server.getAddress(), 30);
1399 evb.loop(); // loop until the socket is connected
1401 socket.setReadCB(&rcb);
1402 socket.setMaxReadsPerEvent(maxReadsPerEvent);
// Clear the flag so that only calls made from this point on are observed.
1403 socket.immediateReadCalled = false;
1405 auto acceptedSocket = server.acceptAsync(&evb);
1407 ReadCallback rcbServer;
1408 WriteCallback wcbServer;
// Server side: once the full payload has arrived, verify it, echo it back
// to the client and close the accepted socket.
1409 rcbServer.dataAvailableCallback = [&]() {
1410 if (rcbServer.dataRead() == expectedDataSz) {
1411 // write back all data read
1412 rcbServer.verifyData(expectedData, expectedDataSz);
1413 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1414 acceptedSocket->close();
1417 acceptedSocket->setReadCB(&rcbServer);
// Client side: drop the read callback as soon as the first data arrives.
1419 rcb.dataAvailableCallback = [&]() {
1420 // we read data and reset readCB
1421 socket.setReadCB(nullptr);
1426 socket.write(&wcb, expectedData, expectedDataSz);
1428 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1430 /* we should've only read maxBufferSz data since readCallback_
1431 * was reset in dataAvailableCallback */
1432 CHECK_EQ(rcb.dataRead(), maxBufferSz);
1433 CHECK_EQ(socket.immediateReadCalled, false);
1435 ASSERT_FALSE(socket.isClosedBySelf());
1436 ASSERT_FALSE(socket.isClosedByPeer());
1440 // - Test connect() and have the connect callback set the read callback
1441 // - Test connect() and have the connect callback unset the read callback
1442 // - Test reading/writing/closing/destroying the socket in the connect callback
1443 // - Test reading/writing/closing/destroying the socket in the read callback
1444 // - Test reading/writing/closing/destroying the socket in the write callback
1445 // - Test one-way shutdown behavior
1446 // - Test changing the EventBase
1448 // - TODO: test multiple threads sharing an AsyncSocket, and detaching from it
1449 // in connectSuccess(), readDataAvailable(), writeSuccess()
1452 ///////////////////////////////////////////////////////////////////////////
1453 // AsyncServerSocket tests
1454 ///////////////////////////////////////////////////////////////////////////
1457 * Helper AcceptCallback class for the test code
1458 * It records the callbacks that were invoked, and also supports calling
1459 * generic std::function objects in each callback.
// AcceptCallback implementation for tests: every callback appends an
// EventInfo record to events_ and then forwards to an optional
// user-supplied std::function hook.
1461 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
// EventInfo constructors, one per kind of recorded event: an accepted
// connection (fd + address), an error (message), or a bare event type.
1470 EventInfo(int fd, const folly::SocketAddress& addr)
1471 : type(TYPE_ACCEPT),
1475 explicit EventInfo(const std::string& msg)
1480 explicit EventInfo(EventType et)
1487 int fd; // valid for TYPE_ACCEPT
1488 folly::SocketAddress address; // valid for TYPE_ACCEPT
1489 string errorMsg; // valid for TYPE_ERROR
1491 typedef std::deque<EventInfo> EventList;
1493 TestAcceptCallback()
1494 : connectionAcceptedFn_(),
// Accessor for the recorded event list, returned as a pointer.
1499 std::deque<EventInfo>* getEvents() {
// Setters for the optional hooks; each stores a copy of the given function.
1503 void setConnectionAcceptedFn(
1504 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1505 connectionAcceptedFn_ = fn;
1507 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1508 acceptErrorFn_ = fn;
1510 void setAcceptStartedFn(const std::function<void()>& fn) {
1511 acceptStartedFn_ = fn;
1513 void setAcceptStoppedFn(const std::function<void()>& fn) {
1514 acceptStoppedFn_ = fn;
// AcceptCallback overrides. Each one records its event first and only then
// runs the matching hook, and only if a hook has been set.
1517 void connectionAccepted(
1518 int fd, const folly::SocketAddress& clientAddr) noexcept override {
1519 events_.emplace_back(fd, clientAddr);
1521 if (connectionAcceptedFn_) {
1522 connectionAcceptedFn_(fd, clientAddr);
1525 void acceptError(const std::exception& ex) noexcept override {
// Only the exception message is recorded, not the exception object.
1526 events_.emplace_back(ex.what());
1528 if (acceptErrorFn_) {
1532 void acceptStarted() noexcept override {
1533 events_.emplace_back(TYPE_START);
1535 if (acceptStartedFn_) {
1539 void acceptStopped() noexcept override {
1540 events_.emplace_back(TYPE_STOP);
1542 if (acceptStoppedFn_) {
// Optional hooks; an empty std::function means no hook is installed.
1548 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1549 std::function<void(const std::exception&)> acceptErrorFn_;
1550 std::function<void()> acceptStartedFn_;
1551 std::function<void()> acceptStoppedFn_;
// Events in the order the callbacks were invoked.
1553 std::deque<EventInfo> events_;
1557 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
// Accepts a single connection through an AsyncServerSocket and inspects the
// accepted file descriptor for O_NONBLOCK and TCP_NODELAY.
1559 TEST(AsyncSocketTest, ServerAcceptOptions) {
1560 EventBase eventBase;
1562 // Create a server socket
1563 std::shared_ptr<AsyncServerSocket> serverSocket(
1564 AsyncServerSocket::newSocket(&eventBase));
// Bind to port 0 and read back the resulting address for the client to use.
1565 serverSocket->bind(0);
1566 serverSocket->listen(16);
1567 folly::SocketAddress serverAddress;
1568 serverSocket->getAddress(&serverAddress);
1570 // Add a callback to accept one connection then stop the loop
// Both the accept hook and the error hook remove the callback from the
// server socket.
1571 TestAcceptCallback acceptCallback;
1572 acceptCallback.setConnectionAcceptedFn(
1573 [&](int fd, const folly::SocketAddress& addr) {
1574 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1576 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1577 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1579 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1580 serverSocket->startAccepting();
1582 // Connect to the server socket
1583 std::shared_ptr<AsyncSocket> socket(
1584 AsyncSocket::newSocket(&eventBase, serverAddress));
1588 // Verify that the server accepted a connection
// Expected event sequence: START, ACCEPT, STOP.
1589 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1590 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1591 TestAcceptCallback::TYPE_START);
1592 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1593 TestAcceptCallback::TYPE_ACCEPT);
1594 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1595 TestAcceptCallback::TYPE_STOP);
// The accepted descriptor was recorded in the ACCEPT event (index 1).
1596 int fd = acceptCallback.getEvents()->at(1).fd;
1598 // The accepted connection should already be in non-blocking mode
1599 int flags = fcntl(fd, F_GETFL, 0);
1600 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1603 // The accepted connection should already have TCP_NODELAY set
1605 socklen_t valueLength = sizeof(value);
1606 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1613 * Test AsyncServerSocket::removeAcceptCallback()
// Registers seven accept callbacks on one server socket and has them remove
// each other, remove themselves, and finally destroy the server socket from
// inside connectionAccepted(). The event list recorded by each callback is
// then compared against the expected sequence.
1615 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1616 // Create a new AsyncServerSocket
1617 EventBase eventBase;
1618 std::shared_ptr<AsyncServerSocket> serverSocket(
1619 AsyncServerSocket::newSocket(&eventBase));
1620 serverSocket->bind(0);
1621 serverSocket->listen(16);
1622 folly::SocketAddress serverAddress;
1623 serverSocket->getAddress(&serverAddress);
1625 // Add several accept callbacks
1626 TestAcceptCallback cb1;
1627 TestAcceptCallback cb2;
1628 TestAcceptCallback cb3;
1629 TestAcceptCallback cb4;
1630 TestAcceptCallback cb5;
1631 TestAcceptCallback cb6;
1632 TestAcceptCallback cb7;
1634 // Test having callbacks remove other callbacks before them on the list,
1635 // after them on the list, or removing themselves.
1637 // Have callback 2 remove callback 3 and callback 5 the first time it is
// The trailing annotation on each newSocket() call appears to name the
// callback expected to receive that connection, followed by any callbacks
// it removes (for example -cb3).
1640 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1641 std::shared_ptr<AsyncSocket> sock2(
1642 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1644 cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1646 cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1647 std::shared_ptr<AsyncSocket> sock3(
1648 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1650 cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1651 std::shared_ptr<AsyncSocket> sock5(
1652 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1655 cb2.setConnectionAcceptedFn(
1656 [&](int fd, const folly::SocketAddress& addr) {
1657 if (cb2Count == 0) {
1658 serverSocket->removeAcceptCallback(&cb3, nullptr);
1659 serverSocket->removeAcceptCallback(&cb5, nullptr);
1663 // Have callback 6 remove callback 4 the first time it is called,
1664 // and destroy the server socket the second time it is called
1666 cb6.setConnectionAcceptedFn(
1667 [&](int fd, const folly::SocketAddress& addr) {
1668 if (cb6Count == 0) {
1669 serverSocket->removeAcceptCallback(&cb4, nullptr);
1670 std::shared_ptr<AsyncSocket> sock6(
1671 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1672 std::shared_ptr<AsyncSocket> sock7(
1673 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1674 std::shared_ptr<AsyncSocket> sock8(
1675 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
// Dropping the last shared_ptr reference from inside the callback.
1678 serverSocket.reset();
1682 // Have callback 7 remove itself
1683 cb7.setConnectionAcceptedFn(
1684 [&](int fd, const folly::SocketAddress& addr) {
1685 serverSocket->removeAcceptCallback(&cb7, nullptr);
// Registration order matters: the checks below rely on it.
1688 serverSocket->addAcceptCallback(&cb1, nullptr);
1689 serverSocket->addAcceptCallback(&cb2, nullptr);
1690 serverSocket->addAcceptCallback(&cb3, nullptr);
1691 serverSocket->addAcceptCallback(&cb4, nullptr);
1692 serverSocket->addAcceptCallback(&cb5, nullptr);
1693 serverSocket->addAcceptCallback(&cb6, nullptr);
1694 serverSocket->addAcceptCallback(&cb7, nullptr);
1695 serverSocket->startAccepting();
1697 // Make several connections to the socket
1698 std::shared_ptr<AsyncSocket> sock1(
1699 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1700 std::shared_ptr<AsyncSocket> sock4(
1701 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1703 // Loop until we are stopped
1706 // Check to make sure that the expected callbacks were invoked.
1708 // NOTE: This code depends on the AsyncServerSocket calling all of
1709 // the AcceptCallbacks in round-robin fashion, in the order that they were
1710 // added. The code is implemented this way right now, but the API doesn't
1711 // explicitly require it be done this way. If we change the code not to be
1712 // exactly round robin in the future, we can simplify the test checks here.
1713 // (We'll also need to update the termination code, since we expect cb6 to
1714 // get called twice to terminate the loop.)
// cb1: START, two ACCEPTs, STOP.
1715 CHECK_EQ(cb1.getEvents()->size(), 4);
1716 CHECK_EQ(cb1.getEvents()->at(0).type,
1717 TestAcceptCallback::TYPE_START);
1718 CHECK_EQ(cb1.getEvents()->at(1).type,
1719 TestAcceptCallback::TYPE_ACCEPT);
1720 CHECK_EQ(cb1.getEvents()->at(2).type,
1721 TestAcceptCallback::TYPE_ACCEPT);
1722 CHECK_EQ(cb1.getEvents()->at(3).type,
1723 TestAcceptCallback::TYPE_STOP);
// cb2: START, two ACCEPTs, STOP.
1725 CHECK_EQ(cb2.getEvents()->size(), 4);
1726 CHECK_EQ(cb2.getEvents()->at(0).type,
1727 TestAcceptCallback::TYPE_START);
1728 CHECK_EQ(cb2.getEvents()->at(1).type,
1729 TestAcceptCallback::TYPE_ACCEPT);
1730 CHECK_EQ(cb2.getEvents()->at(2).type,
1731 TestAcceptCallback::TYPE_ACCEPT);
1732 CHECK_EQ(cb2.getEvents()->at(3).type,
1733 TestAcceptCallback::TYPE_STOP);
// cb3 is removed by cb2 and is expected to record no ACCEPT at all.
1735 CHECK_EQ(cb3.getEvents()->size(), 2);
1736 CHECK_EQ(cb3.getEvents()->at(0).type,
1737 TestAcceptCallback::TYPE_START);
1738 CHECK_EQ(cb3.getEvents()->at(1).type,
1739 TestAcceptCallback::TYPE_STOP);
// cb4 is expected to accept once before cb6 removes it.
1741 CHECK_EQ(cb4.getEvents()->size(), 3);
1742 CHECK_EQ(cb4.getEvents()->at(0).type,
1743 TestAcceptCallback::TYPE_START);
1744 CHECK_EQ(cb4.getEvents()->at(1).type,
1745 TestAcceptCallback::TYPE_ACCEPT);
1746 CHECK_EQ(cb4.getEvents()->at(2).type,
1747 TestAcceptCallback::TYPE_STOP);
// cb5 is removed by cb2 and is expected to record no ACCEPT at all.
1749 CHECK_EQ(cb5.getEvents()->size(), 2);
1750 CHECK_EQ(cb5.getEvents()->at(0).type,
1751 TestAcceptCallback::TYPE_START);
1752 CHECK_EQ(cb5.getEvents()->at(1).type,
1753 TestAcceptCallback::TYPE_STOP);
// cb6: START, two ACCEPTs (the second one resets the server socket), STOP.
1755 CHECK_EQ(cb6.getEvents()->size(), 4);
1756 CHECK_EQ(cb6.getEvents()->at(0).type,
1757 TestAcceptCallback::TYPE_START);
1758 CHECK_EQ(cb6.getEvents()->at(1).type,
1759 TestAcceptCallback::TYPE_ACCEPT);
1760 CHECK_EQ(cb6.getEvents()->at(2).type,
1761 TestAcceptCallback::TYPE_ACCEPT);
1762 CHECK_EQ(cb6.getEvents()->at(3).type,
1763 TestAcceptCallback::TYPE_STOP);
// cb7 accepts once and removes itself in that same callback.
1765 CHECK_EQ(cb7.getEvents()->size(), 3);
1766 CHECK_EQ(cb7.getEvents()->at(0).type,
1767 TestAcceptCallback::TYPE_START);
1768 CHECK_EQ(cb7.getEvents()->at(1).type,
1769 TestAcceptCallback::TYPE_ACCEPT);
1770 CHECK_EQ(cb7.getEvents()->at(2).type,
1771 TestAcceptCallback::TYPE_STOP);
1775 * Test AsyncServerSocket accept callbacks when the EventBase loop runs in another thread
// Registers one accept callback, makes one connection, and runs the loop in
// a separate std::thread. The hooks assert that acceptStarted runs on a
// thread other than the one that set up the test, and that the later
// callbacks run on that same thread.
1777 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1778 // Create a new AsyncServerSocket
1779 EventBase eventBase;
1780 std::shared_ptr<AsyncServerSocket> serverSocket(
1781 AsyncServerSocket::newSocket(&eventBase));
1782 serverSocket->bind(0);
1783 serverSocket->listen(16);
1784 folly::SocketAddress serverAddress;
1785 serverSocket->getAddress(&serverAddress);
1787 // Add a single accept callback that checks which thread invokes it
1788 TestAcceptCallback cb1;
// Starts out as the id of the thread running this test body.
1789 auto thread_id = pthread_self();
// acceptStarted must run on a different thread; remember that thread so the
// later hooks can compare against it.
1790 cb1.setAcceptStartedFn([&](){
1791 CHECK_NE(thread_id, pthread_self());
1792 thread_id = pthread_self();
// connectionAccepted must run on the thread recorded above; it also removes
// cb1 from the server socket.
1794 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1795 CHECK_EQ(thread_id, pthread_self());
1796 serverSocket->removeAcceptCallback(&cb1, nullptr);
// acceptStopped must run on the thread recorded above as well.
1798 cb1.setAcceptStoppedFn([&](){
1799 CHECK_EQ(thread_id, pthread_self());
1802 // Register the callback and start accepting
1803 serverSocket->addAcceptCallback(&cb1, nullptr);
1804 serverSocket->startAccepting();
1806 // Make a single connection to the socket
1807 std::shared_ptr<AsyncSocket> sock1(
1808 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1810 // Loop in another thread
1811 auto other = std::thread([&](){
1816 // Check to make sure that the expected callbacks were invoked.
1818 // NOTE: cb1 is the only callback registered in this test, so every event
1819 // is delivered to it. It should record exactly three events, in order:
1820 // acceptStarted, connectionAccepted (for sock1), and acceptStopped.
1821 // The connectionAccepted hook removes cb1 from the server socket, which is
1822 // presumably what triggers acceptStopped. The thread checks themselves
1823 // run inside the hooks installed above; only the event sequence is
1824 // verified here.
1825 CHECK_EQ(cb1.getEvents()->size(), 3);
1826 CHECK_EQ(cb1.getEvents()->at(0).type,
1827 TestAcceptCallback::TYPE_START);
1828 CHECK_EQ(cb1.getEvents()->at(1).type,
1829 TestAcceptCallback::TYPE_ACCEPT);
1830 CHECK_EQ(cb1.getEvents()->at(2).type,
1831 TestAcceptCallback::TYPE_STOP);
// Shared helper: given an AsyncServerSocket, registers a one-shot accept
// callback, connects a client to the address the server socket reports, and
// checks that the callback recorded START, ACCEPT, STOP.
//
// serverSocket: the server socket under test; the helper uses its EventBase
// and its reported address for the client connection.
1835 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1836 // Add a callback to accept one connection then stop accepting
// Both the accept hook and the error hook remove the callback.
1837 TestAcceptCallback acceptCallback;
1838 acceptCallback.setConnectionAcceptedFn(
1839 [&](int fd, const folly::SocketAddress& addr) {
1840 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1842 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1843 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1845 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1846 serverSocket->startAccepting();
1848 // Connect to the server socket
// The client shares the server socket EventBase.
1849 EventBase* eventBase = serverSocket->getEventBase();
1850 folly::SocketAddress serverAddress;
1851 serverSocket->getAddress(&serverAddress);
1852 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1854 // Loop to process all events
1857 // Verify that the server accepted a connection
1858 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1859 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1860 TestAcceptCallback::TYPE_START);
1861 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1862 TestAcceptCallback::TYPE_ACCEPT);
1863 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1864 TestAcceptCallback::TYPE_STOP);
1867 /* Verify that we don't leak sockets if we are destroyed()
1868 * and there are still writes pending
1870 * If destroy() only calls close() instead of closeNow(),
1871 * it would shutdown(writes) on the socket, but it would
1872 * never be close()'d, and the socket would leak
// Queues a 5,000,000-byte write on the accepted socket, drops the last
// reference to that socket, and then probes the raw descriptor with read().
1874 TEST(AsyncSocketTest, DestroyCloseTest) {
1880 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1882 socket->connect(&ccb, server.getAddress(), 30);
1884 // Accept the connection
1885 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1887 acceptedSocket->setReadCB(&rcb);
1889 // Write a large buffer to the socket that is larger than kernel buffer
1890 size_t simpleBufLength = 5000000;
// NOTE(review): raw new[] - confirm the buffer is released with delete[]
// on every exit path of this test.
1891 char* simpleBuf = new char[simpleBufLength];
1892 memset(simpleBuf, 'a', simpleBufLength);
1895 // Let the reads and writes run to completion
// Save the raw descriptor now; it is probed after the socket object is gone.
1896 int fd = acceptedSocket->getFd();
1898 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
// Release this reference to the accepted socket while the write is queued.
1900 acceptedSocket.reset();
1902 // Test that server socket was closed
// read() on the saved descriptor is presumably expected to fail once the
// socket has been closed - confirm against the assertions on sz.
1903 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1910 * Test AsyncServerSocket::useExistingSocket()
// Exercises useExistingSocket() in three scenarios, each starting from a
// manually created TCP socket:
//   1. AsyncServerSocket does both bind and listen;
//   2. the test binds manually, AsyncServerSocket only listens;
//   3. the test binds and listens manually.
// Every scenario ends by running serverSocketSanityTest() on the result.
1912 TEST(AsyncSocketTest, ServerExistingSocket) {
1913 EventBase eventBase;
1915 // Test creating a socket, and letting AsyncServerSocket bind and listen
1917 // Manually create a socket
1918 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1921 // Create a server socket
1922 AsyncServerSocket::UniquePtr serverSocket(
1923 new AsyncServerSocket(&eventBase));
1924 serverSocket->useExistingSocket(fd);
// The address is read back from the adopted socket and then passed to bind().
1925 folly::SocketAddress address;
1926 serverSocket->getAddress(&address);
1928 serverSocket->bind(address);
1929 serverSocket->listen(16);
1931 // Make sure the socket works
1932 serverSocketSanityTest(serverSocket.get());
1935 // Test creating a socket and binding manually,
1936 // then letting AsyncServerSocket listen
1938 // Manually create a socket
1939 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// Bind to INADDR_ANY using the plain sockets API.
1942 struct sockaddr_in addr;
1943 addr.sin_family = AF_INET;
1945 addr.sin_addr.s_addr = INADDR_ANY;
1946 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1948 // Look up the address that we bound to
1949 folly::SocketAddress boundAddress;
1950 boundAddress.setFromLocalAddress(fd);
1952 // Create a server socket
1953 AsyncServerSocket::UniquePtr serverSocket(
1954 new AsyncServerSocket(&eventBase));
1955 serverSocket->useExistingSocket(fd);
1956 serverSocket->listen(16);
1958 // Make sure AsyncServerSocket reports the same address that we bound to
1959 folly::SocketAddress serverSocketAddress;
1960 serverSocket->getAddress(&serverSocketAddress);
1961 CHECK_EQ(boundAddress, serverSocketAddress);
1963 // Make sure the socket works
1964 serverSocketSanityTest(serverSocket.get());
1967 // Test creating a socket, binding and listening manually,
1968 // then giving it to AsyncServerSocket
1970 // Manually create a socket
1971 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// Bind to INADDR_ANY using the plain sockets API.
1974 struct sockaddr_in addr;
1975 addr.sin_family = AF_INET;
1977 addr.sin_addr.s_addr = INADDR_ANY;
1978 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1980 // Look up the address that we bound to
1981 folly::SocketAddress boundAddress;
1982 boundAddress.setFromLocalAddress(fd);
// listen() is called directly here, before the descriptor is handed over.
1984 CHECK_EQ(listen(fd, 16), 0);
1986 // Create a server socket
1987 AsyncServerSocket::UniquePtr serverSocket(
1988 new AsyncServerSocket(&eventBase));
1989 serverSocket->useExistingSocket(fd);
1991 // Make sure AsyncServerSocket reports the same address that we bound to
1992 folly::SocketAddress serverSocketAddress;
1993 serverSocket->getAddress(&serverSocketAddress);
1994 CHECK_EQ(boundAddress, serverSocketAddress);
1996 // Make sure the socket works
1997 serverSocketSanityTest(serverSocket.get());
// Binds an AsyncServerSocket to an address built from a filesystem-style
// path via setFromPath(), accepts one connection on it, and checks the
// recorded events and the O_NONBLOCK flag of the accepted descriptor.
2001 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2002 EventBase eventBase;
2004 // Create a server socket
2005 std::shared_ptr<AsyncServerSocket> serverSocket(
2006 AsyncServerSocket::newSocket(&eventBase));
// NOTE(review): the initial contents of path are set before this line -
// confirm what prefix "/anonymous" is appended to.
2008 path.append("/anonymous");
2009 folly::SocketAddress serverAddress;
2010 serverAddress.setFromPath(path);
2011 serverSocket->bind(serverAddress);
2012 serverSocket->listen(16);
2014 // Add a callback to accept one connection then stop the loop
// Both the accept hook and the error hook remove the callback.
2015 TestAcceptCallback acceptCallback;
2016 acceptCallback.setConnectionAcceptedFn(
2017 [&](int fd, const folly::SocketAddress& addr) {
2018 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2020 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
2021 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2023 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2024 serverSocket->startAccepting();
2026 // Connect to the server socket
2027 std::shared_ptr<AsyncSocket> socket(
2028 AsyncSocket::newSocket(&eventBase, serverAddress));
2032 // Verify that the server accepted a connection
// Expected event sequence: START, ACCEPT, STOP.
2033 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2034 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2035 TestAcceptCallback::TYPE_START);
2036 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2037 TestAcceptCallback::TYPE_ACCEPT);
2038 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2039 TestAcceptCallback::TYPE_STOP);
// The accepted descriptor was recorded in the ACCEPT event (index 1).
2040 int fd = acceptCallback.getEvents()->at(1).fd;
2042 // The accepted connection should already be in non-blocking mode
2043 int flags = fcntl(fd, F_GETFL, 0);
2044 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);