2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/RWSpinLock.h>
21 #include <folly/SocketAddress.h>
22 #include <folly/Random.h>
24 #include <folly/io/IOBuf.h>
25 #include <folly/io/async/test/AsyncSocketTest.h>
26 #include <folly/io/async/test/Util.h>
27 #include <folly/portability/Unistd.h>
28 #include <folly/test/SocketAddressTestHelper.h>
30 #include <gtest/gtest.h>
31 #include <boost/scoped_array.hpp>
35 #include <sys/types.h>
36 #include <sys/socket.h>
37 #include <netinet/tcp.h>
40 using namespace boost;
47 using std::unique_ptr;
48 using std::chrono::milliseconds;
49 using boost::scoped_array;
51 using namespace folly;
53 class DelayedWrite: public AsyncTimeout {
55 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
56 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
57 bool cork, bool lastWrite = false):
58 AsyncTimeout(socket->getEventBase()),
60 bufs_(std::move(bufs)),
63 lastWrite_(lastWrite) {}
66 void timeoutExpired() noexcept override {
67 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
68 socket_->writeChain(wcb_, std::move(bufs_), flags);
70 socket_->shutdownWrite();
74 std::shared_ptr<AsyncSocket> socket_;
75 unique_ptr<IOBuf> bufs_;
76 AsyncTransportWrapper::WriteCallback* wcb_;
81 ///////////////////////////////////////////////////////////////////////////
83 ///////////////////////////////////////////////////////////////////////////
86 * Test connecting to a server
88 TEST(AsyncSocketTest, Connect) {
89 // Start listening on a local port
92 // Connect using a AsyncSocket
94 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
96 socket->connect(&cb, server.getAddress(), 30);
100 CHECK_EQ(cb.state, STATE_SUCCEEDED);
101 EXPECT_LE(0, socket->getConnectTime().count());
102 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
106 * Test connecting to a server that isn't listening
108 TEST(AsyncSocketTest, ConnectRefused) {
111 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
113 // Hopefully nothing is actually listening on this address
114 folly::SocketAddress addr("127.0.0.1", 65535);
116 socket->connect(&cb, addr, 30);
120 CHECK_EQ(cb.state, STATE_FAILED);
121 CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
122 EXPECT_LE(0, socket->getConnectTime().count());
123 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
127 * Test connection timeout
129 TEST(AsyncSocketTest, ConnectTimeout) {
132 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
134 // Try connecting to server that won't respond.
136 // This depends somewhat on the network where this test is run.
137 // Hopefully this IP will be routable but unresponsive.
138 // (Alternatively, we could try listening on a local raw socket, but that
139 // normally requires root privileges.)
141 SocketAddressTestHelper::isIPv6Enabled() ?
142 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
143 SocketAddressTestHelper::isIPv4Enabled() ?
144 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
146 SocketAddress addr(host, 65535);
148 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
152 CHECK_EQ(cb.state, STATE_FAILED);
153 CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
155 // Verify that we can still get the peer address after a timeout.
156 // Use case is if the client was created from a client pool, and we want
157 // to log which peer failed.
158 folly::SocketAddress peer;
159 socket->getPeerAddress(&peer);
160 CHECK_EQ(peer, addr);
161 EXPECT_LE(0, socket->getConnectTime().count());
162 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
166 * Test writing immediately after connecting, without waiting for connect
169 TEST(AsyncSocketTest, ConnectAndWrite) {
174 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
176 socket->connect(&ccb, server.getAddress(), 30);
180 memset(buf, 'a', sizeof(buf));
182 socket->write(&wcb, buf, sizeof(buf));
184 // Loop. We don't bother accepting on the server socket yet.
185 // The kernel should be able to buffer the write request so it can succeed.
188 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
189 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
191 // Make sure the server got a connection and received the data
193 server.verifyConnection(buf, sizeof(buf));
195 ASSERT_TRUE(socket->isClosedBySelf());
196 ASSERT_FALSE(socket->isClosedByPeer());
197 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
201 * Test connecting using a nullptr connect callback.
203 TEST(AsyncSocketTest, ConnectNullCallback) {
208 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
209 socket->connect(nullptr, server.getAddress(), 30);
211 // write some data, just so we have some way of verifing
212 // that the socket works correctly after connecting
214 memset(buf, 'a', sizeof(buf));
216 socket->write(&wcb, buf, sizeof(buf));
220 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
222 // Make sure the server got a connection and received the data
224 server.verifyConnection(buf, sizeof(buf));
226 ASSERT_TRUE(socket->isClosedBySelf());
227 ASSERT_FALSE(socket->isClosedByPeer());
231 * Test calling both write() and close() immediately after connecting, without
232 * waiting for connect to finish.
234 * This exercises the STATE_CONNECTING_CLOSING code.
236 TEST(AsyncSocketTest, ConnectWriteAndClose) {
241 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
243 socket->connect(&ccb, server.getAddress(), 30);
247 memset(buf, 'a', sizeof(buf));
249 socket->write(&wcb, buf, sizeof(buf));
254 // Loop. We don't bother accepting on the server socket yet.
255 // The kernel should be able to buffer the write request so it can succeed.
258 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
259 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
261 // Make sure the server got a connection and received the data
262 server.verifyConnection(buf, sizeof(buf));
264 ASSERT_TRUE(socket->isClosedBySelf());
265 ASSERT_FALSE(socket->isClosedByPeer());
269 * Test calling close() immediately after connect()
271 TEST(AsyncSocketTest, ConnectAndClose) {
274 // Connect using a AsyncSocket
276 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
278 socket->connect(&ccb, server.getAddress(), 30);
280 // Hopefully the connect didn't succeed immediately.
281 // If it did, we can't exercise the close-while-connecting code path.
282 if (ccb.state == STATE_SUCCEEDED) {
283 LOG(INFO) << "connect() succeeded immediately; aborting test "
284 "of close-during-connect behavior";
290 // Loop, although there shouldn't be anything to do.
293 // Make sure the connection was aborted
294 CHECK_EQ(ccb.state, STATE_FAILED);
296 ASSERT_TRUE(socket->isClosedBySelf());
297 ASSERT_FALSE(socket->isClosedByPeer());
301 * Test calling closeNow() immediately after connect()
303 * This should be identical to the normal close behavior.
305 TEST(AsyncSocketTest, ConnectAndCloseNow) {
308 // Connect using a AsyncSocket
310 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
312 socket->connect(&ccb, server.getAddress(), 30);
314 // Hopefully the connect didn't succeed immediately.
315 // If it did, we can't exercise the close-while-connecting code path.
316 if (ccb.state == STATE_SUCCEEDED) {
317 LOG(INFO) << "connect() succeeded immediately; aborting test "
318 "of closeNow()-during-connect behavior";
324 // Loop, although there shouldn't be anything to do.
327 // Make sure the connection was aborted
328 CHECK_EQ(ccb.state, STATE_FAILED);
330 ASSERT_TRUE(socket->isClosedBySelf());
331 ASSERT_FALSE(socket->isClosedByPeer());
335 * Test calling both write() and closeNow() immediately after connecting,
336 * without waiting for connect to finish.
338 * This should abort the pending write.
340 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
345 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
347 socket->connect(&ccb, server.getAddress(), 30);
349 // Hopefully the connect didn't succeed immediately.
350 // If it did, we can't exercise the close-while-connecting code path.
351 if (ccb.state == STATE_SUCCEEDED) {
352 LOG(INFO) << "connect() succeeded immediately; aborting test "
353 "of write-during-connect behavior";
359 memset(buf, 'a', sizeof(buf));
361 socket->write(&wcb, buf, sizeof(buf));
366 // Loop, although there shouldn't be anything to do.
369 CHECK_EQ(ccb.state, STATE_FAILED);
370 CHECK_EQ(wcb.state, STATE_FAILED);
372 ASSERT_TRUE(socket->isClosedBySelf());
373 ASSERT_FALSE(socket->isClosedByPeer());
377 * Test installing a read callback immediately, before connect() finishes.
379 TEST(AsyncSocketTest, ConnectAndRead) {
384 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
386 socket->connect(&ccb, server.getAddress(), 30);
389 socket->setReadCB(&rcb);
391 // Even though we haven't looped yet, we should be able to accept
392 // the connection and send data to it.
393 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
395 memset(buf, 'a', sizeof(buf));
396 acceptedSocket->write(buf, sizeof(buf));
397 acceptedSocket->flush();
398 acceptedSocket->close();
400 // Loop, although there shouldn't be anything to do.
403 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
404 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
405 CHECK_EQ(rcb.buffers.size(), 1);
406 CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
407 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
409 ASSERT_FALSE(socket->isClosedBySelf());
410 ASSERT_FALSE(socket->isClosedByPeer());
414 * Test installing a read callback and then closing immediately before the
415 * connect attempt finishes.
417 TEST(AsyncSocketTest, ConnectReadAndClose) {
422 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
424 socket->connect(&ccb, server.getAddress(), 30);
426 // Hopefully the connect didn't succeed immediately.
427 // If it did, we can't exercise the close-while-connecting code path.
428 if (ccb.state == STATE_SUCCEEDED) {
429 LOG(INFO) << "connect() succeeded immediately; aborting test "
430 "of read-during-connect behavior";
435 socket->setReadCB(&rcb);
440 // Loop, although there shouldn't be anything to do.
443 CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
444 CHECK_EQ(rcb.buffers.size(), 0);
445 CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
447 ASSERT_TRUE(socket->isClosedBySelf());
448 ASSERT_FALSE(socket->isClosedByPeer());
452 * Test both writing and installing a read callback immediately,
453 * before connect() finishes.
455 TEST(AsyncSocketTest, ConnectWriteAndRead) {
460 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
462 socket->connect(&ccb, server.getAddress(), 30);
466 memset(buf1, 'a', sizeof(buf1));
468 socket->write(&wcb, buf1, sizeof(buf1));
470 // set a read callback
472 socket->setReadCB(&rcb);
474 // Even though we haven't looped yet, we should be able to accept
475 // the connection and send data to it.
476 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
478 memset(buf2, 'b', sizeof(buf2));
479 acceptedSocket->write(buf2, sizeof(buf2));
480 acceptedSocket->flush();
482 // shut down the write half of acceptedSocket, so that the AsyncSocket
483 // will stop reading and we can break out of the event loop.
484 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
489 // Make sure the connect succeeded
490 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
492 // Make sure the AsyncSocket read the data written by the accepted socket
493 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
494 CHECK_EQ(rcb.buffers.size(), 1);
495 CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
496 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
498 // Close the AsyncSocket so we'll see EOF on acceptedSocket
501 // Make sure the accepted socket saw the data written by the AsyncSocket
502 uint8_t readbuf[sizeof(buf1)];
503 acceptedSocket->readAll(readbuf, sizeof(readbuf));
504 CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
505 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
506 CHECK_EQ(bytesRead, 0);
508 ASSERT_FALSE(socket->isClosedBySelf());
509 ASSERT_TRUE(socket->isClosedByPeer());
513 * Test writing to the socket then shutting down writes before the connect
516 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
521 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
523 socket->connect(&ccb, server.getAddress(), 30);
525 // Hopefully the connect didn't succeed immediately.
526 // If it did, we can't exercise the write-while-connecting code path.
527 if (ccb.state == STATE_SUCCEEDED) {
528 LOG(INFO) << "connect() succeeded immediately; skipping test";
532 // Ask to write some data
534 memset(wbuf, 'a', sizeof(wbuf));
536 socket->write(&wcb, wbuf, sizeof(wbuf));
537 socket->shutdownWrite();
540 socket->shutdownWrite();
542 // Even though we haven't looped yet, we should be able to accept
544 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
546 // Since the connection is still in progress, there should be no data to
547 // read yet. Verify that the accepted socket is not readable.
548 struct pollfd fds[1];
549 fds[0].fd = acceptedSocket->getSocketFD();
550 fds[0].events = POLLIN;
552 int rc = poll(fds, 1, 0);
555 // Write data to the accepted socket
556 uint8_t acceptedWbuf[192];
557 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
558 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
559 acceptedSocket->flush();
564 // The loop should have completed the connection, written the queued data,
565 // and shutdown writes on the socket.
567 // Check that the connection was completed successfully and that the write
568 // callback succeeded.
569 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
570 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
572 // Check that we can read the data that was written to the socket, and that
573 // we see an EOF, since its socket was half-shutdown.
574 uint8_t readbuf[sizeof(wbuf)];
575 acceptedSocket->readAll(readbuf, sizeof(readbuf));
576 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
577 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
578 CHECK_EQ(bytesRead, 0);
580 // Close the accepted socket. This will cause it to see EOF
581 // and uninstall the read callback when we loop next.
582 acceptedSocket->close();
584 // Install a read callback, then loop again.
586 socket->setReadCB(&rcb);
589 // This loop should have read the data and seen the EOF
590 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
591 CHECK_EQ(rcb.buffers.size(), 1);
592 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
593 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
594 acceptedWbuf, sizeof(acceptedWbuf)), 0);
596 ASSERT_FALSE(socket->isClosedBySelf());
597 ASSERT_FALSE(socket->isClosedByPeer());
601 * Test reading, writing, and shutting down writes before the connect attempt
604 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
609 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
611 socket->connect(&ccb, server.getAddress(), 30);
613 // Hopefully the connect didn't succeed immediately.
614 // If it did, we can't exercise the write-while-connecting code path.
615 if (ccb.state == STATE_SUCCEEDED) {
616 LOG(INFO) << "connect() succeeded immediately; skipping test";
620 // Install a read callback
622 socket->setReadCB(&rcb);
624 // Ask to write some data
626 memset(wbuf, 'a', sizeof(wbuf));
628 socket->write(&wcb, wbuf, sizeof(wbuf));
631 socket->shutdownWrite();
633 // Even though we haven't looped yet, we should be able to accept
635 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
637 // Since the connection is still in progress, there should be no data to
638 // read yet. Verify that the accepted socket is not readable.
639 struct pollfd fds[1];
640 fds[0].fd = acceptedSocket->getSocketFD();
641 fds[0].events = POLLIN;
643 int rc = poll(fds, 1, 0);
646 // Write data to the accepted socket
647 uint8_t acceptedWbuf[192];
648 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
649 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
650 acceptedSocket->flush();
651 // Shutdown writes to the accepted socket. This will cause it to see EOF
652 // and uninstall the read callback.
653 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
658 // The loop should have completed the connection, written the queued data,
659 // shutdown writes on the socket, read the data we wrote to it, and see the
662 // Check that the connection was completed successfully and that the read
663 // and write callbacks were invoked as expected.
664 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
665 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
666 CHECK_EQ(rcb.buffers.size(), 1);
667 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
668 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
669 acceptedWbuf, sizeof(acceptedWbuf)), 0);
670 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
672 // Check that we can read the data that was written to the socket, and that
673 // we see an EOF, since its socket was half-shutdown.
674 uint8_t readbuf[sizeof(wbuf)];
675 acceptedSocket->readAll(readbuf, sizeof(readbuf));
676 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
677 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
678 CHECK_EQ(bytesRead, 0);
680 // Fully close both sockets
681 acceptedSocket->close();
684 ASSERT_FALSE(socket->isClosedBySelf());
685 ASSERT_TRUE(socket->isClosedByPeer());
689 * Test reading, writing, and calling shutdownWriteNow() before the
690 * connect attempt finishes.
692 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
697 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
699 socket->connect(&ccb, server.getAddress(), 30);
701 // Hopefully the connect didn't succeed immediately.
702 // If it did, we can't exercise the write-while-connecting code path.
703 if (ccb.state == STATE_SUCCEEDED) {
704 LOG(INFO) << "connect() succeeded immediately; skipping test";
708 // Install a read callback
710 socket->setReadCB(&rcb);
712 // Ask to write some data
714 memset(wbuf, 'a', sizeof(wbuf));
716 socket->write(&wcb, wbuf, sizeof(wbuf));
718 // Shutdown writes immediately.
719 // This should immediately discard the data that we just tried to write.
720 socket->shutdownWriteNow();
722 // Verify that writeError() was invoked on the write callback.
723 CHECK_EQ(wcb.state, STATE_FAILED);
724 CHECK_EQ(wcb.bytesWritten, 0);
726 // Even though we haven't looped yet, we should be able to accept
728 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
730 // Since the connection is still in progress, there should be no data to
731 // read yet. Verify that the accepted socket is not readable.
732 struct pollfd fds[1];
733 fds[0].fd = acceptedSocket->getSocketFD();
734 fds[0].events = POLLIN;
736 int rc = poll(fds, 1, 0);
739 // Write data to the accepted socket
740 uint8_t acceptedWbuf[192];
741 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
742 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
743 acceptedSocket->flush();
744 // Shutdown writes to the accepted socket. This will cause it to see EOF
745 // and uninstall the read callback.
746 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
751 // The loop should have completed the connection, written the queued data,
752 // shutdown writes on the socket, read the data we wrote to it, and see the
755 // Check that the connection was completed successfully and that the read
756 // callback was invoked as expected.
757 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
758 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
759 CHECK_EQ(rcb.buffers.size(), 1);
760 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
761 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
762 acceptedWbuf, sizeof(acceptedWbuf)), 0);
764 // Since we used shutdownWriteNow(), it should have discarded all pending
765 // write data. Verify we see an immediate EOF when reading from the accepted
767 uint8_t readbuf[sizeof(wbuf)];
768 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
769 CHECK_EQ(bytesRead, 0);
771 // Fully close both sockets
772 acceptedSocket->close();
775 ASSERT_FALSE(socket->isClosedBySelf());
776 ASSERT_TRUE(socket->isClosedByPeer());
779 // Helper function for use in testConnectOptWrite()
780 // Temporarily disable the read callback
781 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
782 // Uninstall the read callback
783 socket->setReadCB(nullptr);
784 // Schedule the read callback to be reinstalled after 1ms
785 socket->getEventBase()->runInLoop(
786 std::bind(&AsyncSocket::setReadCB, socket, rcb));
790 * Test connect+write, then have the connect callback perform another write.
792 * This tests interaction of the optimistic writing after connect with
793 * additional write attempts that occur in the connect callback.
795 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
798 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
802 socket->connect(&ccb, server.getAddress(), 30);
804 // Hopefully the connect didn't succeed immediately.
805 // If it did, we can't exercise the optimistic write code path.
806 if (ccb.state == STATE_SUCCEEDED) {
807 LOG(INFO) << "connect() succeeded immediately; aborting test "
808 "of optimistic write behavior";
812 // Tell the connect callback to perform a write when the connect succeeds
814 scoped_array<char> buf2(new char[size2]);
815 memset(buf2.get(), 'b', size2);
817 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
818 // Tell the second write callback to close the connection when it is done
819 wcb2.successCallback = [&] { socket->closeNow(); };
822 // Schedule one write() immediately, before the connect finishes
823 scoped_array<char> buf1(new char[size1]);
824 memset(buf1.get(), 'a', size1);
827 socket->write(&wcb1, buf1.get(), size1);
831 // immediately perform a close, before connect() completes
835 // Start reading from the other endpoint after 10ms.
836 // If we're using large buffers, we have to read so that the writes don't
838 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
840 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
841 acceptedSocket.get(), &rcb);
842 socket->getEventBase()->tryRunAfterDelay(
843 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
846 // Loop. We don't bother accepting on the server socket yet.
847 // The kernel should be able to buffer the write request so it can succeed.
850 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
852 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
855 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
860 // Make sure the read callback received all of the data
861 size_t bytesRead = 0;
862 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
863 it != rcb.buffers.end();
865 size_t start = bytesRead;
866 bytesRead += it->length;
867 size_t end = bytesRead;
869 size_t cmpLen = min(size1, end) - start;
870 CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
872 if (end > size1 && end <= size1 + size2) {
876 if (start >= size1) {
878 buf2Offset = start - size1;
879 cmpLen = end - start;
881 itOffset = size1 - start;
883 cmpLen = end - size1;
885 CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
890 CHECK_EQ(bytesRead, size1 + size2);
893 TEST(AsyncSocketTest, ConnectCallbackWrite) {
894 // Test using small writes that should both succeed immediately
895 testConnectOptWrite(100, 200);
897 // Test using a large buffer in the connect callback, that should block
898 const size_t largeSize = 8*1024*1024;
899 testConnectOptWrite(100, largeSize);
901 // Test using a large initial write
902 testConnectOptWrite(largeSize, 100);
904 // Test using two large buffers
905 testConnectOptWrite(largeSize, largeSize);
907 // Test a small write in the connect callback,
908 // but no immediate write before connect completes
909 testConnectOptWrite(0, 64);
911 // Test a large write in the connect callback,
912 // but no immediate write before connect completes
913 testConnectOptWrite(0, largeSize);
915 // Test connect, a small write, then immediately call close() before connect
917 testConnectOptWrite(211, 0, true);
919 // Test connect, a large immediate write (that will block), then immediately
920 // call close() before connect completes
921 testConnectOptWrite(largeSize, 0, true);
924 ///////////////////////////////////////////////////////////////////////////
925 // write() related tests
926 ///////////////////////////////////////////////////////////////////////////
929 * Test writing using a nullptr callback
931 TEST(AsyncSocketTest, WriteNullCallback) {
936 std::shared_ptr<AsyncSocket> socket =
937 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
938 evb.loop(); // loop until the socket is connected
940 // write() with a nullptr callback
942 memset(buf, 'a', sizeof(buf));
943 socket->write(nullptr, buf, sizeof(buf));
945 evb.loop(); // loop until the data is sent
947 // Make sure the server got a connection and received the data
949 server.verifyConnection(buf, sizeof(buf));
951 ASSERT_TRUE(socket->isClosedBySelf());
952 ASSERT_FALSE(socket->isClosedByPeer());
956 * Test writing with a send timeout
958 TEST(AsyncSocketTest, WriteTimeout) {
963 std::shared_ptr<AsyncSocket> socket =
964 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
965 evb.loop(); // loop until the socket is connected
967 // write() a large chunk of data, with no-one on the other end reading
968 size_t writeLength = 8*1024*1024;
969 uint32_t timeout = 200;
970 socket->setSendTimeout(timeout);
971 scoped_array<char> buf(new char[writeLength]);
972 memset(buf.get(), 'a', writeLength);
974 socket->write(&wcb, buf.get(), writeLength);
980 // Make sure the write attempt timed out as requested
981 CHECK_EQ(wcb.state, STATE_FAILED);
982 CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
984 // Check that the write timed out within a reasonable period of time.
985 // We don't check for exactly the specified timeout, since AsyncSocket only
986 // times out when it hasn't made progress for that period of time.
988 // On linux, the first write sends a few hundred kb of data, then blocks for
989 // writability, and then unblocks again after 40ms and is able to write
990 // another smaller of data before blocking permanently. Therefore it doesn't
991 // time out until 40ms + timeout.
993 // I haven't fully verified the cause of this, but I believe it probably
994 // occurs because the receiving end delays sending an ack for up to 40ms.
995 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
996 // the ack, it can send some more data. However, after that point the
997 // receiver's kernel buffer is full. This 40ms delay happens even with
998 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
999 // kernel may be automatically disabling TCP_QUICKACK after receiving some
1002 // For now, we simply check that the timeout occurred within 160ms of
1003 // the requested value.
1004 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1008 * Test writing to a socket that the remote endpoint has closed
1010 TEST(AsyncSocketTest, WritePipeError) {
1015 std::shared_ptr<AsyncSocket> socket =
1016 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1017 socket->setSendTimeout(1000);
1018 evb.loop(); // loop until the socket is connected
1020 // accept and immediately close the socket
1021 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1022 acceptedSocket.reset();
1024 // write() a large chunk of data
1025 size_t writeLength = 8*1024*1024;
1026 scoped_array<char> buf(new char[writeLength]);
1027 memset(buf.get(), 'a', writeLength);
1029 socket->write(&wcb, buf.get(), writeLength);
1033 // Make sure the write failed.
1034 // It would be nice if AsyncSocketException could convey the errno value,
1035 // so that we could check for EPIPE
1036 CHECK_EQ(wcb.state, STATE_FAILED);
1037 CHECK_EQ(wcb.exception.getType(),
1038 AsyncSocketException::INTERNAL_ERROR);
1040 ASSERT_FALSE(socket->isClosedBySelf());
1041 ASSERT_FALSE(socket->isClosedByPeer());
1045 * Test writing a mix of simple buffers and IOBufs
1047 TEST(AsyncSocketTest, WriteIOBuf) {
1052 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1054 socket->connect(&ccb, server.getAddress(), 30);
1056 // Accept the connection
1057 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1059 acceptedSocket->setReadCB(&rcb);
1061 // Write a simple buffer to the socket
1062 size_t simpleBufLength = 5;
1063 char simpleBuf[simpleBufLength];
1064 memset(simpleBuf, 'a', simpleBufLength);
1066 socket->write(&wcb, simpleBuf, simpleBufLength);
1068 // Write a single-element IOBuf chain
1069 size_t buf1Length = 7;
1070 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1071 memset(buf1->writableData(), 'b', buf1Length);
1072 buf1->append(buf1Length);
1073 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1075 socket->writeChain(&wcb2, std::move(buf1));
1077 // Write a multiple-element IOBuf chain
1078 size_t buf2Length = 11;
1079 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1080 memset(buf2->writableData(), 'c', buf2Length);
1081 buf2->append(buf2Length);
1082 size_t buf3Length = 13;
1083 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1084 memset(buf3->writableData(), 'd', buf3Length);
1085 buf3->append(buf3Length);
1086 buf2->appendChain(std::move(buf3));
1087 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1088 buf2Copy->coalesce();
1090 socket->writeChain(&wcb3, std::move(buf2));
1091 socket->shutdownWrite();
1093 // Let the reads and writes run to completion
1096 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1097 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1098 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1100 // Make sure the reader got the right data in the right order
1101 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1102 CHECK_EQ(rcb.buffers.size(), 1);
1103 CHECK_EQ(rcb.buffers[0].length,
1104 simpleBufLength + buf1Length + buf2Length + buf3Length);
1106 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1108 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1109 buf1Copy->data(), buf1Copy->length()), 0);
1111 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1112 buf2Copy->data(), buf2Copy->length()), 0);
1114 acceptedSocket->close();
1117 ASSERT_TRUE(socket->isClosedBySelf());
1118 ASSERT_FALSE(socket->isClosedByPeer());
1121 TEST(AsyncSocketTest, WriteIOBufCorked) {
1126 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1128 socket->connect(&ccb, server.getAddress(), 30);
1130 // Accept the connection
1131 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1133 acceptedSocket->setReadCB(&rcb);
1135 // Do three writes, 100ms apart, with the "cork" flag set
1136 // on the second write. The reader should see the first write
1137 // arrive by itself, followed by the second and third writes
1138 // arriving together.
1139 size_t buf1Length = 5;
1140 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1141 memset(buf1->writableData(), 'a', buf1Length);
1142 buf1->append(buf1Length);
1143 size_t buf2Length = 7;
1144 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1145 memset(buf2->writableData(), 'b', buf2Length);
1146 buf2->append(buf2Length);
1147 size_t buf3Length = 11;
1148 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1149 memset(buf3->writableData(), 'c', buf3Length);
1150 buf3->append(buf3Length);
1152 socket->writeChain(&wcb1, std::move(buf1));
1154 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1155 write2.scheduleTimeout(100);
1157 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1158 write3.scheduleTimeout(140);
1161 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1162 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1163 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1164 if (wcb3.state != STATE_SUCCEEDED) {
1165 throw(wcb3.exception);
1167 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1169 // Make sure the reader got the data with the right grouping
1170 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1171 CHECK_EQ(rcb.buffers.size(), 2);
1172 CHECK_EQ(rcb.buffers[0].length, buf1Length);
1173 CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1175 acceptedSocket->close();
1178 ASSERT_TRUE(socket->isClosedBySelf());
1179 ASSERT_FALSE(socket->isClosedByPeer());
1183 * Test performing a zero-length write
1185 TEST(AsyncSocketTest, ZeroLengthWrite) {
1190 std::shared_ptr<AsyncSocket> socket =
1191 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1192 evb.loop(); // loop until the socket is connected
1194 auto acceptedSocket = server.acceptAsync(&evb);
1196 acceptedSocket->setReadCB(&rcb);
1198 size_t len1 = 1024*1024;
1199 size_t len2 = 1024*1024;
1200 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1201 memset(buf.get(), 'a', len1);
1202 memset(buf.get(), 'b', len2);
1208 socket->write(&wcb1, buf.get(), 0);
1209 socket->write(&wcb2, buf.get(), len1);
1210 socket->write(&wcb3, buf.get() + len1, 0);
1211 socket->write(&wcb4, buf.get() + len1, len2);
1214 evb.loop(); // loop until the data is sent
1216 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1217 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1218 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1219 CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1220 rcb.verifyData(buf.get(), len1 + len2);
1222 ASSERT_TRUE(socket->isClosedBySelf());
1223 ASSERT_FALSE(socket->isClosedByPeer());
1226 TEST(AsyncSocketTest, ZeroLengthWritev) {
1231 std::shared_ptr<AsyncSocket> socket =
1232 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1233 evb.loop(); // loop until the socket is connected
1235 auto acceptedSocket = server.acceptAsync(&evb);
1237 acceptedSocket->setReadCB(&rcb);
1239 size_t len1 = 1024*1024;
1240 size_t len2 = 1024*1024;
1241 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1242 memset(buf.get(), 'a', len1);
1243 memset(buf.get(), 'b', len2);
1246 size_t iovCount = 4;
1247 struct iovec iov[iovCount];
1248 iov[0].iov_base = buf.get();
1249 iov[0].iov_len = len1;
1250 iov[1].iov_base = buf.get() + len1;
1252 iov[2].iov_base = buf.get() + len1;
1253 iov[2].iov_len = len2;
1254 iov[3].iov_base = buf.get() + len1 + len2;
1257 socket->writev(&wcb, iov, iovCount);
1259 evb.loop(); // loop until the data is sent
1261 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1262 rcb.verifyData(buf.get(), len1 + len2);
1264 ASSERT_TRUE(socket->isClosedBySelf());
1265 ASSERT_FALSE(socket->isClosedByPeer());
1268 ///////////////////////////////////////////////////////////////////////////
1269 // close() related tests
1270 ///////////////////////////////////////////////////////////////////////////
1273 * Test calling close() with pending writes when the socket is already closing.
1275 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1280 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1282 socket->connect(&ccb, server.getAddress(), 30);
1284 // accept the socket on the server side
1285 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1287 // Loop to ensure the connect has completed
1290 // Make sure we are connected
1291 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1293 // Schedule pending writes, until several write attempts have blocked
1295 memset(buf, 'a', sizeof(buf));
1296 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1297 WriteCallbackVector writeCallbacks;
1299 writeCallbacks.reserve(5);
1300 while (writeCallbacks.size() < 5) {
1301 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1303 socket->write(wcb.get(), buf, sizeof(buf));
1304 if (wcb->state == STATE_SUCCEEDED) {
1305 // Succeeded immediately. Keep performing more writes
1309 // This write is blocked.
1310 // Have the write callback call close() when writeError() is invoked
1311 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1312 writeCallbacks.push_back(wcb);
1315 // Call closeNow() to immediately fail the pending writes
1318 // Make sure writeError() was invoked on all of the pending write callbacks
1319 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1320 it != writeCallbacks.end();
1322 CHECK_EQ((*it)->state, STATE_FAILED);
1325 ASSERT_TRUE(socket->isClosedBySelf());
1326 ASSERT_FALSE(socket->isClosedByPeer());
1329 ///////////////////////////////////////////////////////////////////////////
1330 // ImmediateRead related tests
1331 ///////////////////////////////////////////////////////////////////////////
1333 /* AsyncSocket use to verify immediate read works */
1334 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1336 bool immediateReadCalled = false;
1337 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1339 void checkForImmediateRead() noexcept override {
1340 immediateReadCalled = true;
1341 AsyncSocket::handleRead();
1345 TEST(AsyncSocket, ConnectReadImmediateRead) {
1348 const size_t maxBufferSz = 100;
1349 const size_t maxReadsPerEvent = 1;
1350 const size_t expectedDataSz = maxBufferSz * 3;
1351 char expectedData[expectedDataSz];
1352 memset(expectedData, 'j', expectedDataSz);
1355 ReadCallback rcb(maxBufferSz);
1356 AsyncSocketImmediateRead socket(&evb);
1357 socket.connect(nullptr, server.getAddress(), 30);
1359 evb.loop(); // loop until the socket is connected
1361 socket.setReadCB(&rcb);
1362 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1363 socket.immediateReadCalled = false;
1365 auto acceptedSocket = server.acceptAsync(&evb);
1367 ReadCallback rcbServer;
1368 WriteCallback wcbServer;
1369 rcbServer.dataAvailableCallback = [&]() {
1370 if (rcbServer.dataRead() == expectedDataSz) {
1371 // write back all data read
1372 rcbServer.verifyData(expectedData, expectedDataSz);
1373 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1374 acceptedSocket->close();
1377 acceptedSocket->setReadCB(&rcbServer);
1381 socket.write(&wcb1, expectedData, expectedDataSz);
1383 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1384 rcb.verifyData(expectedData, expectedDataSz);
1385 CHECK_EQ(socket.immediateReadCalled, true);
1387 ASSERT_FALSE(socket.isClosedBySelf());
1388 ASSERT_FALSE(socket.isClosedByPeer());
1391 TEST(AsyncSocket, ConnectReadUninstallRead) {
1394 const size_t maxBufferSz = 100;
1395 const size_t maxReadsPerEvent = 1;
1396 const size_t expectedDataSz = maxBufferSz * 3;
1397 char expectedData[expectedDataSz];
1398 memset(expectedData, 'k', expectedDataSz);
1401 ReadCallback rcb(maxBufferSz);
1402 AsyncSocketImmediateRead socket(&evb);
1403 socket.connect(nullptr, server.getAddress(), 30);
1405 evb.loop(); // loop until the socket is connected
1407 socket.setReadCB(&rcb);
1408 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1409 socket.immediateReadCalled = false;
1411 auto acceptedSocket = server.acceptAsync(&evb);
1413 ReadCallback rcbServer;
1414 WriteCallback wcbServer;
1415 rcbServer.dataAvailableCallback = [&]() {
1416 if (rcbServer.dataRead() == expectedDataSz) {
1417 // write back all data read
1418 rcbServer.verifyData(expectedData, expectedDataSz);
1419 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1420 acceptedSocket->close();
1423 acceptedSocket->setReadCB(&rcbServer);
1425 rcb.dataAvailableCallback = [&]() {
1426 // we read data and reset readCB
1427 socket.setReadCB(nullptr);
1432 socket.write(&wcb, expectedData, expectedDataSz);
1434 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1436 /* we shoud've only read maxBufferSz data since readCallback_
1437 * was reset in dataAvailableCallback */
1438 CHECK_EQ(rcb.dataRead(), maxBufferSz);
1439 CHECK_EQ(socket.immediateReadCalled, false);
1441 ASSERT_FALSE(socket.isClosedBySelf());
1442 ASSERT_FALSE(socket.isClosedByPeer());
1446 // - Test connect() and have the connect callback set the read callback
1447 // - Test connect() and have the connect callback unset the read callback
1448 // - Test reading/writing/closing/destroying the socket in the connect callback
1449 // - Test reading/writing/closing/destroying the socket in the read callback
1450 // - Test reading/writing/closing/destroying the socket in the write callback
1451 // - Test one-way shutdown behavior
1452 // - Test changing the EventBase
1454 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1455 // in connectSuccess(), readDataAvailable(), writeSuccess()
1458 ///////////////////////////////////////////////////////////////////////////
1459 // AsyncServerSocket tests
1460 ///////////////////////////////////////////////////////////////////////////
1463 * Helper ConnectionEventCallback class for the test code.
1464 * It maintains counters protected by a spin lock.
1466 class TestConnectionEventCallback :
1467 public AsyncServerSocket::ConnectionEventCallback {
1469 virtual void onConnectionAccepted(
1470 const int /* socket */,
1471 const SocketAddress& /* addr */) noexcept override {
1472 folly::RWSpinLock::WriteHolder holder(spinLock_);
1473 connectionAccepted_++;
1476 virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1477 folly::RWSpinLock::WriteHolder holder(spinLock_);
1478 connectionAcceptedError_++;
1481 virtual void onConnectionDropped(
1482 const int /* socket */,
1483 const SocketAddress& /* addr */) noexcept override {
1484 folly::RWSpinLock::WriteHolder holder(spinLock_);
1485 connectionDropped_++;
1488 virtual void onConnectionEnqueuedForAcceptorCallback(
1489 const int /* socket */,
1490 const SocketAddress& /* addr */) noexcept override {
1491 folly::RWSpinLock::WriteHolder holder(spinLock_);
1492 connectionEnqueuedForAcceptCallback_++;
1495 virtual void onConnectionDequeuedByAcceptorCallback(
1496 const int /* socket */,
1497 const SocketAddress& /* addr */) noexcept override {
1498 folly::RWSpinLock::WriteHolder holder(spinLock_);
1499 connectionDequeuedByAcceptCallback_++;
1502 virtual void onBackoffStarted() noexcept override {
1503 folly::RWSpinLock::WriteHolder holder(spinLock_);
1507 virtual void onBackoffEnded() noexcept override {
1508 folly::RWSpinLock::WriteHolder holder(spinLock_);
1512 virtual void onBackoffError() noexcept override {
1513 folly::RWSpinLock::WriteHolder holder(spinLock_);
1517 unsigned int getConnectionAccepted() const {
1518 folly::RWSpinLock::ReadHolder holder(spinLock_);
1519 return connectionAccepted_;
1522 unsigned int getConnectionAcceptedError() const {
1523 folly::RWSpinLock::ReadHolder holder(spinLock_);
1524 return connectionAcceptedError_;
1527 unsigned int getConnectionDropped() const {
1528 folly::RWSpinLock::ReadHolder holder(spinLock_);
1529 return connectionDropped_;
1532 unsigned int getConnectionEnqueuedForAcceptCallback() const {
1533 folly::RWSpinLock::ReadHolder holder(spinLock_);
1534 return connectionEnqueuedForAcceptCallback_;
1537 unsigned int getConnectionDequeuedByAcceptCallback() const {
1538 folly::RWSpinLock::ReadHolder holder(spinLock_);
1539 return connectionDequeuedByAcceptCallback_;
1542 unsigned int getBackoffStarted() const {
1543 folly::RWSpinLock::ReadHolder holder(spinLock_);
1544 return backoffStarted_;
1547 unsigned int getBackoffEnded() const {
1548 folly::RWSpinLock::ReadHolder holder(spinLock_);
1549 return backoffEnded_;
1552 unsigned int getBackoffError() const {
1553 folly::RWSpinLock::ReadHolder holder(spinLock_);
1554 return backoffError_;
1558 mutable folly::RWSpinLock spinLock_;
1559 unsigned int connectionAccepted_{0};
1560 unsigned int connectionAcceptedError_{0};
1561 unsigned int connectionDropped_{0};
1562 unsigned int connectionEnqueuedForAcceptCallback_{0};
1563 unsigned int connectionDequeuedByAcceptCallback_{0};
1564 unsigned int backoffStarted_{0};
1565 unsigned int backoffEnded_{0};
1566 unsigned int backoffError_{0};
1570 * Helper AcceptCallback class for the test code
1571 * It records the callbacks that were invoked, and also supports calling
1572 * generic std::function objects in each callback.
1574 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1583 EventInfo(int fd, const folly::SocketAddress& addr)
1584 : type(TYPE_ACCEPT),
1588 explicit EventInfo(const std::string& msg)
1593 explicit EventInfo(EventType et)
1600 int fd; // valid for TYPE_ACCEPT
1601 folly::SocketAddress address; // valid for TYPE_ACCEPT
1602 string errorMsg; // valid for TYPE_ERROR
1604 typedef std::deque<EventInfo> EventList;
1606 TestAcceptCallback()
1607 : connectionAcceptedFn_(),
1612 std::deque<EventInfo>* getEvents() {
1616 void setConnectionAcceptedFn(
1617 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1618 connectionAcceptedFn_ = fn;
1620 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1621 acceptErrorFn_ = fn;
1623 void setAcceptStartedFn(const std::function<void()>& fn) {
1624 acceptStartedFn_ = fn;
1626 void setAcceptStoppedFn(const std::function<void()>& fn) {
1627 acceptStoppedFn_ = fn;
1630 void connectionAccepted(
1631 int fd, const folly::SocketAddress& clientAddr) noexcept override {
1632 events_.emplace_back(fd, clientAddr);
1634 if (connectionAcceptedFn_) {
1635 connectionAcceptedFn_(fd, clientAddr);
1638 void acceptError(const std::exception& ex) noexcept override {
1639 events_.emplace_back(ex.what());
1641 if (acceptErrorFn_) {
1645 void acceptStarted() noexcept override {
1646 events_.emplace_back(TYPE_START);
1648 if (acceptStartedFn_) {
1652 void acceptStopped() noexcept override {
1653 events_.emplace_back(TYPE_STOP);
1655 if (acceptStoppedFn_) {
1661 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1662 std::function<void(const std::exception&)> acceptErrorFn_;
1663 std::function<void()> acceptStartedFn_;
1664 std::function<void()> acceptStoppedFn_;
1666 std::deque<EventInfo> events_;
1671 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1673 TEST(AsyncSocketTest, ServerAcceptOptions) {
1674 EventBase eventBase;
1676 // Create a server socket
1677 std::shared_ptr<AsyncServerSocket> serverSocket(
1678 AsyncServerSocket::newSocket(&eventBase));
1679 serverSocket->bind(0);
1680 serverSocket->listen(16);
1681 folly::SocketAddress serverAddress;
1682 serverSocket->getAddress(&serverAddress);
1684 // Add a callback to accept one connection then stop the loop
1685 TestAcceptCallback acceptCallback;
1686 acceptCallback.setConnectionAcceptedFn(
1687 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1688 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1690 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1691 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1693 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1694 serverSocket->startAccepting();
1696 // Connect to the server socket
1697 std::shared_ptr<AsyncSocket> socket(
1698 AsyncSocket::newSocket(&eventBase, serverAddress));
1702 // Verify that the server accepted a connection
1703 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1704 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1705 TestAcceptCallback::TYPE_START);
1706 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1707 TestAcceptCallback::TYPE_ACCEPT);
1708 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1709 TestAcceptCallback::TYPE_STOP);
1710 int fd = acceptCallback.getEvents()->at(1).fd;
1712 // The accepted connection should already be in non-blocking mode
1713 int flags = fcntl(fd, F_GETFL, 0);
1714 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1717 // The accepted connection should already have TCP_NODELAY set
1719 socklen_t valueLength = sizeof(value);
1720 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1727 * Test AsyncServerSocket::removeAcceptCallback()
1729 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1730 // Create a new AsyncServerSocket
1731 EventBase eventBase;
1732 std::shared_ptr<AsyncServerSocket> serverSocket(
1733 AsyncServerSocket::newSocket(&eventBase));
1734 serverSocket->bind(0);
1735 serverSocket->listen(16);
1736 folly::SocketAddress serverAddress;
1737 serverSocket->getAddress(&serverAddress);
1739 // Add several accept callbacks
1740 TestAcceptCallback cb1;
1741 TestAcceptCallback cb2;
1742 TestAcceptCallback cb3;
1743 TestAcceptCallback cb4;
1744 TestAcceptCallback cb5;
1745 TestAcceptCallback cb6;
1746 TestAcceptCallback cb7;
1748 // Test having callbacks remove other callbacks before them on the list,
1749 // after them on the list, or removing themselves.
1751 // Have callback 2 remove callback 3 and callback 5 the first time it is
1754 cb1.setConnectionAcceptedFn([&](int /* fd */,
1755 const folly::SocketAddress& /* addr */) {
1756 std::shared_ptr<AsyncSocket> sock2(
1757 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1759 cb3.setConnectionAcceptedFn(
1760 [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1761 cb4.setConnectionAcceptedFn(
1762 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1763 std::shared_ptr<AsyncSocket> sock3(
1764 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1766 cb5.setConnectionAcceptedFn(
1767 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1768 std::shared_ptr<AsyncSocket> sock5(
1769 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1772 cb2.setConnectionAcceptedFn(
1773 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1774 if (cb2Count == 0) {
1775 serverSocket->removeAcceptCallback(&cb3, nullptr);
1776 serverSocket->removeAcceptCallback(&cb5, nullptr);
1780 // Have callback 6 remove callback 4 the first time it is called,
1781 // and destroy the server socket the second time it is called
1783 cb6.setConnectionAcceptedFn(
1784 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1785 if (cb6Count == 0) {
1786 serverSocket->removeAcceptCallback(&cb4, nullptr);
1787 std::shared_ptr<AsyncSocket> sock6(
1788 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1789 std::shared_ptr<AsyncSocket> sock7(
1790 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1791 std::shared_ptr<AsyncSocket> sock8(
1792 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1795 serverSocket.reset();
1799 // Have callback 7 remove itself
1800 cb7.setConnectionAcceptedFn(
1801 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1802 serverSocket->removeAcceptCallback(&cb7, nullptr);
1805 serverSocket->addAcceptCallback(&cb1, nullptr);
1806 serverSocket->addAcceptCallback(&cb2, nullptr);
1807 serverSocket->addAcceptCallback(&cb3, nullptr);
1808 serverSocket->addAcceptCallback(&cb4, nullptr);
1809 serverSocket->addAcceptCallback(&cb5, nullptr);
1810 serverSocket->addAcceptCallback(&cb6, nullptr);
1811 serverSocket->addAcceptCallback(&cb7, nullptr);
1812 serverSocket->startAccepting();
1814 // Make several connections to the socket
1815 std::shared_ptr<AsyncSocket> sock1(
1816 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1817 std::shared_ptr<AsyncSocket> sock4(
1818 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1820 // Loop until we are stopped
1823 // Check to make sure that the expected callbacks were invoked.
1825 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1826 // the AcceptCallbacks in round-robin fashion, in the order that they were
1827 // added. The code is implemented this way right now, but the API doesn't
1828 // explicitly require it be done this way. If we change the code not to be
1829 // exactly round robin in the future, we can simplify the test checks here.
1830 // (We'll also need to update the termination code, since we expect cb6 to
1831 // get called twice to terminate the loop.)
1832 CHECK_EQ(cb1.getEvents()->size(), 4);
1833 CHECK_EQ(cb1.getEvents()->at(0).type,
1834 TestAcceptCallback::TYPE_START);
1835 CHECK_EQ(cb1.getEvents()->at(1).type,
1836 TestAcceptCallback::TYPE_ACCEPT);
1837 CHECK_EQ(cb1.getEvents()->at(2).type,
1838 TestAcceptCallback::TYPE_ACCEPT);
1839 CHECK_EQ(cb1.getEvents()->at(3).type,
1840 TestAcceptCallback::TYPE_STOP);
1842 CHECK_EQ(cb2.getEvents()->size(), 4);
1843 CHECK_EQ(cb2.getEvents()->at(0).type,
1844 TestAcceptCallback::TYPE_START);
1845 CHECK_EQ(cb2.getEvents()->at(1).type,
1846 TestAcceptCallback::TYPE_ACCEPT);
1847 CHECK_EQ(cb2.getEvents()->at(2).type,
1848 TestAcceptCallback::TYPE_ACCEPT);
1849 CHECK_EQ(cb2.getEvents()->at(3).type,
1850 TestAcceptCallback::TYPE_STOP);
1852 CHECK_EQ(cb3.getEvents()->size(), 2);
1853 CHECK_EQ(cb3.getEvents()->at(0).type,
1854 TestAcceptCallback::TYPE_START);
1855 CHECK_EQ(cb3.getEvents()->at(1).type,
1856 TestAcceptCallback::TYPE_STOP);
1858 CHECK_EQ(cb4.getEvents()->size(), 3);
1859 CHECK_EQ(cb4.getEvents()->at(0).type,
1860 TestAcceptCallback::TYPE_START);
1861 CHECK_EQ(cb4.getEvents()->at(1).type,
1862 TestAcceptCallback::TYPE_ACCEPT);
1863 CHECK_EQ(cb4.getEvents()->at(2).type,
1864 TestAcceptCallback::TYPE_STOP);
1866 CHECK_EQ(cb5.getEvents()->size(), 2);
1867 CHECK_EQ(cb5.getEvents()->at(0).type,
1868 TestAcceptCallback::TYPE_START);
1869 CHECK_EQ(cb5.getEvents()->at(1).type,
1870 TestAcceptCallback::TYPE_STOP);
1872 CHECK_EQ(cb6.getEvents()->size(), 4);
1873 CHECK_EQ(cb6.getEvents()->at(0).type,
1874 TestAcceptCallback::TYPE_START);
1875 CHECK_EQ(cb6.getEvents()->at(1).type,
1876 TestAcceptCallback::TYPE_ACCEPT);
1877 CHECK_EQ(cb6.getEvents()->at(2).type,
1878 TestAcceptCallback::TYPE_ACCEPT);
1879 CHECK_EQ(cb6.getEvents()->at(3).type,
1880 TestAcceptCallback::TYPE_STOP);
1882 CHECK_EQ(cb7.getEvents()->size(), 3);
1883 CHECK_EQ(cb7.getEvents()->at(0).type,
1884 TestAcceptCallback::TYPE_START);
1885 CHECK_EQ(cb7.getEvents()->at(1).type,
1886 TestAcceptCallback::TYPE_ACCEPT);
1887 CHECK_EQ(cb7.getEvents()->at(2).type,
1888 TestAcceptCallback::TYPE_STOP);
1892 * Test AsyncServerSocket::removeAcceptCallback()
1894 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1895 // Create a new AsyncServerSocket
1896 EventBase eventBase;
1897 std::shared_ptr<AsyncServerSocket> serverSocket(
1898 AsyncServerSocket::newSocket(&eventBase));
1899 serverSocket->bind(0);
1900 serverSocket->listen(16);
1901 folly::SocketAddress serverAddress;
1902 serverSocket->getAddress(&serverAddress);
1904 // Add several accept callbacks
1905 TestAcceptCallback cb1;
1906 auto thread_id = pthread_self();
1907 cb1.setAcceptStartedFn([&](){
1908 CHECK_NE(thread_id, pthread_self());
1909 thread_id = pthread_self();
1911 cb1.setConnectionAcceptedFn(
1912 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1913 CHECK_EQ(thread_id, pthread_self());
1914 serverSocket->removeAcceptCallback(&cb1, nullptr);
1916 cb1.setAcceptStoppedFn([&](){
1917 CHECK_EQ(thread_id, pthread_self());
1920 // Test having callbacks remove other callbacks before them on the list,
1921 serverSocket->addAcceptCallback(&cb1, nullptr);
1922 serverSocket->startAccepting();
1924 // Make several connections to the socket
1925 std::shared_ptr<AsyncSocket> sock1(
1926 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1928 // Loop in another thread
1929 auto other = std::thread([&](){
1934 // Check to make sure that the expected callbacks were invoked.
1936 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1937 // the AcceptCallbacks in round-robin fashion, in the order that they were
1938 // added. The code is implemented this way right now, but the API doesn't
1939 // explicitly require it be done this way. If we change the code not to be
1940 // exactly round robin in the future, we can simplify the test checks here.
1941 // (We'll also need to update the termination code, since we expect cb6 to
1942 // get called twice to terminate the loop.)
1943 CHECK_EQ(cb1.getEvents()->size(), 3);
1944 CHECK_EQ(cb1.getEvents()->at(0).type,
1945 TestAcceptCallback::TYPE_START);
1946 CHECK_EQ(cb1.getEvents()->at(1).type,
1947 TestAcceptCallback::TYPE_ACCEPT);
1948 CHECK_EQ(cb1.getEvents()->at(2).type,
1949 TestAcceptCallback::TYPE_STOP);
1953 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1954 // Add a callback to accept one connection then stop accepting
1955 TestAcceptCallback acceptCallback;
1956 acceptCallback.setConnectionAcceptedFn(
1957 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1958 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1960 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1961 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1963 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1964 serverSocket->startAccepting();
1966 // Connect to the server socket
1967 EventBase* eventBase = serverSocket->getEventBase();
1968 folly::SocketAddress serverAddress;
1969 serverSocket->getAddress(&serverAddress);
1970 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1972 // Loop to process all events
1975 // Verify that the server accepted a connection
1976 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1977 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1978 TestAcceptCallback::TYPE_START);
1979 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1980 TestAcceptCallback::TYPE_ACCEPT);
1981 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1982 TestAcceptCallback::TYPE_STOP);
1985 /* Verify that we don't leak sockets if we are destroyed()
1986 * and there are still writes pending
1988 * If destroy() only calls close() instead of closeNow(),
1989 * it would shutdown(writes) on the socket, but it would
1990 * never be close()'d, and the socket would leak
1992 TEST(AsyncSocketTest, DestroyCloseTest) {
1998 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
2000 socket->connect(&ccb, server.getAddress(), 30);
2002 // Accept the connection
2003 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2005 acceptedSocket->setReadCB(&rcb);
2007 // Write a large buffer to the socket that is larger than kernel buffer
2008 size_t simpleBufLength = 5000000;
2009 char* simpleBuf = new char[simpleBufLength];
2010 memset(simpleBuf, 'a', simpleBufLength);
2013 // Let the reads and writes run to completion
2014 int fd = acceptedSocket->getFd();
2016 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2018 acceptedSocket.reset();
2020 // Test that server socket was closed
2021 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2028 * Test AsyncServerSocket::useExistingSocket()
2030 TEST(AsyncSocketTest, ServerExistingSocket) {
2031 EventBase eventBase;
2033 // Test creating a socket, and letting AsyncServerSocket bind and listen
2035 // Manually create a socket
2036 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2039 // Create a server socket
2040 AsyncServerSocket::UniquePtr serverSocket(
2041 new AsyncServerSocket(&eventBase));
2042 serverSocket->useExistingSocket(fd);
2043 folly::SocketAddress address;
2044 serverSocket->getAddress(&address);
2046 serverSocket->bind(address);
2047 serverSocket->listen(16);
2049 // Make sure the socket works
2050 serverSocketSanityTest(serverSocket.get());
2053 // Test creating a socket and binding manually,
2054 // then letting AsyncServerSocket listen
2056 // Manually create a socket
2057 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2060 struct sockaddr_in addr;
2061 addr.sin_family = AF_INET;
2063 addr.sin_addr.s_addr = INADDR_ANY;
2064 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2066 // Look up the address that we bound to
2067 folly::SocketAddress boundAddress;
2068 boundAddress.setFromLocalAddress(fd);
2070 // Create a server socket
2071 AsyncServerSocket::UniquePtr serverSocket(
2072 new AsyncServerSocket(&eventBase));
2073 serverSocket->useExistingSocket(fd);
2074 serverSocket->listen(16);
2076 // Make sure AsyncServerSocket reports the same address that we bound to
2077 folly::SocketAddress serverSocketAddress;
2078 serverSocket->getAddress(&serverSocketAddress);
2079 CHECK_EQ(boundAddress, serverSocketAddress);
2081 // Make sure the socket works
2082 serverSocketSanityTest(serverSocket.get());
2085 // Test creating a socket, binding and listening manually,
2086 // then giving it to AsyncServerSocket
2088 // Manually create a socket
2089 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2092 struct sockaddr_in addr;
2093 addr.sin_family = AF_INET;
2095 addr.sin_addr.s_addr = INADDR_ANY;
2096 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2098 // Look up the address that we bound to
2099 folly::SocketAddress boundAddress;
2100 boundAddress.setFromLocalAddress(fd);
2102 CHECK_EQ(listen(fd, 16), 0);
2104 // Create a server socket
2105 AsyncServerSocket::UniquePtr serverSocket(
2106 new AsyncServerSocket(&eventBase));
2107 serverSocket->useExistingSocket(fd);
2109 // Make sure AsyncServerSocket reports the same address that we bound to
2110 folly::SocketAddress serverSocketAddress;
2111 serverSocket->getAddress(&serverSocketAddress);
2112 CHECK_EQ(boundAddress, serverSocketAddress);
2114 // Make sure the socket works
2115 serverSocketSanityTest(serverSocket.get());
2119 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2120 EventBase eventBase;
2122 // Create a server socket
2123 std::shared_ptr<AsyncServerSocket> serverSocket(
2124 AsyncServerSocket::newSocket(&eventBase));
2126 path.append(folly::to<string>("/anonymous", folly::Random::rand64()));
2127 folly::SocketAddress serverAddress;
2128 serverAddress.setFromPath(path);
2129 serverSocket->bind(serverAddress);
2130 serverSocket->listen(16);
2132 // Add a callback to accept one connection then stop the loop
2133 TestAcceptCallback acceptCallback;
2134 acceptCallback.setConnectionAcceptedFn(
2135 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2136 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2138 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2139 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2141 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2142 serverSocket->startAccepting();
2144 // Connect to the server socket
2145 std::shared_ptr<AsyncSocket> socket(
2146 AsyncSocket::newSocket(&eventBase, serverAddress));
2150 // Verify that the server accepted a connection
2151 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2152 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2153 TestAcceptCallback::TYPE_START);
2154 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2155 TestAcceptCallback::TYPE_ACCEPT);
2156 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2157 TestAcceptCallback::TYPE_STOP);
2158 int fd = acceptCallback.getEvents()->at(1).fd;
2160 // The accepted connection should already be in non-blocking mode
2161 int flags = fcntl(fd, F_GETFL, 0);
2162 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2165 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2166 EventBase eventBase;
2167 TestConnectionEventCallback connectionEventCallback;
2169 // Create a server socket
2170 std::shared_ptr<AsyncServerSocket> serverSocket(
2171 AsyncServerSocket::newSocket(&eventBase));
2172 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2173 serverSocket->bind(0);
2174 serverSocket->listen(16);
2175 folly::SocketAddress serverAddress;
2176 serverSocket->getAddress(&serverAddress);
2178 // Add a callback to accept one connection then stop the loop
2179 TestAcceptCallback acceptCallback;
2180 acceptCallback.setConnectionAcceptedFn(
2181 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2182 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2184 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2185 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2187 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2188 serverSocket->startAccepting();
2190 // Connect to the server socket
2191 std::shared_ptr<AsyncSocket> socket(
2192 AsyncSocket::newSocket(&eventBase, serverAddress));
2196 // Validate the connection event counters
2197 ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2198 ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2199 ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2201 connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2202 ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2203 ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2204 ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2205 ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2209 * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2211 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2212 EventBase eventBase;
2214 // Counter of how many connections have been accepted
2217 // Create a server socket
2218 auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2219 serverSocket->bind(0);
2220 serverSocket->listen(16);
2221 folly::SocketAddress serverAddress;
2222 serverSocket->getAddress(&serverAddress);
2224 // Add a callback to accept connections
2225 TestAcceptCallback acceptCallback;
2226 acceptCallback.setConnectionAcceptedFn(
2227 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2229 CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2232 // all messages are processed, remove accept callback
2233 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2236 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2237 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2239 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2240 serverSocket->startAccepting();
2242 // Connect to the server socket, 4 clients, there are 4 connections
2243 auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2244 auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2245 auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2246 auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2252 * Test AsyncTransport::BufferCallback
2254 TEST(AsyncSocketTest, BufferTest) {
2258 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2259 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2261 socket->connect(&ccb, server.getAddress(), 30, option);
2263 char buf[100 * 1024];
2264 memset(buf, 'c', sizeof(buf));
2267 socket->setBufferCallback(&bcb);
2268 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2271 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2272 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
2274 ASSERT_TRUE(bcb.hasBuffered());
2275 ASSERT_TRUE(bcb.hasBufferCleared());
2278 server.verifyConnection(buf, sizeof(buf));
2280 ASSERT_TRUE(socket->isClosedBySelf());
2281 ASSERT_FALSE(socket->isClosedByPeer());
2284 TEST(AsyncSocketTest, BufferCallbackKill) {
2287 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2288 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2290 socket->connect(&ccb, server.getAddress(), 30, option);
2293 char buf[100 * 1024];
2294 memset(buf, 'c', sizeof(buf));
2295 BufferCallback* bcb = new BufferCallback;
2296 socket->setBufferCallback(bcb);
2298 wcb.successCallback = [&] {
2299 ASSERT_TRUE(socket.unique());
2303 // This will trigger AsyncSocket::handleWrite,
2304 // which calls WriteCallback::writeSuccess,
2305 // which calls wcb.successCallback above,
2306 // which tries to delete socket
2307 // Then, the socket will also try to use this BufferCallback
2308 // And that should crash us, if there is no DestructorGuard on the stack
2309 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2312 CHECK_EQ(ccb.state, STATE_SUCCEEDED);