2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/RWSpinLock.h>
21 #include <folly/SocketAddress.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/async/test/AsyncSocketTest.h>
25 #include <folly/io/async/test/Util.h>
26 #include <folly/test/SocketAddressTestHelper.h>
28 #include <gtest/gtest.h>
29 #include <boost/scoped_array.hpp>
34 #include <sys/types.h>
35 #include <sys/socket.h>
36 #include <netinet/tcp.h>
39 using namespace boost;
46 using std::unique_ptr;
47 using std::chrono::milliseconds;
48 using boost::scoped_array;
50 using namespace folly;
52 class DelayedWrite: public AsyncTimeout {
54 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
55 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
56 bool cork, bool lastWrite = false):
57 AsyncTimeout(socket->getEventBase()),
59 bufs_(std::move(bufs)),
62 lastWrite_(lastWrite) {}
65 void timeoutExpired() noexcept override {
66 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
67 socket_->writeChain(wcb_, std::move(bufs_), flags);
69 socket_->shutdownWrite();
73 std::shared_ptr<AsyncSocket> socket_;
74 unique_ptr<IOBuf> bufs_;
75 AsyncTransportWrapper::WriteCallback* wcb_;
80 ///////////////////////////////////////////////////////////////////////////
82 ///////////////////////////////////////////////////////////////////////////
85 * Test connecting to a server
87 TEST(AsyncSocketTest, Connect) {
88 // Start listening on a local port
91 // Connect using a AsyncSocket
93 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
95 socket->connect(&cb, server.getAddress(), 30);
99 CHECK_EQ(cb.state, STATE_SUCCEEDED);
100 EXPECT_LE(0, socket->getConnectTime().count());
101 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
105 * Test connecting to a server that isn't listening
107 TEST(AsyncSocketTest, ConnectRefused) {
110 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
112 // Hopefully nothing is actually listening on this address
113 folly::SocketAddress addr("127.0.0.1", 65535);
115 socket->connect(&cb, addr, 30);
119 CHECK_EQ(cb.state, STATE_FAILED);
120 CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
121 EXPECT_LE(0, socket->getConnectTime().count());
122 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
126 * Test connection timeout
128 TEST(AsyncSocketTest, ConnectTimeout) {
131 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
133 // Try connecting to server that won't respond.
135 // This depends somewhat on the network where this test is run.
136 // Hopefully this IP will be routable but unresponsive.
137 // (Alternatively, we could try listening on a local raw socket, but that
138 // normally requires root privileges.)
140 SocketAddressTestHelper::isIPv6Enabled() ?
141 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
142 SocketAddressTestHelper::isIPv4Enabled() ?
143 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
145 SocketAddress addr(host, 65535);
147 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
151 CHECK_EQ(cb.state, STATE_FAILED);
152 CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
154 // Verify that we can still get the peer address after a timeout.
155 // Use case is if the client was created from a client pool, and we want
156 // to log which peer failed.
157 folly::SocketAddress peer;
158 socket->getPeerAddress(&peer);
159 CHECK_EQ(peer, addr);
160 EXPECT_LE(0, socket->getConnectTime().count());
161 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(1));
165 * Test writing immediately after connecting, without waiting for connect
168 TEST(AsyncSocketTest, ConnectAndWrite) {
173 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
175 socket->connect(&ccb, server.getAddress(), 30);
179 memset(buf, 'a', sizeof(buf));
181 socket->write(&wcb, buf, sizeof(buf));
183 // Loop. We don't bother accepting on the server socket yet.
184 // The kernel should be able to buffer the write request so it can succeed.
187 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
188 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
190 // Make sure the server got a connection and received the data
192 server.verifyConnection(buf, sizeof(buf));
194 ASSERT_TRUE(socket->isClosedBySelf());
195 ASSERT_FALSE(socket->isClosedByPeer());
196 EXPECT_EQ(socket->getConnectTimeout(), std::chrono::milliseconds(30));
200 * Test connecting using a nullptr connect callback.
202 TEST(AsyncSocketTest, ConnectNullCallback) {
207 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
208 socket->connect(nullptr, server.getAddress(), 30);
210 // write some data, just so we have some way of verifing
211 // that the socket works correctly after connecting
213 memset(buf, 'a', sizeof(buf));
215 socket->write(&wcb, buf, sizeof(buf));
219 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
221 // Make sure the server got a connection and received the data
223 server.verifyConnection(buf, sizeof(buf));
225 ASSERT_TRUE(socket->isClosedBySelf());
226 ASSERT_FALSE(socket->isClosedByPeer());
230 * Test calling both write() and close() immediately after connecting, without
231 * waiting for connect to finish.
233 * This exercises the STATE_CONNECTING_CLOSING code.
235 TEST(AsyncSocketTest, ConnectWriteAndClose) {
240 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
242 socket->connect(&ccb, server.getAddress(), 30);
246 memset(buf, 'a', sizeof(buf));
248 socket->write(&wcb, buf, sizeof(buf));
253 // Loop. We don't bother accepting on the server socket yet.
254 // The kernel should be able to buffer the write request so it can succeed.
257 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
258 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
260 // Make sure the server got a connection and received the data
261 server.verifyConnection(buf, sizeof(buf));
263 ASSERT_TRUE(socket->isClosedBySelf());
264 ASSERT_FALSE(socket->isClosedByPeer());
268 * Test calling close() immediately after connect()
270 TEST(AsyncSocketTest, ConnectAndClose) {
273 // Connect using a AsyncSocket
275 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
277 socket->connect(&ccb, server.getAddress(), 30);
279 // Hopefully the connect didn't succeed immediately.
280 // If it did, we can't exercise the close-while-connecting code path.
281 if (ccb.state == STATE_SUCCEEDED) {
282 LOG(INFO) << "connect() succeeded immediately; aborting test "
283 "of close-during-connect behavior";
289 // Loop, although there shouldn't be anything to do.
292 // Make sure the connection was aborted
293 CHECK_EQ(ccb.state, STATE_FAILED);
295 ASSERT_TRUE(socket->isClosedBySelf());
296 ASSERT_FALSE(socket->isClosedByPeer());
300 * Test calling closeNow() immediately after connect()
302 * This should be identical to the normal close behavior.
304 TEST(AsyncSocketTest, ConnectAndCloseNow) {
307 // Connect using a AsyncSocket
309 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
311 socket->connect(&ccb, server.getAddress(), 30);
313 // Hopefully the connect didn't succeed immediately.
314 // If it did, we can't exercise the close-while-connecting code path.
315 if (ccb.state == STATE_SUCCEEDED) {
316 LOG(INFO) << "connect() succeeded immediately; aborting test "
317 "of closeNow()-during-connect behavior";
323 // Loop, although there shouldn't be anything to do.
326 // Make sure the connection was aborted
327 CHECK_EQ(ccb.state, STATE_FAILED);
329 ASSERT_TRUE(socket->isClosedBySelf());
330 ASSERT_FALSE(socket->isClosedByPeer());
334 * Test calling both write() and closeNow() immediately after connecting,
335 * without waiting for connect to finish.
337 * This should abort the pending write.
339 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
344 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
346 socket->connect(&ccb, server.getAddress(), 30);
348 // Hopefully the connect didn't succeed immediately.
349 // If it did, we can't exercise the close-while-connecting code path.
350 if (ccb.state == STATE_SUCCEEDED) {
351 LOG(INFO) << "connect() succeeded immediately; aborting test "
352 "of write-during-connect behavior";
358 memset(buf, 'a', sizeof(buf));
360 socket->write(&wcb, buf, sizeof(buf));
365 // Loop, although there shouldn't be anything to do.
368 CHECK_EQ(ccb.state, STATE_FAILED);
369 CHECK_EQ(wcb.state, STATE_FAILED);
371 ASSERT_TRUE(socket->isClosedBySelf());
372 ASSERT_FALSE(socket->isClosedByPeer());
376 * Test installing a read callback immediately, before connect() finishes.
378 TEST(AsyncSocketTest, ConnectAndRead) {
383 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
385 socket->connect(&ccb, server.getAddress(), 30);
388 socket->setReadCB(&rcb);
390 // Even though we haven't looped yet, we should be able to accept
391 // the connection and send data to it.
392 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
394 memset(buf, 'a', sizeof(buf));
395 acceptedSocket->write(buf, sizeof(buf));
396 acceptedSocket->flush();
397 acceptedSocket->close();
399 // Loop, although there shouldn't be anything to do.
402 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
403 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
404 CHECK_EQ(rcb.buffers.size(), 1);
405 CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
406 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
408 ASSERT_FALSE(socket->isClosedBySelf());
409 ASSERT_FALSE(socket->isClosedByPeer());
413 * Test installing a read callback and then closing immediately before the
414 * connect attempt finishes.
416 TEST(AsyncSocketTest, ConnectReadAndClose) {
421 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
423 socket->connect(&ccb, server.getAddress(), 30);
425 // Hopefully the connect didn't succeed immediately.
426 // If it did, we can't exercise the close-while-connecting code path.
427 if (ccb.state == STATE_SUCCEEDED) {
428 LOG(INFO) << "connect() succeeded immediately; aborting test "
429 "of read-during-connect behavior";
434 socket->setReadCB(&rcb);
439 // Loop, although there shouldn't be anything to do.
442 CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
443 CHECK_EQ(rcb.buffers.size(), 0);
444 CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
446 ASSERT_TRUE(socket->isClosedBySelf());
447 ASSERT_FALSE(socket->isClosedByPeer());
451 * Test both writing and installing a read callback immediately,
452 * before connect() finishes.
454 TEST(AsyncSocketTest, ConnectWriteAndRead) {
459 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
461 socket->connect(&ccb, server.getAddress(), 30);
465 memset(buf1, 'a', sizeof(buf1));
467 socket->write(&wcb, buf1, sizeof(buf1));
469 // set a read callback
471 socket->setReadCB(&rcb);
473 // Even though we haven't looped yet, we should be able to accept
474 // the connection and send data to it.
475 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
477 memset(buf2, 'b', sizeof(buf2));
478 acceptedSocket->write(buf2, sizeof(buf2));
479 acceptedSocket->flush();
481 // shut down the write half of acceptedSocket, so that the AsyncSocket
482 // will stop reading and we can break out of the event loop.
483 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
488 // Make sure the connect succeeded
489 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
491 // Make sure the AsyncSocket read the data written by the accepted socket
492 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
493 CHECK_EQ(rcb.buffers.size(), 1);
494 CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
495 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
497 // Close the AsyncSocket so we'll see EOF on acceptedSocket
500 // Make sure the accepted socket saw the data written by the AsyncSocket
501 uint8_t readbuf[sizeof(buf1)];
502 acceptedSocket->readAll(readbuf, sizeof(readbuf));
503 CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
504 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
505 CHECK_EQ(bytesRead, 0);
507 ASSERT_FALSE(socket->isClosedBySelf());
508 ASSERT_TRUE(socket->isClosedByPeer());
512 * Test writing to the socket then shutting down writes before the connect
515 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
520 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
522 socket->connect(&ccb, server.getAddress(), 30);
524 // Hopefully the connect didn't succeed immediately.
525 // If it did, we can't exercise the write-while-connecting code path.
526 if (ccb.state == STATE_SUCCEEDED) {
527 LOG(INFO) << "connect() succeeded immediately; skipping test";
531 // Ask to write some data
533 memset(wbuf, 'a', sizeof(wbuf));
535 socket->write(&wcb, wbuf, sizeof(wbuf));
536 socket->shutdownWrite();
539 socket->shutdownWrite();
541 // Even though we haven't looped yet, we should be able to accept
543 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
545 // Since the connection is still in progress, there should be no data to
546 // read yet. Verify that the accepted socket is not readable.
547 struct pollfd fds[1];
548 fds[0].fd = acceptedSocket->getSocketFD();
549 fds[0].events = POLLIN;
551 int rc = poll(fds, 1, 0);
554 // Write data to the accepted socket
555 uint8_t acceptedWbuf[192];
556 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
557 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
558 acceptedSocket->flush();
563 // The loop should have completed the connection, written the queued data,
564 // and shutdown writes on the socket.
566 // Check that the connection was completed successfully and that the write
567 // callback succeeded.
568 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
569 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
571 // Check that we can read the data that was written to the socket, and that
572 // we see an EOF, since its socket was half-shutdown.
573 uint8_t readbuf[sizeof(wbuf)];
574 acceptedSocket->readAll(readbuf, sizeof(readbuf));
575 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
576 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
577 CHECK_EQ(bytesRead, 0);
579 // Close the accepted socket. This will cause it to see EOF
580 // and uninstall the read callback when we loop next.
581 acceptedSocket->close();
583 // Install a read callback, then loop again.
585 socket->setReadCB(&rcb);
588 // This loop should have read the data and seen the EOF
589 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
590 CHECK_EQ(rcb.buffers.size(), 1);
591 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
592 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
593 acceptedWbuf, sizeof(acceptedWbuf)), 0);
595 ASSERT_FALSE(socket->isClosedBySelf());
596 ASSERT_FALSE(socket->isClosedByPeer());
600 * Test reading, writing, and shutting down writes before the connect attempt
603 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
608 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
610 socket->connect(&ccb, server.getAddress(), 30);
612 // Hopefully the connect didn't succeed immediately.
613 // If it did, we can't exercise the write-while-connecting code path.
614 if (ccb.state == STATE_SUCCEEDED) {
615 LOG(INFO) << "connect() succeeded immediately; skipping test";
619 // Install a read callback
621 socket->setReadCB(&rcb);
623 // Ask to write some data
625 memset(wbuf, 'a', sizeof(wbuf));
627 socket->write(&wcb, wbuf, sizeof(wbuf));
630 socket->shutdownWrite();
632 // Even though we haven't looped yet, we should be able to accept
634 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
636 // Since the connection is still in progress, there should be no data to
637 // read yet. Verify that the accepted socket is not readable.
638 struct pollfd fds[1];
639 fds[0].fd = acceptedSocket->getSocketFD();
640 fds[0].events = POLLIN;
642 int rc = poll(fds, 1, 0);
645 // Write data to the accepted socket
646 uint8_t acceptedWbuf[192];
647 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
648 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
649 acceptedSocket->flush();
650 // Shutdown writes to the accepted socket. This will cause it to see EOF
651 // and uninstall the read callback.
652 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
657 // The loop should have completed the connection, written the queued data,
658 // shutdown writes on the socket, read the data we wrote to it, and see the
661 // Check that the connection was completed successfully and that the read
662 // and write callbacks were invoked as expected.
663 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
664 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
665 CHECK_EQ(rcb.buffers.size(), 1);
666 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
667 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
668 acceptedWbuf, sizeof(acceptedWbuf)), 0);
669 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
671 // Check that we can read the data that was written to the socket, and that
672 // we see an EOF, since its socket was half-shutdown.
673 uint8_t readbuf[sizeof(wbuf)];
674 acceptedSocket->readAll(readbuf, sizeof(readbuf));
675 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
676 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
677 CHECK_EQ(bytesRead, 0);
679 // Fully close both sockets
680 acceptedSocket->close();
683 ASSERT_FALSE(socket->isClosedBySelf());
684 ASSERT_TRUE(socket->isClosedByPeer());
688 * Test reading, writing, and calling shutdownWriteNow() before the
689 * connect attempt finishes.
691 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
696 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
698 socket->connect(&ccb, server.getAddress(), 30);
700 // Hopefully the connect didn't succeed immediately.
701 // If it did, we can't exercise the write-while-connecting code path.
702 if (ccb.state == STATE_SUCCEEDED) {
703 LOG(INFO) << "connect() succeeded immediately; skipping test";
707 // Install a read callback
709 socket->setReadCB(&rcb);
711 // Ask to write some data
713 memset(wbuf, 'a', sizeof(wbuf));
715 socket->write(&wcb, wbuf, sizeof(wbuf));
717 // Shutdown writes immediately.
718 // This should immediately discard the data that we just tried to write.
719 socket->shutdownWriteNow();
721 // Verify that writeError() was invoked on the write callback.
722 CHECK_EQ(wcb.state, STATE_FAILED);
723 CHECK_EQ(wcb.bytesWritten, 0);
725 // Even though we haven't looped yet, we should be able to accept
727 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
729 // Since the connection is still in progress, there should be no data to
730 // read yet. Verify that the accepted socket is not readable.
731 struct pollfd fds[1];
732 fds[0].fd = acceptedSocket->getSocketFD();
733 fds[0].events = POLLIN;
735 int rc = poll(fds, 1, 0);
738 // Write data to the accepted socket
739 uint8_t acceptedWbuf[192];
740 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
741 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
742 acceptedSocket->flush();
743 // Shutdown writes to the accepted socket. This will cause it to see EOF
744 // and uninstall the read callback.
745 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
750 // The loop should have completed the connection, written the queued data,
751 // shutdown writes on the socket, read the data we wrote to it, and see the
754 // Check that the connection was completed successfully and that the read
755 // callback was invoked as expected.
756 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
757 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
758 CHECK_EQ(rcb.buffers.size(), 1);
759 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
760 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
761 acceptedWbuf, sizeof(acceptedWbuf)), 0);
763 // Since we used shutdownWriteNow(), it should have discarded all pending
764 // write data. Verify we see an immediate EOF when reading from the accepted
766 uint8_t readbuf[sizeof(wbuf)];
767 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
768 CHECK_EQ(bytesRead, 0);
770 // Fully close both sockets
771 acceptedSocket->close();
774 ASSERT_FALSE(socket->isClosedBySelf());
775 ASSERT_TRUE(socket->isClosedByPeer());
778 // Helper function for use in testConnectOptWrite()
779 // Temporarily disable the read callback
780 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
781 // Uninstall the read callback
782 socket->setReadCB(nullptr);
783 // Schedule the read callback to be reinstalled after 1ms
784 socket->getEventBase()->runInLoop(
785 std::bind(&AsyncSocket::setReadCB, socket, rcb));
789 * Test connect+write, then have the connect callback perform another write.
791 * This tests interaction of the optimistic writing after connect with
792 * additional write attempts that occur in the connect callback.
794 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
797 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
801 socket->connect(&ccb, server.getAddress(), 30);
803 // Hopefully the connect didn't succeed immediately.
804 // If it did, we can't exercise the optimistic write code path.
805 if (ccb.state == STATE_SUCCEEDED) {
806 LOG(INFO) << "connect() succeeded immediately; aborting test "
807 "of optimistic write behavior";
811 // Tell the connect callback to perform a write when the connect succeeds
813 scoped_array<char> buf2(new char[size2]);
814 memset(buf2.get(), 'b', size2);
816 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
817 // Tell the second write callback to close the connection when it is done
818 wcb2.successCallback = [&] { socket->closeNow(); };
821 // Schedule one write() immediately, before the connect finishes
822 scoped_array<char> buf1(new char[size1]);
823 memset(buf1.get(), 'a', size1);
826 socket->write(&wcb1, buf1.get(), size1);
830 // immediately perform a close, before connect() completes
834 // Start reading from the other endpoint after 10ms.
835 // If we're using large buffers, we have to read so that the writes don't
837 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
839 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
840 acceptedSocket.get(), &rcb);
841 socket->getEventBase()->tryRunAfterDelay(
842 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
845 // Loop. We don't bother accepting on the server socket yet.
846 // The kernel should be able to buffer the write request so it can succeed.
849 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
851 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
854 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
859 // Make sure the read callback received all of the data
860 size_t bytesRead = 0;
861 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
862 it != rcb.buffers.end();
864 size_t start = bytesRead;
865 bytesRead += it->length;
866 size_t end = bytesRead;
868 size_t cmpLen = min(size1, end) - start;
869 CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
871 if (end > size1 && end <= size1 + size2) {
875 if (start >= size1) {
877 buf2Offset = start - size1;
878 cmpLen = end - start;
880 itOffset = size1 - start;
882 cmpLen = end - size1;
884 CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
889 CHECK_EQ(bytesRead, size1 + size2);
892 TEST(AsyncSocketTest, ConnectCallbackWrite) {
893 // Test using small writes that should both succeed immediately
894 testConnectOptWrite(100, 200);
896 // Test using a large buffer in the connect callback, that should block
897 const size_t largeSize = 8*1024*1024;
898 testConnectOptWrite(100, largeSize);
900 // Test using a large initial write
901 testConnectOptWrite(largeSize, 100);
903 // Test using two large buffers
904 testConnectOptWrite(largeSize, largeSize);
906 // Test a small write in the connect callback,
907 // but no immediate write before connect completes
908 testConnectOptWrite(0, 64);
910 // Test a large write in the connect callback,
911 // but no immediate write before connect completes
912 testConnectOptWrite(0, largeSize);
914 // Test connect, a small write, then immediately call close() before connect
916 testConnectOptWrite(211, 0, true);
918 // Test connect, a large immediate write (that will block), then immediately
919 // call close() before connect completes
920 testConnectOptWrite(largeSize, 0, true);
923 ///////////////////////////////////////////////////////////////////////////
924 // write() related tests
925 ///////////////////////////////////////////////////////////////////////////
928 * Test writing using a nullptr callback
930 TEST(AsyncSocketTest, WriteNullCallback) {
935 std::shared_ptr<AsyncSocket> socket =
936 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
937 evb.loop(); // loop until the socket is connected
939 // write() with a nullptr callback
941 memset(buf, 'a', sizeof(buf));
942 socket->write(nullptr, buf, sizeof(buf));
944 evb.loop(); // loop until the data is sent
946 // Make sure the server got a connection and received the data
948 server.verifyConnection(buf, sizeof(buf));
950 ASSERT_TRUE(socket->isClosedBySelf());
951 ASSERT_FALSE(socket->isClosedByPeer());
955 * Test writing with a send timeout
957 TEST(AsyncSocketTest, WriteTimeout) {
962 std::shared_ptr<AsyncSocket> socket =
963 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
964 evb.loop(); // loop until the socket is connected
966 // write() a large chunk of data, with no-one on the other end reading
967 size_t writeLength = 8*1024*1024;
968 uint32_t timeout = 200;
969 socket->setSendTimeout(timeout);
970 scoped_array<char> buf(new char[writeLength]);
971 memset(buf.get(), 'a', writeLength);
973 socket->write(&wcb, buf.get(), writeLength);
979 // Make sure the write attempt timed out as requested
980 CHECK_EQ(wcb.state, STATE_FAILED);
981 CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
983 // Check that the write timed out within a reasonable period of time.
984 // We don't check for exactly the specified timeout, since AsyncSocket only
985 // times out when it hasn't made progress for that period of time.
987 // On linux, the first write sends a few hundred kb of data, then blocks for
988 // writability, and then unblocks again after 40ms and is able to write
989 // another smaller of data before blocking permanently. Therefore it doesn't
990 // time out until 40ms + timeout.
992 // I haven't fully verified the cause of this, but I believe it probably
993 // occurs because the receiving end delays sending an ack for up to 40ms.
994 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
995 // the ack, it can send some more data. However, after that point the
996 // receiver's kernel buffer is full. This 40ms delay happens even with
997 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
998 // kernel may be automatically disabling TCP_QUICKACK after receiving some
1001 // For now, we simply check that the timeout occurred within 160ms of
1002 // the requested value.
1003 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1007 * Test writing to a socket that the remote endpoint has closed
1009 TEST(AsyncSocketTest, WritePipeError) {
1014 std::shared_ptr<AsyncSocket> socket =
1015 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1016 socket->setSendTimeout(1000);
1017 evb.loop(); // loop until the socket is connected
1019 // accept and immediately close the socket
1020 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1021 acceptedSocket.reset();
1023 // write() a large chunk of data
1024 size_t writeLength = 8*1024*1024;
1025 scoped_array<char> buf(new char[writeLength]);
1026 memset(buf.get(), 'a', writeLength);
1028 socket->write(&wcb, buf.get(), writeLength);
1032 // Make sure the write failed.
1033 // It would be nice if AsyncSocketException could convey the errno value,
1034 // so that we could check for EPIPE
1035 CHECK_EQ(wcb.state, STATE_FAILED);
1036 CHECK_EQ(wcb.exception.getType(),
1037 AsyncSocketException::INTERNAL_ERROR);
1039 ASSERT_FALSE(socket->isClosedBySelf());
1040 ASSERT_FALSE(socket->isClosedByPeer());
1044 * Test writing a mix of simple buffers and IOBufs
1046 TEST(AsyncSocketTest, WriteIOBuf) {
1051 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1053 socket->connect(&ccb, server.getAddress(), 30);
1055 // Accept the connection
1056 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1058 acceptedSocket->setReadCB(&rcb);
1060 // Write a simple buffer to the socket
1061 size_t simpleBufLength = 5;
1062 char simpleBuf[simpleBufLength];
1063 memset(simpleBuf, 'a', simpleBufLength);
1065 socket->write(&wcb, simpleBuf, simpleBufLength);
1067 // Write a single-element IOBuf chain
1068 size_t buf1Length = 7;
1069 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1070 memset(buf1->writableData(), 'b', buf1Length);
1071 buf1->append(buf1Length);
1072 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1074 socket->writeChain(&wcb2, std::move(buf1));
1076 // Write a multiple-element IOBuf chain
1077 size_t buf2Length = 11;
1078 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1079 memset(buf2->writableData(), 'c', buf2Length);
1080 buf2->append(buf2Length);
1081 size_t buf3Length = 13;
1082 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1083 memset(buf3->writableData(), 'd', buf3Length);
1084 buf3->append(buf3Length);
1085 buf2->appendChain(std::move(buf3));
1086 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1087 buf2Copy->coalesce();
1089 socket->writeChain(&wcb3, std::move(buf2));
1090 socket->shutdownWrite();
1092 // Let the reads and writes run to completion
1095 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1096 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1097 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1099 // Make sure the reader got the right data in the right order
1100 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1101 CHECK_EQ(rcb.buffers.size(), 1);
1102 CHECK_EQ(rcb.buffers[0].length,
1103 simpleBufLength + buf1Length + buf2Length + buf3Length);
1105 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1107 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1108 buf1Copy->data(), buf1Copy->length()), 0);
1110 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1111 buf2Copy->data(), buf2Copy->length()), 0);
1113 acceptedSocket->close();
1116 ASSERT_TRUE(socket->isClosedBySelf());
1117 ASSERT_FALSE(socket->isClosedByPeer());
1120 TEST(AsyncSocketTest, WriteIOBufCorked) {
1125 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1127 socket->connect(&ccb, server.getAddress(), 30);
1129 // Accept the connection
1130 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1132 acceptedSocket->setReadCB(&rcb);
1134 // Do three writes, 100ms apart, with the "cork" flag set
1135 // on the second write. The reader should see the first write
1136 // arrive by itself, followed by the second and third writes
1137 // arriving together.
1138 size_t buf1Length = 5;
1139 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1140 memset(buf1->writableData(), 'a', buf1Length);
1141 buf1->append(buf1Length);
1142 size_t buf2Length = 7;
1143 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1144 memset(buf2->writableData(), 'b', buf2Length);
1145 buf2->append(buf2Length);
1146 size_t buf3Length = 11;
1147 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1148 memset(buf3->writableData(), 'c', buf3Length);
1149 buf3->append(buf3Length);
1151 socket->writeChain(&wcb1, std::move(buf1));
1153 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1154 write2.scheduleTimeout(100);
1156 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1157 write3.scheduleTimeout(200);
1160 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1161 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1162 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1163 if (wcb3.state != STATE_SUCCEEDED) {
1164 throw(wcb3.exception);
1166 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1168 // Make sure the reader got the data with the right grouping
1169 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1170 CHECK_EQ(rcb.buffers.size(), 2);
1171 CHECK_EQ(rcb.buffers[0].length, buf1Length);
1172 CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1174 acceptedSocket->close();
1177 ASSERT_TRUE(socket->isClosedBySelf());
1178 ASSERT_FALSE(socket->isClosedByPeer());
1182 * Test performing a zero-length write
1184 TEST(AsyncSocketTest, ZeroLengthWrite) {
1189 std::shared_ptr<AsyncSocket> socket =
1190 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1191 evb.loop(); // loop until the socket is connected
1193 auto acceptedSocket = server.acceptAsync(&evb);
1195 acceptedSocket->setReadCB(&rcb);
1197 size_t len1 = 1024*1024;
1198 size_t len2 = 1024*1024;
1199 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1200 memset(buf.get(), 'a', len1);
1201 memset(buf.get(), 'b', len2);
1207 socket->write(&wcb1, buf.get(), 0);
1208 socket->write(&wcb2, buf.get(), len1);
1209 socket->write(&wcb3, buf.get() + len1, 0);
1210 socket->write(&wcb4, buf.get() + len1, len2);
1213 evb.loop(); // loop until the data is sent
1215 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1216 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1217 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1218 CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1219 rcb.verifyData(buf.get(), len1 + len2);
1221 ASSERT_TRUE(socket->isClosedBySelf());
1222 ASSERT_FALSE(socket->isClosedByPeer());
1225 TEST(AsyncSocketTest, ZeroLengthWritev) {
1230 std::shared_ptr<AsyncSocket> socket =
1231 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1232 evb.loop(); // loop until the socket is connected
1234 auto acceptedSocket = server.acceptAsync(&evb);
1236 acceptedSocket->setReadCB(&rcb);
1238 size_t len1 = 1024*1024;
1239 size_t len2 = 1024*1024;
1240 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1241 memset(buf.get(), 'a', len1);
1242 memset(buf.get(), 'b', len2);
1245 size_t iovCount = 4;
1246 struct iovec iov[iovCount];
1247 iov[0].iov_base = buf.get();
1248 iov[0].iov_len = len1;
1249 iov[1].iov_base = buf.get() + len1;
1251 iov[2].iov_base = buf.get() + len1;
1252 iov[2].iov_len = len2;
1253 iov[3].iov_base = buf.get() + len1 + len2;
1256 socket->writev(&wcb, iov, iovCount);
1258 evb.loop(); // loop until the data is sent
1260 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1261 rcb.verifyData(buf.get(), len1 + len2);
1263 ASSERT_TRUE(socket->isClosedBySelf());
1264 ASSERT_FALSE(socket->isClosedByPeer());
1267 ///////////////////////////////////////////////////////////////////////////
1268 // close() related tests
1269 ///////////////////////////////////////////////////////////////////////////
1272 * Test calling close() with pending writes when the socket is already closing.
1274 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1279 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1281 socket->connect(&ccb, server.getAddress(), 30);
1283 // accept the socket on the server side
1284 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1286 // Loop to ensure the connect has completed
1289 // Make sure we are connected
1290 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1292 // Schedule pending writes, until several write attempts have blocked
1294 memset(buf, 'a', sizeof(buf));
1295 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1296 WriteCallbackVector writeCallbacks;
1298 writeCallbacks.reserve(5);
1299 while (writeCallbacks.size() < 5) {
1300 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1302 socket->write(wcb.get(), buf, sizeof(buf));
1303 if (wcb->state == STATE_SUCCEEDED) {
1304 // Succeeded immediately. Keep performing more writes
1308 // This write is blocked.
1309 // Have the write callback call close() when writeError() is invoked
1310 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1311 writeCallbacks.push_back(wcb);
1314 // Call closeNow() to immediately fail the pending writes
1317 // Make sure writeError() was invoked on all of the pending write callbacks
1318 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1319 it != writeCallbacks.end();
1321 CHECK_EQ((*it)->state, STATE_FAILED);
1324 ASSERT_TRUE(socket->isClosedBySelf());
1325 ASSERT_FALSE(socket->isClosedByPeer());
1328 ///////////////////////////////////////////////////////////////////////////
1329 // ImmediateRead related tests
1330 ///////////////////////////////////////////////////////////////////////////
1332 /* AsyncSocket use to verify immediate read works */
1333 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1335 bool immediateReadCalled = false;
1336 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1338 void checkForImmediateRead() noexcept override {
1339 immediateReadCalled = true;
1340 AsyncSocket::handleRead();
1344 TEST(AsyncSocket, ConnectReadImmediateRead) {
1347 const size_t maxBufferSz = 100;
1348 const size_t maxReadsPerEvent = 1;
1349 const size_t expectedDataSz = maxBufferSz * 3;
1350 char expectedData[expectedDataSz];
1351 memset(expectedData, 'j', expectedDataSz);
1354 ReadCallback rcb(maxBufferSz);
1355 AsyncSocketImmediateRead socket(&evb);
1356 socket.connect(nullptr, server.getAddress(), 30);
1358 evb.loop(); // loop until the socket is connected
1360 socket.setReadCB(&rcb);
1361 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1362 socket.immediateReadCalled = false;
1364 auto acceptedSocket = server.acceptAsync(&evb);
1366 ReadCallback rcbServer;
1367 WriteCallback wcbServer;
1368 rcbServer.dataAvailableCallback = [&]() {
1369 if (rcbServer.dataRead() == expectedDataSz) {
1370 // write back all data read
1371 rcbServer.verifyData(expectedData, expectedDataSz);
1372 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1373 acceptedSocket->close();
1376 acceptedSocket->setReadCB(&rcbServer);
1380 socket.write(&wcb1, expectedData, expectedDataSz);
1382 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1383 rcb.verifyData(expectedData, expectedDataSz);
1384 CHECK_EQ(socket.immediateReadCalled, true);
1386 ASSERT_FALSE(socket.isClosedBySelf());
1387 ASSERT_FALSE(socket.isClosedByPeer());
1390 TEST(AsyncSocket, ConnectReadUninstallRead) {
1393 const size_t maxBufferSz = 100;
1394 const size_t maxReadsPerEvent = 1;
1395 const size_t expectedDataSz = maxBufferSz * 3;
1396 char expectedData[expectedDataSz];
1397 memset(expectedData, 'k', expectedDataSz);
1400 ReadCallback rcb(maxBufferSz);
1401 AsyncSocketImmediateRead socket(&evb);
1402 socket.connect(nullptr, server.getAddress(), 30);
1404 evb.loop(); // loop until the socket is connected
1406 socket.setReadCB(&rcb);
1407 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1408 socket.immediateReadCalled = false;
1410 auto acceptedSocket = server.acceptAsync(&evb);
1412 ReadCallback rcbServer;
1413 WriteCallback wcbServer;
1414 rcbServer.dataAvailableCallback = [&]() {
1415 if (rcbServer.dataRead() == expectedDataSz) {
1416 // write back all data read
1417 rcbServer.verifyData(expectedData, expectedDataSz);
1418 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1419 acceptedSocket->close();
1422 acceptedSocket->setReadCB(&rcbServer);
1424 rcb.dataAvailableCallback = [&]() {
1425 // we read data and reset readCB
1426 socket.setReadCB(nullptr);
1431 socket.write(&wcb, expectedData, expectedDataSz);
1433 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1435 /* we shoud've only read maxBufferSz data since readCallback_
1436 * was reset in dataAvailableCallback */
1437 CHECK_EQ(rcb.dataRead(), maxBufferSz);
1438 CHECK_EQ(socket.immediateReadCalled, false);
1440 ASSERT_FALSE(socket.isClosedBySelf());
1441 ASSERT_FALSE(socket.isClosedByPeer());
1445 // - Test connect() and have the connect callback set the read callback
1446 // - Test connect() and have the connect callback unset the read callback
1447 // - Test reading/writing/closing/destroying the socket in the connect callback
1448 // - Test reading/writing/closing/destroying the socket in the read callback
1449 // - Test reading/writing/closing/destroying the socket in the write callback
1450 // - Test one-way shutdown behavior
1451 // - Test changing the EventBase
1453 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1454 // in connectSuccess(), readDataAvailable(), writeSuccess()
1457 ///////////////////////////////////////////////////////////////////////////
1458 // AsyncServerSocket tests
1459 ///////////////////////////////////////////////////////////////////////////
1462 * Helper ConnectionEventCallback class for the test code.
1463 * It maintains counters protected by a spin lock.
1465 class TestConnectionEventCallback :
1466 public AsyncServerSocket::ConnectionEventCallback {
1468 virtual void onConnectionAccepted(
1469 const int /* socket */,
1470 const SocketAddress& /* addr */) noexcept override {
1471 folly::RWSpinLock::WriteHolder holder(spinLock_);
1472 connectionAccepted_++;
1475 virtual void onConnectionAcceptError(const int /* err */) noexcept override {
1476 folly::RWSpinLock::WriteHolder holder(spinLock_);
1477 connectionAcceptedError_++;
1480 virtual void onConnectionDropped(
1481 const int /* socket */,
1482 const SocketAddress& /* addr */) noexcept override {
1483 folly::RWSpinLock::WriteHolder holder(spinLock_);
1484 connectionDropped_++;
1487 virtual void onConnectionEnqueuedForAcceptorCallback(
1488 const int /* socket */,
1489 const SocketAddress& /* addr */) noexcept override {
1490 folly::RWSpinLock::WriteHolder holder(spinLock_);
1491 connectionEnqueuedForAcceptCallback_++;
1494 virtual void onConnectionDequeuedByAcceptorCallback(
1495 const int /* socket */,
1496 const SocketAddress& /* addr */) noexcept override {
1497 folly::RWSpinLock::WriteHolder holder(spinLock_);
1498 connectionDequeuedByAcceptCallback_++;
1501 virtual void onBackoffStarted() noexcept override {
1502 folly::RWSpinLock::WriteHolder holder(spinLock_);
1506 virtual void onBackoffEnded() noexcept override {
1507 folly::RWSpinLock::WriteHolder holder(spinLock_);
1511 virtual void onBackoffError() noexcept override {
1512 folly::RWSpinLock::WriteHolder holder(spinLock_);
1516 unsigned int getConnectionAccepted() const {
1517 folly::RWSpinLock::ReadHolder holder(spinLock_);
1518 return connectionAccepted_;
1521 unsigned int getConnectionAcceptedError() const {
1522 folly::RWSpinLock::ReadHolder holder(spinLock_);
1523 return connectionAcceptedError_;
1526 unsigned int getConnectionDropped() const {
1527 folly::RWSpinLock::ReadHolder holder(spinLock_);
1528 return connectionDropped_;
1531 unsigned int getConnectionEnqueuedForAcceptCallback() const {
1532 folly::RWSpinLock::ReadHolder holder(spinLock_);
1533 return connectionEnqueuedForAcceptCallback_;
1536 unsigned int getConnectionDequeuedByAcceptCallback() const {
1537 folly::RWSpinLock::ReadHolder holder(spinLock_);
1538 return connectionDequeuedByAcceptCallback_;
1541 unsigned int getBackoffStarted() const {
1542 folly::RWSpinLock::ReadHolder holder(spinLock_);
1543 return backoffStarted_;
1546 unsigned int getBackoffEnded() const {
1547 folly::RWSpinLock::ReadHolder holder(spinLock_);
1548 return backoffEnded_;
1551 unsigned int getBackoffError() const {
1552 folly::RWSpinLock::ReadHolder holder(spinLock_);
1553 return backoffError_;
1557 mutable folly::RWSpinLock spinLock_;
1558 unsigned int connectionAccepted_{0};
1559 unsigned int connectionAcceptedError_{0};
1560 unsigned int connectionDropped_{0};
1561 unsigned int connectionEnqueuedForAcceptCallback_{0};
1562 unsigned int connectionDequeuedByAcceptCallback_{0};
1563 unsigned int backoffStarted_{0};
1564 unsigned int backoffEnded_{0};
1565 unsigned int backoffError_{0};
1569 * Helper AcceptCallback class for the test code
1570 * It records the callbacks that were invoked, and also supports calling
1571 * generic std::function objects in each callback.
1573 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1582 EventInfo(int fd, const folly::SocketAddress& addr)
1583 : type(TYPE_ACCEPT),
1587 explicit EventInfo(const std::string& msg)
1592 explicit EventInfo(EventType et)
1599 int fd; // valid for TYPE_ACCEPT
1600 folly::SocketAddress address; // valid for TYPE_ACCEPT
1601 string errorMsg; // valid for TYPE_ERROR
1603 typedef std::deque<EventInfo> EventList;
1605 TestAcceptCallback()
1606 : connectionAcceptedFn_(),
1611 std::deque<EventInfo>* getEvents() {
1615 void setConnectionAcceptedFn(
1616 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1617 connectionAcceptedFn_ = fn;
1619 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1620 acceptErrorFn_ = fn;
1622 void setAcceptStartedFn(const std::function<void()>& fn) {
1623 acceptStartedFn_ = fn;
1625 void setAcceptStoppedFn(const std::function<void()>& fn) {
1626 acceptStoppedFn_ = fn;
1629 void connectionAccepted(
1630 int fd, const folly::SocketAddress& clientAddr) noexcept override {
1631 events_.emplace_back(fd, clientAddr);
1633 if (connectionAcceptedFn_) {
1634 connectionAcceptedFn_(fd, clientAddr);
1637 void acceptError(const std::exception& ex) noexcept override {
1638 events_.emplace_back(ex.what());
1640 if (acceptErrorFn_) {
1644 void acceptStarted() noexcept override {
1645 events_.emplace_back(TYPE_START);
1647 if (acceptStartedFn_) {
1651 void acceptStopped() noexcept override {
1652 events_.emplace_back(TYPE_STOP);
1654 if (acceptStoppedFn_) {
1660 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1661 std::function<void(const std::exception&)> acceptErrorFn_;
1662 std::function<void()> acceptStartedFn_;
1663 std::function<void()> acceptStoppedFn_;
1665 std::deque<EventInfo> events_;
1670 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1672 TEST(AsyncSocketTest, ServerAcceptOptions) {
1673 EventBase eventBase;
1675 // Create a server socket
1676 std::shared_ptr<AsyncServerSocket> serverSocket(
1677 AsyncServerSocket::newSocket(&eventBase));
1678 serverSocket->bind(0);
1679 serverSocket->listen(16);
1680 folly::SocketAddress serverAddress;
1681 serverSocket->getAddress(&serverAddress);
1683 // Add a callback to accept one connection then stop the loop
1684 TestAcceptCallback acceptCallback;
1685 acceptCallback.setConnectionAcceptedFn(
1686 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1687 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1689 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1690 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1692 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1693 serverSocket->startAccepting();
1695 // Connect to the server socket
1696 std::shared_ptr<AsyncSocket> socket(
1697 AsyncSocket::newSocket(&eventBase, serverAddress));
1701 // Verify that the server accepted a connection
1702 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1703 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1704 TestAcceptCallback::TYPE_START);
1705 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1706 TestAcceptCallback::TYPE_ACCEPT);
1707 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1708 TestAcceptCallback::TYPE_STOP);
1709 int fd = acceptCallback.getEvents()->at(1).fd;
1711 // The accepted connection should already be in non-blocking mode
1712 int flags = fcntl(fd, F_GETFL, 0);
1713 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1716 // The accepted connection should already have TCP_NODELAY set
1718 socklen_t valueLength = sizeof(value);
1719 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1726 * Test AsyncServerSocket::removeAcceptCallback()
1728 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1729 // Create a new AsyncServerSocket
1730 EventBase eventBase;
1731 std::shared_ptr<AsyncServerSocket> serverSocket(
1732 AsyncServerSocket::newSocket(&eventBase));
1733 serverSocket->bind(0);
1734 serverSocket->listen(16);
1735 folly::SocketAddress serverAddress;
1736 serverSocket->getAddress(&serverAddress);
1738 // Add several accept callbacks
1739 TestAcceptCallback cb1;
1740 TestAcceptCallback cb2;
1741 TestAcceptCallback cb3;
1742 TestAcceptCallback cb4;
1743 TestAcceptCallback cb5;
1744 TestAcceptCallback cb6;
1745 TestAcceptCallback cb7;
1747 // Test having callbacks remove other callbacks before them on the list,
1748 // after them on the list, or removing themselves.
1750 // Have callback 2 remove callback 3 and callback 5 the first time it is
1753 cb1.setConnectionAcceptedFn([&](int /* fd */,
1754 const folly::SocketAddress& /* addr */) {
1755 std::shared_ptr<AsyncSocket> sock2(
1756 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1758 cb3.setConnectionAcceptedFn(
1759 [&](int /* fd */, const folly::SocketAddress& /* addr */) {});
1760 cb4.setConnectionAcceptedFn(
1761 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1762 std::shared_ptr<AsyncSocket> sock3(
1763 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1765 cb5.setConnectionAcceptedFn(
1766 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1767 std::shared_ptr<AsyncSocket> sock5(
1768 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1771 cb2.setConnectionAcceptedFn(
1772 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1773 if (cb2Count == 0) {
1774 serverSocket->removeAcceptCallback(&cb3, nullptr);
1775 serverSocket->removeAcceptCallback(&cb5, nullptr);
1779 // Have callback 6 remove callback 4 the first time it is called,
1780 // and destroy the server socket the second time it is called
1782 cb6.setConnectionAcceptedFn(
1783 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1784 if (cb6Count == 0) {
1785 serverSocket->removeAcceptCallback(&cb4, nullptr);
1786 std::shared_ptr<AsyncSocket> sock6(
1787 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1788 std::shared_ptr<AsyncSocket> sock7(
1789 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1790 std::shared_ptr<AsyncSocket> sock8(
1791 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1794 serverSocket.reset();
1798 // Have callback 7 remove itself
1799 cb7.setConnectionAcceptedFn(
1800 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1801 serverSocket->removeAcceptCallback(&cb7, nullptr);
1804 serverSocket->addAcceptCallback(&cb1, nullptr);
1805 serverSocket->addAcceptCallback(&cb2, nullptr);
1806 serverSocket->addAcceptCallback(&cb3, nullptr);
1807 serverSocket->addAcceptCallback(&cb4, nullptr);
1808 serverSocket->addAcceptCallback(&cb5, nullptr);
1809 serverSocket->addAcceptCallback(&cb6, nullptr);
1810 serverSocket->addAcceptCallback(&cb7, nullptr);
1811 serverSocket->startAccepting();
1813 // Make several connections to the socket
1814 std::shared_ptr<AsyncSocket> sock1(
1815 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1816 std::shared_ptr<AsyncSocket> sock4(
1817 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1819 // Loop until we are stopped
1822 // Check to make sure that the expected callbacks were invoked.
1824 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1825 // the AcceptCallbacks in round-robin fashion, in the order that they were
1826 // added. The code is implemented this way right now, but the API doesn't
1827 // explicitly require it be done this way. If we change the code not to be
1828 // exactly round robin in the future, we can simplify the test checks here.
1829 // (We'll also need to update the termination code, since we expect cb6 to
1830 // get called twice to terminate the loop.)
1831 CHECK_EQ(cb1.getEvents()->size(), 4);
1832 CHECK_EQ(cb1.getEvents()->at(0).type,
1833 TestAcceptCallback::TYPE_START);
1834 CHECK_EQ(cb1.getEvents()->at(1).type,
1835 TestAcceptCallback::TYPE_ACCEPT);
1836 CHECK_EQ(cb1.getEvents()->at(2).type,
1837 TestAcceptCallback::TYPE_ACCEPT);
1838 CHECK_EQ(cb1.getEvents()->at(3).type,
1839 TestAcceptCallback::TYPE_STOP);
1841 CHECK_EQ(cb2.getEvents()->size(), 4);
1842 CHECK_EQ(cb2.getEvents()->at(0).type,
1843 TestAcceptCallback::TYPE_START);
1844 CHECK_EQ(cb2.getEvents()->at(1).type,
1845 TestAcceptCallback::TYPE_ACCEPT);
1846 CHECK_EQ(cb2.getEvents()->at(2).type,
1847 TestAcceptCallback::TYPE_ACCEPT);
1848 CHECK_EQ(cb2.getEvents()->at(3).type,
1849 TestAcceptCallback::TYPE_STOP);
1851 CHECK_EQ(cb3.getEvents()->size(), 2);
1852 CHECK_EQ(cb3.getEvents()->at(0).type,
1853 TestAcceptCallback::TYPE_START);
1854 CHECK_EQ(cb3.getEvents()->at(1).type,
1855 TestAcceptCallback::TYPE_STOP);
1857 CHECK_EQ(cb4.getEvents()->size(), 3);
1858 CHECK_EQ(cb4.getEvents()->at(0).type,
1859 TestAcceptCallback::TYPE_START);
1860 CHECK_EQ(cb4.getEvents()->at(1).type,
1861 TestAcceptCallback::TYPE_ACCEPT);
1862 CHECK_EQ(cb4.getEvents()->at(2).type,
1863 TestAcceptCallback::TYPE_STOP);
1865 CHECK_EQ(cb5.getEvents()->size(), 2);
1866 CHECK_EQ(cb5.getEvents()->at(0).type,
1867 TestAcceptCallback::TYPE_START);
1868 CHECK_EQ(cb5.getEvents()->at(1).type,
1869 TestAcceptCallback::TYPE_STOP);
1871 CHECK_EQ(cb6.getEvents()->size(), 4);
1872 CHECK_EQ(cb6.getEvents()->at(0).type,
1873 TestAcceptCallback::TYPE_START);
1874 CHECK_EQ(cb6.getEvents()->at(1).type,
1875 TestAcceptCallback::TYPE_ACCEPT);
1876 CHECK_EQ(cb6.getEvents()->at(2).type,
1877 TestAcceptCallback::TYPE_ACCEPT);
1878 CHECK_EQ(cb6.getEvents()->at(3).type,
1879 TestAcceptCallback::TYPE_STOP);
1881 CHECK_EQ(cb7.getEvents()->size(), 3);
1882 CHECK_EQ(cb7.getEvents()->at(0).type,
1883 TestAcceptCallback::TYPE_START);
1884 CHECK_EQ(cb7.getEvents()->at(1).type,
1885 TestAcceptCallback::TYPE_ACCEPT);
1886 CHECK_EQ(cb7.getEvents()->at(2).type,
1887 TestAcceptCallback::TYPE_STOP);
1891 * Test AsyncServerSocket::removeAcceptCallback()
1893 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1894 // Create a new AsyncServerSocket
1895 EventBase eventBase;
1896 std::shared_ptr<AsyncServerSocket> serverSocket(
1897 AsyncServerSocket::newSocket(&eventBase));
1898 serverSocket->bind(0);
1899 serverSocket->listen(16);
1900 folly::SocketAddress serverAddress;
1901 serverSocket->getAddress(&serverAddress);
1903 // Add several accept callbacks
1904 TestAcceptCallback cb1;
1905 auto thread_id = pthread_self();
1906 cb1.setAcceptStartedFn([&](){
1907 CHECK_NE(thread_id, pthread_self());
1908 thread_id = pthread_self();
1910 cb1.setConnectionAcceptedFn(
1911 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1912 CHECK_EQ(thread_id, pthread_self());
1913 serverSocket->removeAcceptCallback(&cb1, nullptr);
1915 cb1.setAcceptStoppedFn([&](){
1916 CHECK_EQ(thread_id, pthread_self());
1919 // Test having callbacks remove other callbacks before them on the list,
1920 serverSocket->addAcceptCallback(&cb1, nullptr);
1921 serverSocket->startAccepting();
1923 // Make several connections to the socket
1924 std::shared_ptr<AsyncSocket> sock1(
1925 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1927 // Loop in another thread
1928 auto other = std::thread([&](){
1933 // Check to make sure that the expected callbacks were invoked.
1935 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1936 // the AcceptCallbacks in round-robin fashion, in the order that they were
1937 // added. The code is implemented this way right now, but the API doesn't
1938 // explicitly require it be done this way. If we change the code not to be
1939 // exactly round robin in the future, we can simplify the test checks here.
1940 // (We'll also need to update the termination code, since we expect cb6 to
1941 // get called twice to terminate the loop.)
1942 CHECK_EQ(cb1.getEvents()->size(), 3);
1943 CHECK_EQ(cb1.getEvents()->at(0).type,
1944 TestAcceptCallback::TYPE_START);
1945 CHECK_EQ(cb1.getEvents()->at(1).type,
1946 TestAcceptCallback::TYPE_ACCEPT);
1947 CHECK_EQ(cb1.getEvents()->at(2).type,
1948 TestAcceptCallback::TYPE_STOP);
1952 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1953 // Add a callback to accept one connection then stop accepting
1954 TestAcceptCallback acceptCallback;
1955 acceptCallback.setConnectionAcceptedFn(
1956 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
1957 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1959 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
1960 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1962 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1963 serverSocket->startAccepting();
1965 // Connect to the server socket
1966 EventBase* eventBase = serverSocket->getEventBase();
1967 folly::SocketAddress serverAddress;
1968 serverSocket->getAddress(&serverAddress);
1969 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1971 // Loop to process all events
1974 // Verify that the server accepted a connection
1975 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1976 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1977 TestAcceptCallback::TYPE_START);
1978 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1979 TestAcceptCallback::TYPE_ACCEPT);
1980 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1981 TestAcceptCallback::TYPE_STOP);
1984 /* Verify that we don't leak sockets if we are destroyed()
1985 * and there are still writes pending
1987 * If destroy() only calls close() instead of closeNow(),
1988 * it would shutdown(writes) on the socket, but it would
1989 * never be close()'d, and the socket would leak
1991 TEST(AsyncSocketTest, DestroyCloseTest) {
1997 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1999 socket->connect(&ccb, server.getAddress(), 30);
2001 // Accept the connection
2002 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
2004 acceptedSocket->setReadCB(&rcb);
2006 // Write a large buffer to the socket that is larger than kernel buffer
2007 size_t simpleBufLength = 5000000;
2008 char* simpleBuf = new char[simpleBufLength];
2009 memset(simpleBuf, 'a', simpleBufLength);
2012 // Let the reads and writes run to completion
2013 int fd = acceptedSocket->getFd();
2015 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2017 acceptedSocket.reset();
2019 // Test that server socket was closed
2020 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2027 * Test AsyncServerSocket::useExistingSocket()
2029 TEST(AsyncSocketTest, ServerExistingSocket) {
2030 EventBase eventBase;
2032 // Test creating a socket, and letting AsyncServerSocket bind and listen
2034 // Manually create a socket
2035 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2038 // Create a server socket
2039 AsyncServerSocket::UniquePtr serverSocket(
2040 new AsyncServerSocket(&eventBase));
2041 serverSocket->useExistingSocket(fd);
2042 folly::SocketAddress address;
2043 serverSocket->getAddress(&address);
2045 serverSocket->bind(address);
2046 serverSocket->listen(16);
2048 // Make sure the socket works
2049 serverSocketSanityTest(serverSocket.get());
2052 // Test creating a socket and binding manually,
2053 // then letting AsyncServerSocket listen
2055 // Manually create a socket
2056 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2059 struct sockaddr_in addr;
2060 addr.sin_family = AF_INET;
2062 addr.sin_addr.s_addr = INADDR_ANY;
2063 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2065 // Look up the address that we bound to
2066 folly::SocketAddress boundAddress;
2067 boundAddress.setFromLocalAddress(fd);
2069 // Create a server socket
2070 AsyncServerSocket::UniquePtr serverSocket(
2071 new AsyncServerSocket(&eventBase));
2072 serverSocket->useExistingSocket(fd);
2073 serverSocket->listen(16);
2075 // Make sure AsyncServerSocket reports the same address that we bound to
2076 folly::SocketAddress serverSocketAddress;
2077 serverSocket->getAddress(&serverSocketAddress);
2078 CHECK_EQ(boundAddress, serverSocketAddress);
2080 // Make sure the socket works
2081 serverSocketSanityTest(serverSocket.get());
2084 // Test creating a socket, binding and listening manually,
2085 // then giving it to AsyncServerSocket
2087 // Manually create a socket
2088 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2091 struct sockaddr_in addr;
2092 addr.sin_family = AF_INET;
2094 addr.sin_addr.s_addr = INADDR_ANY;
2095 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2097 // Look up the address that we bound to
2098 folly::SocketAddress boundAddress;
2099 boundAddress.setFromLocalAddress(fd);
2101 CHECK_EQ(listen(fd, 16), 0);
2103 // Create a server socket
2104 AsyncServerSocket::UniquePtr serverSocket(
2105 new AsyncServerSocket(&eventBase));
2106 serverSocket->useExistingSocket(fd);
2108 // Make sure AsyncServerSocket reports the same address that we bound to
2109 folly::SocketAddress serverSocketAddress;
2110 serverSocket->getAddress(&serverSocketAddress);
2111 CHECK_EQ(boundAddress, serverSocketAddress);
2113 // Make sure the socket works
2114 serverSocketSanityTest(serverSocket.get());
2118 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2119 EventBase eventBase;
2121 // Create a server socket
2122 std::shared_ptr<AsyncServerSocket> serverSocket(
2123 AsyncServerSocket::newSocket(&eventBase));
2125 path.append("/anonymous");
2126 folly::SocketAddress serverAddress;
2127 serverAddress.setFromPath(path);
2128 serverSocket->bind(serverAddress);
2129 serverSocket->listen(16);
2131 // Add a callback to accept one connection then stop the loop
2132 TestAcceptCallback acceptCallback;
2133 acceptCallback.setConnectionAcceptedFn(
2134 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2135 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2137 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2138 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2140 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2141 serverSocket->startAccepting();
2143 // Connect to the server socket
2144 std::shared_ptr<AsyncSocket> socket(
2145 AsyncSocket::newSocket(&eventBase, serverAddress));
2149 // Verify that the server accepted a connection
2150 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2151 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2152 TestAcceptCallback::TYPE_START);
2153 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2154 TestAcceptCallback::TYPE_ACCEPT);
2155 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2156 TestAcceptCallback::TYPE_STOP);
2157 int fd = acceptCallback.getEvents()->at(1).fd;
2159 // The accepted connection should already be in non-blocking mode
2160 int flags = fcntl(fd, F_GETFL, 0);
2161 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2164 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2165 EventBase eventBase;
2166 TestConnectionEventCallback connectionEventCallback;
2168 // Create a server socket
2169 std::shared_ptr<AsyncServerSocket> serverSocket(
2170 AsyncServerSocket::newSocket(&eventBase));
2171 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2172 serverSocket->bind(0);
2173 serverSocket->listen(16);
2174 folly::SocketAddress serverAddress;
2175 serverSocket->getAddress(&serverAddress);
2177 // Add a callback to accept one connection then stop the loop
2178 TestAcceptCallback acceptCallback;
2179 acceptCallback.setConnectionAcceptedFn(
2180 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2181 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2183 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2184 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2186 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2187 serverSocket->startAccepting();
2189 // Connect to the server socket
2190 std::shared_ptr<AsyncSocket> socket(
2191 AsyncSocket::newSocket(&eventBase, serverAddress));
2195 // Validate the connection event counters
2196 ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2197 ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2198 ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2200 connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2201 ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2202 ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2203 ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2204 ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2208 * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2210 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2211 EventBase eventBase;
2213 // Counter of how many connections have been accepted
2216 // Create a server socket
2217 auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2218 serverSocket->bind(0);
2219 serverSocket->listen(16);
2220 folly::SocketAddress serverAddress;
2221 serverSocket->getAddress(&serverAddress);
2223 // Add a callback to accept connections
2224 TestAcceptCallback acceptCallback;
2225 acceptCallback.setConnectionAcceptedFn(
2226 [&](int /* fd */, const folly::SocketAddress& /* addr */) {
2228 CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2231 // all messages are processed, remove accept callback
2232 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2235 acceptCallback.setAcceptErrorFn([&](const std::exception& /* ex */) {
2236 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2238 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2239 serverSocket->startAccepting();
2241 // Connect to the server socket, 4 clients, there are 4 connections
2242 auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2243 auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2244 auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2245 auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2251 * Test AsyncTransport::BufferCallback
2253 TEST(AsyncSocketTest, BufferTest) {
2257 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2258 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2260 socket->connect(&ccb, server.getAddress(), 30, option);
2262 char buf[100 * 1024];
2263 memset(buf, 'c', sizeof(buf));
2266 socket->setBufferCallback(&bcb);
2267 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE);
2270 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2271 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
2273 ASSERT_TRUE(bcb.hasBuffered());
2274 ASSERT_TRUE(bcb.hasBufferCleared());
2277 server.verifyConnection(buf, sizeof(buf));
2279 ASSERT_TRUE(socket->isClosedBySelf());
2280 ASSERT_FALSE(socket->isClosedByPeer());