2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/RWSpinLock.h>
21 #include <folly/SocketAddress.h>
23 #include <folly/io/IOBuf.h>
24 #include <folly/io/async/test/AsyncSocketTest.h>
25 #include <folly/io/async/test/Util.h>
26 #include <folly/test/SocketAddressTestHelper.h>
28 #include <gtest/gtest.h>
29 #include <boost/scoped_array.hpp>
34 #include <sys/types.h>
35 #include <sys/socket.h>
36 #include <netinet/tcp.h>
39 using namespace boost;
46 using std::unique_ptr;
47 using std::chrono::milliseconds;
48 using boost::scoped_array;
50 using namespace folly;
52 class DelayedWrite: public AsyncTimeout {
54 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
55 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
56 bool cork, bool lastWrite = false):
57 AsyncTimeout(socket->getEventBase()),
59 bufs_(std::move(bufs)),
62 lastWrite_(lastWrite) {}
65 void timeoutExpired() noexcept override {
66 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
67 socket_->writeChain(wcb_, std::move(bufs_), flags);
69 socket_->shutdownWrite();
73 std::shared_ptr<AsyncSocket> socket_;
74 unique_ptr<IOBuf> bufs_;
75 AsyncTransportWrapper::WriteCallback* wcb_;
80 ///////////////////////////////////////////////////////////////////////////
82 ///////////////////////////////////////////////////////////////////////////
85 * Test connecting to a server
87 TEST(AsyncSocketTest, Connect) {
88 // Start listening on a local port
91 // Connect using a AsyncSocket
93 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
95 socket->connect(&cb, server.getAddress(), 30);
99 CHECK_EQ(cb.state, STATE_SUCCEEDED);
100 EXPECT_LE(0, socket->getConnectTime().count());
104 * Test connecting to a server that isn't listening
106 TEST(AsyncSocketTest, ConnectRefused) {
109 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
111 // Hopefully nothing is actually listening on this address
112 folly::SocketAddress addr("127.0.0.1", 65535);
114 socket->connect(&cb, addr, 30);
118 CHECK_EQ(cb.state, STATE_FAILED);
119 CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
120 EXPECT_LE(0, socket->getConnectTime().count());
124 * Test connection timeout
126 TEST(AsyncSocketTest, ConnectTimeout) {
129 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
131 // Try connecting to server that won't respond.
133 // This depends somewhat on the network where this test is run.
134 // Hopefully this IP will be routable but unresponsive.
135 // (Alternatively, we could try listening on a local raw socket, but that
136 // normally requires root privileges.)
138 SocketAddressTestHelper::isIPv6Enabled() ?
139 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
140 SocketAddressTestHelper::isIPv4Enabled() ?
141 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
143 SocketAddress addr(host, 65535);
145 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
149 CHECK_EQ(cb.state, STATE_FAILED);
150 CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
152 // Verify that we can still get the peer address after a timeout.
153 // Use case is if the client was created from a client pool, and we want
154 // to log which peer failed.
155 folly::SocketAddress peer;
156 socket->getPeerAddress(&peer);
157 CHECK_EQ(peer, addr);
158 EXPECT_LE(0, socket->getConnectTime().count());
162 * Test writing immediately after connecting, without waiting for connect
165 TEST(AsyncSocketTest, ConnectAndWrite) {
170 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
172 socket->connect(&ccb, server.getAddress(), 30);
176 memset(buf, 'a', sizeof(buf));
178 socket->write(&wcb, buf, sizeof(buf));
180 // Loop. We don't bother accepting on the server socket yet.
181 // The kernel should be able to buffer the write request so it can succeed.
184 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
185 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
187 // Make sure the server got a connection and received the data
189 server.verifyConnection(buf, sizeof(buf));
191 ASSERT_TRUE(socket->isClosedBySelf());
192 ASSERT_FALSE(socket->isClosedByPeer());
196 * Test connecting using a nullptr connect callback.
198 TEST(AsyncSocketTest, ConnectNullCallback) {
203 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
204 socket->connect(nullptr, server.getAddress(), 30);
206 // write some data, just so we have some way of verifing
207 // that the socket works correctly after connecting
209 memset(buf, 'a', sizeof(buf));
211 socket->write(&wcb, buf, sizeof(buf));
215 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
217 // Make sure the server got a connection and received the data
219 server.verifyConnection(buf, sizeof(buf));
221 ASSERT_TRUE(socket->isClosedBySelf());
222 ASSERT_FALSE(socket->isClosedByPeer());
226 * Test calling both write() and close() immediately after connecting, without
227 * waiting for connect to finish.
229 * This exercises the STATE_CONNECTING_CLOSING code.
231 TEST(AsyncSocketTest, ConnectWriteAndClose) {
236 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
238 socket->connect(&ccb, server.getAddress(), 30);
242 memset(buf, 'a', sizeof(buf));
244 socket->write(&wcb, buf, sizeof(buf));
249 // Loop. We don't bother accepting on the server socket yet.
250 // The kernel should be able to buffer the write request so it can succeed.
253 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
254 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
256 // Make sure the server got a connection and received the data
257 server.verifyConnection(buf, sizeof(buf));
259 ASSERT_TRUE(socket->isClosedBySelf());
260 ASSERT_FALSE(socket->isClosedByPeer());
264 * Test calling close() immediately after connect()
266 TEST(AsyncSocketTest, ConnectAndClose) {
269 // Connect using a AsyncSocket
271 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
273 socket->connect(&ccb, server.getAddress(), 30);
275 // Hopefully the connect didn't succeed immediately.
276 // If it did, we can't exercise the close-while-connecting code path.
277 if (ccb.state == STATE_SUCCEEDED) {
278 LOG(INFO) << "connect() succeeded immediately; aborting test "
279 "of close-during-connect behavior";
285 // Loop, although there shouldn't be anything to do.
288 // Make sure the connection was aborted
289 CHECK_EQ(ccb.state, STATE_FAILED);
291 ASSERT_TRUE(socket->isClosedBySelf());
292 ASSERT_FALSE(socket->isClosedByPeer());
296 * Test calling closeNow() immediately after connect()
298 * This should be identical to the normal close behavior.
300 TEST(AsyncSocketTest, ConnectAndCloseNow) {
303 // Connect using a AsyncSocket
305 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
307 socket->connect(&ccb, server.getAddress(), 30);
309 // Hopefully the connect didn't succeed immediately.
310 // If it did, we can't exercise the close-while-connecting code path.
311 if (ccb.state == STATE_SUCCEEDED) {
312 LOG(INFO) << "connect() succeeded immediately; aborting test "
313 "of closeNow()-during-connect behavior";
319 // Loop, although there shouldn't be anything to do.
322 // Make sure the connection was aborted
323 CHECK_EQ(ccb.state, STATE_FAILED);
325 ASSERT_TRUE(socket->isClosedBySelf());
326 ASSERT_FALSE(socket->isClosedByPeer());
330 * Test calling both write() and closeNow() immediately after connecting,
331 * without waiting for connect to finish.
333 * This should abort the pending write.
335 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
340 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
342 socket->connect(&ccb, server.getAddress(), 30);
344 // Hopefully the connect didn't succeed immediately.
345 // If it did, we can't exercise the close-while-connecting code path.
346 if (ccb.state == STATE_SUCCEEDED) {
347 LOG(INFO) << "connect() succeeded immediately; aborting test "
348 "of write-during-connect behavior";
354 memset(buf, 'a', sizeof(buf));
356 socket->write(&wcb, buf, sizeof(buf));
361 // Loop, although there shouldn't be anything to do.
364 CHECK_EQ(ccb.state, STATE_FAILED);
365 CHECK_EQ(wcb.state, STATE_FAILED);
367 ASSERT_TRUE(socket->isClosedBySelf());
368 ASSERT_FALSE(socket->isClosedByPeer());
372 * Test installing a read callback immediately, before connect() finishes.
374 TEST(AsyncSocketTest, ConnectAndRead) {
379 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
381 socket->connect(&ccb, server.getAddress(), 30);
384 socket->setReadCB(&rcb);
386 // Even though we haven't looped yet, we should be able to accept
387 // the connection and send data to it.
388 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
390 memset(buf, 'a', sizeof(buf));
391 acceptedSocket->write(buf, sizeof(buf));
392 acceptedSocket->flush();
393 acceptedSocket->close();
395 // Loop, although there shouldn't be anything to do.
398 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
399 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
400 CHECK_EQ(rcb.buffers.size(), 1);
401 CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
402 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
404 ASSERT_FALSE(socket->isClosedBySelf());
405 ASSERT_FALSE(socket->isClosedByPeer());
409 * Test installing a read callback and then closing immediately before the
410 * connect attempt finishes.
412 TEST(AsyncSocketTest, ConnectReadAndClose) {
417 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
419 socket->connect(&ccb, server.getAddress(), 30);
421 // Hopefully the connect didn't succeed immediately.
422 // If it did, we can't exercise the close-while-connecting code path.
423 if (ccb.state == STATE_SUCCEEDED) {
424 LOG(INFO) << "connect() succeeded immediately; aborting test "
425 "of read-during-connect behavior";
430 socket->setReadCB(&rcb);
435 // Loop, although there shouldn't be anything to do.
438 CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
439 CHECK_EQ(rcb.buffers.size(), 0);
440 CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
442 ASSERT_TRUE(socket->isClosedBySelf());
443 ASSERT_FALSE(socket->isClosedByPeer());
447 * Test both writing and installing a read callback immediately,
448 * before connect() finishes.
450 TEST(AsyncSocketTest, ConnectWriteAndRead) {
455 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
457 socket->connect(&ccb, server.getAddress(), 30);
461 memset(buf1, 'a', sizeof(buf1));
463 socket->write(&wcb, buf1, sizeof(buf1));
465 // set a read callback
467 socket->setReadCB(&rcb);
469 // Even though we haven't looped yet, we should be able to accept
470 // the connection and send data to it.
471 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
473 memset(buf2, 'b', sizeof(buf2));
474 acceptedSocket->write(buf2, sizeof(buf2));
475 acceptedSocket->flush();
477 // shut down the write half of acceptedSocket, so that the AsyncSocket
478 // will stop reading and we can break out of the event loop.
479 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
484 // Make sure the connect succeeded
485 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
487 // Make sure the AsyncSocket read the data written by the accepted socket
488 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
489 CHECK_EQ(rcb.buffers.size(), 1);
490 CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
491 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
493 // Close the AsyncSocket so we'll see EOF on acceptedSocket
496 // Make sure the accepted socket saw the data written by the AsyncSocket
497 uint8_t readbuf[sizeof(buf1)];
498 acceptedSocket->readAll(readbuf, sizeof(readbuf));
499 CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
500 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
501 CHECK_EQ(bytesRead, 0);
503 ASSERT_FALSE(socket->isClosedBySelf());
504 ASSERT_TRUE(socket->isClosedByPeer());
508 * Test writing to the socket then shutting down writes before the connect
511 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
516 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
518 socket->connect(&ccb, server.getAddress(), 30);
520 // Hopefully the connect didn't succeed immediately.
521 // If it did, we can't exercise the write-while-connecting code path.
522 if (ccb.state == STATE_SUCCEEDED) {
523 LOG(INFO) << "connect() succeeded immediately; skipping test";
527 // Ask to write some data
529 memset(wbuf, 'a', sizeof(wbuf));
531 socket->write(&wcb, wbuf, sizeof(wbuf));
532 socket->shutdownWrite();
535 socket->shutdownWrite();
537 // Even though we haven't looped yet, we should be able to accept
539 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
541 // Since the connection is still in progress, there should be no data to
542 // read yet. Verify that the accepted socket is not readable.
543 struct pollfd fds[1];
544 fds[0].fd = acceptedSocket->getSocketFD();
545 fds[0].events = POLLIN;
547 int rc = poll(fds, 1, 0);
550 // Write data to the accepted socket
551 uint8_t acceptedWbuf[192];
552 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
553 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
554 acceptedSocket->flush();
559 // The loop should have completed the connection, written the queued data,
560 // and shutdown writes on the socket.
562 // Check that the connection was completed successfully and that the write
563 // callback succeeded.
564 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
565 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
567 // Check that we can read the data that was written to the socket, and that
568 // we see an EOF, since its socket was half-shutdown.
569 uint8_t readbuf[sizeof(wbuf)];
570 acceptedSocket->readAll(readbuf, sizeof(readbuf));
571 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
572 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
573 CHECK_EQ(bytesRead, 0);
575 // Close the accepted socket. This will cause it to see EOF
576 // and uninstall the read callback when we loop next.
577 acceptedSocket->close();
579 // Install a read callback, then loop again.
581 socket->setReadCB(&rcb);
584 // This loop should have read the data and seen the EOF
585 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
586 CHECK_EQ(rcb.buffers.size(), 1);
587 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
588 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
589 acceptedWbuf, sizeof(acceptedWbuf)), 0);
591 ASSERT_FALSE(socket->isClosedBySelf());
592 ASSERT_FALSE(socket->isClosedByPeer());
596 * Test reading, writing, and shutting down writes before the connect attempt
599 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
604 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
606 socket->connect(&ccb, server.getAddress(), 30);
608 // Hopefully the connect didn't succeed immediately.
609 // If it did, we can't exercise the write-while-connecting code path.
610 if (ccb.state == STATE_SUCCEEDED) {
611 LOG(INFO) << "connect() succeeded immediately; skipping test";
615 // Install a read callback
617 socket->setReadCB(&rcb);
619 // Ask to write some data
621 memset(wbuf, 'a', sizeof(wbuf));
623 socket->write(&wcb, wbuf, sizeof(wbuf));
626 socket->shutdownWrite();
628 // Even though we haven't looped yet, we should be able to accept
630 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
632 // Since the connection is still in progress, there should be no data to
633 // read yet. Verify that the accepted socket is not readable.
634 struct pollfd fds[1];
635 fds[0].fd = acceptedSocket->getSocketFD();
636 fds[0].events = POLLIN;
638 int rc = poll(fds, 1, 0);
641 // Write data to the accepted socket
642 uint8_t acceptedWbuf[192];
643 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
644 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
645 acceptedSocket->flush();
646 // Shutdown writes to the accepted socket. This will cause it to see EOF
647 // and uninstall the read callback.
648 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
653 // The loop should have completed the connection, written the queued data,
654 // shutdown writes on the socket, read the data we wrote to it, and see the
657 // Check that the connection was completed successfully and that the read
658 // and write callbacks were invoked as expected.
659 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
660 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
661 CHECK_EQ(rcb.buffers.size(), 1);
662 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
663 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
664 acceptedWbuf, sizeof(acceptedWbuf)), 0);
665 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
667 // Check that we can read the data that was written to the socket, and that
668 // we see an EOF, since its socket was half-shutdown.
669 uint8_t readbuf[sizeof(wbuf)];
670 acceptedSocket->readAll(readbuf, sizeof(readbuf));
671 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
672 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
673 CHECK_EQ(bytesRead, 0);
675 // Fully close both sockets
676 acceptedSocket->close();
679 ASSERT_FALSE(socket->isClosedBySelf());
680 ASSERT_TRUE(socket->isClosedByPeer());
684 * Test reading, writing, and calling shutdownWriteNow() before the
685 * connect attempt finishes.
687 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
692 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
694 socket->connect(&ccb, server.getAddress(), 30);
696 // Hopefully the connect didn't succeed immediately.
697 // If it did, we can't exercise the write-while-connecting code path.
698 if (ccb.state == STATE_SUCCEEDED) {
699 LOG(INFO) << "connect() succeeded immediately; skipping test";
703 // Install a read callback
705 socket->setReadCB(&rcb);
707 // Ask to write some data
709 memset(wbuf, 'a', sizeof(wbuf));
711 socket->write(&wcb, wbuf, sizeof(wbuf));
713 // Shutdown writes immediately.
714 // This should immediately discard the data that we just tried to write.
715 socket->shutdownWriteNow();
717 // Verify that writeError() was invoked on the write callback.
718 CHECK_EQ(wcb.state, STATE_FAILED);
719 CHECK_EQ(wcb.bytesWritten, 0);
721 // Even though we haven't looped yet, we should be able to accept
723 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
725 // Since the connection is still in progress, there should be no data to
726 // read yet. Verify that the accepted socket is not readable.
727 struct pollfd fds[1];
728 fds[0].fd = acceptedSocket->getSocketFD();
729 fds[0].events = POLLIN;
731 int rc = poll(fds, 1, 0);
734 // Write data to the accepted socket
735 uint8_t acceptedWbuf[192];
736 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
737 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
738 acceptedSocket->flush();
739 // Shutdown writes to the accepted socket. This will cause it to see EOF
740 // and uninstall the read callback.
741 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
746 // The loop should have completed the connection, written the queued data,
747 // shutdown writes on the socket, read the data we wrote to it, and see the
750 // Check that the connection was completed successfully and that the read
751 // callback was invoked as expected.
752 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
753 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
754 CHECK_EQ(rcb.buffers.size(), 1);
755 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
756 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
757 acceptedWbuf, sizeof(acceptedWbuf)), 0);
759 // Since we used shutdownWriteNow(), it should have discarded all pending
760 // write data. Verify we see an immediate EOF when reading from the accepted
762 uint8_t readbuf[sizeof(wbuf)];
763 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
764 CHECK_EQ(bytesRead, 0);
766 // Fully close both sockets
767 acceptedSocket->close();
770 ASSERT_FALSE(socket->isClosedBySelf());
771 ASSERT_TRUE(socket->isClosedByPeer());
774 // Helper function for use in testConnectOptWrite()
775 // Temporarily disable the read callback
776 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
777 // Uninstall the read callback
778 socket->setReadCB(nullptr);
779 // Schedule the read callback to be reinstalled after 1ms
780 socket->getEventBase()->runInLoop(
781 std::bind(&AsyncSocket::setReadCB, socket, rcb));
785 * Test connect+write, then have the connect callback perform another write.
787 * This tests interaction of the optimistic writing after connect with
788 * additional write attempts that occur in the connect callback.
790 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
793 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
797 socket->connect(&ccb, server.getAddress(), 30);
799 // Hopefully the connect didn't succeed immediately.
800 // If it did, we can't exercise the optimistic write code path.
801 if (ccb.state == STATE_SUCCEEDED) {
802 LOG(INFO) << "connect() succeeded immediately; aborting test "
803 "of optimistic write behavior";
807 // Tell the connect callback to perform a write when the connect succeeds
809 scoped_array<char> buf2(new char[size2]);
810 memset(buf2.get(), 'b', size2);
812 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
813 // Tell the second write callback to close the connection when it is done
814 wcb2.successCallback = [&] { socket->closeNow(); };
817 // Schedule one write() immediately, before the connect finishes
818 scoped_array<char> buf1(new char[size1]);
819 memset(buf1.get(), 'a', size1);
822 socket->write(&wcb1, buf1.get(), size1);
826 // immediately perform a close, before connect() completes
830 // Start reading from the other endpoint after 10ms.
831 // If we're using large buffers, we have to read so that the writes don't
833 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
835 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
836 acceptedSocket.get(), &rcb);
837 socket->getEventBase()->tryRunAfterDelay(
838 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
841 // Loop. We don't bother accepting on the server socket yet.
842 // The kernel should be able to buffer the write request so it can succeed.
845 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
847 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
850 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
855 // Make sure the read callback received all of the data
856 size_t bytesRead = 0;
857 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
858 it != rcb.buffers.end();
860 size_t start = bytesRead;
861 bytesRead += it->length;
862 size_t end = bytesRead;
864 size_t cmpLen = min(size1, end) - start;
865 CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
867 if (end > size1 && end <= size1 + size2) {
871 if (start >= size1) {
873 buf2Offset = start - size1;
874 cmpLen = end - start;
876 itOffset = size1 - start;
878 cmpLen = end - size1;
880 CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
885 CHECK_EQ(bytesRead, size1 + size2);
888 TEST(AsyncSocketTest, ConnectCallbackWrite) {
889 // Test using small writes that should both succeed immediately
890 testConnectOptWrite(100, 200);
892 // Test using a large buffer in the connect callback, that should block
893 const size_t largeSize = 8*1024*1024;
894 testConnectOptWrite(100, largeSize);
896 // Test using a large initial write
897 testConnectOptWrite(largeSize, 100);
899 // Test using two large buffers
900 testConnectOptWrite(largeSize, largeSize);
902 // Test a small write in the connect callback,
903 // but no immediate write before connect completes
904 testConnectOptWrite(0, 64);
906 // Test a large write in the connect callback,
907 // but no immediate write before connect completes
908 testConnectOptWrite(0, largeSize);
910 // Test connect, a small write, then immediately call close() before connect
912 testConnectOptWrite(211, 0, true);
914 // Test connect, a large immediate write (that will block), then immediately
915 // call close() before connect completes
916 testConnectOptWrite(largeSize, 0, true);
919 ///////////////////////////////////////////////////////////////////////////
920 // write() related tests
921 ///////////////////////////////////////////////////////////////////////////
924 * Test writing using a nullptr callback
926 TEST(AsyncSocketTest, WriteNullCallback) {
931 std::shared_ptr<AsyncSocket> socket =
932 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
933 evb.loop(); // loop until the socket is connected
935 // write() with a nullptr callback
937 memset(buf, 'a', sizeof(buf));
938 socket->write(nullptr, buf, sizeof(buf));
940 evb.loop(); // loop until the data is sent
942 // Make sure the server got a connection and received the data
944 server.verifyConnection(buf, sizeof(buf));
946 ASSERT_TRUE(socket->isClosedBySelf());
947 ASSERT_FALSE(socket->isClosedByPeer());
951 * Test writing with a send timeout
953 TEST(AsyncSocketTest, WriteTimeout) {
958 std::shared_ptr<AsyncSocket> socket =
959 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
960 evb.loop(); // loop until the socket is connected
962 // write() a large chunk of data, with no-one on the other end reading
963 size_t writeLength = 8*1024*1024;
964 uint32_t timeout = 200;
965 socket->setSendTimeout(timeout);
966 scoped_array<char> buf(new char[writeLength]);
967 memset(buf.get(), 'a', writeLength);
969 socket->write(&wcb, buf.get(), writeLength);
975 // Make sure the write attempt timed out as requested
976 CHECK_EQ(wcb.state, STATE_FAILED);
977 CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
979 // Check that the write timed out within a reasonable period of time.
980 // We don't check for exactly the specified timeout, since AsyncSocket only
981 // times out when it hasn't made progress for that period of time.
983 // On linux, the first write sends a few hundred kb of data, then blocks for
984 // writability, and then unblocks again after 40ms and is able to write
985 // another smaller of data before blocking permanently. Therefore it doesn't
986 // time out until 40ms + timeout.
988 // I haven't fully verified the cause of this, but I believe it probably
989 // occurs because the receiving end delays sending an ack for up to 40ms.
990 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
991 // the ack, it can send some more data. However, after that point the
992 // receiver's kernel buffer is full. This 40ms delay happens even with
993 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
994 // kernel may be automatically disabling TCP_QUICKACK after receiving some
997 // For now, we simply check that the timeout occurred within 160ms of
998 // the requested value.
999 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
1003 * Test writing to a socket that the remote endpoint has closed
1005 TEST(AsyncSocketTest, WritePipeError) {
1010 std::shared_ptr<AsyncSocket> socket =
1011 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1012 socket->setSendTimeout(1000);
1013 evb.loop(); // loop until the socket is connected
1015 // accept and immediately close the socket
1016 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1017 acceptedSocket.reset();
1019 // write() a large chunk of data
1020 size_t writeLength = 8*1024*1024;
1021 scoped_array<char> buf(new char[writeLength]);
1022 memset(buf.get(), 'a', writeLength);
1024 socket->write(&wcb, buf.get(), writeLength);
1028 // Make sure the write failed.
1029 // It would be nice if AsyncSocketException could convey the errno value,
1030 // so that we could check for EPIPE
1031 CHECK_EQ(wcb.state, STATE_FAILED);
1032 CHECK_EQ(wcb.exception.getType(),
1033 AsyncSocketException::INTERNAL_ERROR);
1035 ASSERT_FALSE(socket->isClosedBySelf());
1036 ASSERT_FALSE(socket->isClosedByPeer());
1040 * Test writing a mix of simple buffers and IOBufs
1042 TEST(AsyncSocketTest, WriteIOBuf) {
1047 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1049 socket->connect(&ccb, server.getAddress(), 30);
1051 // Accept the connection
1052 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1054 acceptedSocket->setReadCB(&rcb);
1056 // Write a simple buffer to the socket
1057 size_t simpleBufLength = 5;
1058 char simpleBuf[simpleBufLength];
1059 memset(simpleBuf, 'a', simpleBufLength);
1061 socket->write(&wcb, simpleBuf, simpleBufLength);
1063 // Write a single-element IOBuf chain
1064 size_t buf1Length = 7;
1065 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1066 memset(buf1->writableData(), 'b', buf1Length);
1067 buf1->append(buf1Length);
1068 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1070 socket->writeChain(&wcb2, std::move(buf1));
1072 // Write a multiple-element IOBuf chain
1073 size_t buf2Length = 11;
1074 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1075 memset(buf2->writableData(), 'c', buf2Length);
1076 buf2->append(buf2Length);
1077 size_t buf3Length = 13;
1078 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1079 memset(buf3->writableData(), 'd', buf3Length);
1080 buf3->append(buf3Length);
1081 buf2->appendChain(std::move(buf3));
1082 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1083 buf2Copy->coalesce();
1085 socket->writeChain(&wcb3, std::move(buf2));
1086 socket->shutdownWrite();
1088 // Let the reads and writes run to completion
1091 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1092 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1093 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1095 // Make sure the reader got the right data in the right order
1096 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1097 CHECK_EQ(rcb.buffers.size(), 1);
1098 CHECK_EQ(rcb.buffers[0].length,
1099 simpleBufLength + buf1Length + buf2Length + buf3Length);
1101 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1103 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1104 buf1Copy->data(), buf1Copy->length()), 0);
1106 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1107 buf2Copy->data(), buf2Copy->length()), 0);
1109 acceptedSocket->close();
1112 ASSERT_TRUE(socket->isClosedBySelf());
1113 ASSERT_FALSE(socket->isClosedByPeer());
1116 TEST(AsyncSocketTest, WriteIOBufCorked) {
1121 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1123 socket->connect(&ccb, server.getAddress(), 30);
1125 // Accept the connection
1126 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1128 acceptedSocket->setReadCB(&rcb);
1130 // Do three writes, 100ms apart, with the "cork" flag set
1131 // on the second write. The reader should see the first write
1132 // arrive by itself, followed by the second and third writes
1133 // arriving together.
1134 size_t buf1Length = 5;
1135 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1136 memset(buf1->writableData(), 'a', buf1Length);
1137 buf1->append(buf1Length);
1138 size_t buf2Length = 7;
1139 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1140 memset(buf2->writableData(), 'b', buf2Length);
1141 buf2->append(buf2Length);
1142 size_t buf3Length = 11;
1143 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1144 memset(buf3->writableData(), 'c', buf3Length);
1145 buf3->append(buf3Length);
1147 socket->writeChain(&wcb1, std::move(buf1));
1149 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1150 write2.scheduleTimeout(100);
1152 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1153 write3.scheduleTimeout(200);
1156 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1157 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1158 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1159 if (wcb3.state != STATE_SUCCEEDED) {
1160 throw(wcb3.exception);
1162 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1164 // Make sure the reader got the data with the right grouping
1165 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1166 CHECK_EQ(rcb.buffers.size(), 2);
1167 CHECK_EQ(rcb.buffers[0].length, buf1Length);
1168 CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1170 acceptedSocket->close();
1173 ASSERT_TRUE(socket->isClosedBySelf());
1174 ASSERT_FALSE(socket->isClosedByPeer());
1178 * Test performing a zero-length write
1180 TEST(AsyncSocketTest, ZeroLengthWrite) {
1185 std::shared_ptr<AsyncSocket> socket =
1186 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1187 evb.loop(); // loop until the socket is connected
1189 auto acceptedSocket = server.acceptAsync(&evb);
1191 acceptedSocket->setReadCB(&rcb);
1193 size_t len1 = 1024*1024;
1194 size_t len2 = 1024*1024;
1195 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1196 memset(buf.get(), 'a', len1);
1197 memset(buf.get(), 'b', len2);
1203 socket->write(&wcb1, buf.get(), 0);
1204 socket->write(&wcb2, buf.get(), len1);
1205 socket->write(&wcb3, buf.get() + len1, 0);
1206 socket->write(&wcb4, buf.get() + len1, len2);
1209 evb.loop(); // loop until the data is sent
1211 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1212 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1213 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1214 CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1215 rcb.verifyData(buf.get(), len1 + len2);
1217 ASSERT_TRUE(socket->isClosedBySelf());
1218 ASSERT_FALSE(socket->isClosedByPeer());
1221 TEST(AsyncSocketTest, ZeroLengthWritev) {
1226 std::shared_ptr<AsyncSocket> socket =
1227 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1228 evb.loop(); // loop until the socket is connected
1230 auto acceptedSocket = server.acceptAsync(&evb);
1232 acceptedSocket->setReadCB(&rcb);
1234 size_t len1 = 1024*1024;
1235 size_t len2 = 1024*1024;
1236 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1237 memset(buf.get(), 'a', len1);
1238 memset(buf.get(), 'b', len2);
1241 size_t iovCount = 4;
1242 struct iovec iov[iovCount];
1243 iov[0].iov_base = buf.get();
1244 iov[0].iov_len = len1;
1245 iov[1].iov_base = buf.get() + len1;
1247 iov[2].iov_base = buf.get() + len1;
1248 iov[2].iov_len = len2;
1249 iov[3].iov_base = buf.get() + len1 + len2;
1252 socket->writev(&wcb, iov, iovCount);
1254 evb.loop(); // loop until the data is sent
1256 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1257 rcb.verifyData(buf.get(), len1 + len2);
1259 ASSERT_TRUE(socket->isClosedBySelf());
1260 ASSERT_FALSE(socket->isClosedByPeer());
1263 ///////////////////////////////////////////////////////////////////////////
1264 // close() related tests
1265 ///////////////////////////////////////////////////////////////////////////
1268 * Test calling close() with pending writes when the socket is already closing.
1270 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1275 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1277 socket->connect(&ccb, server.getAddress(), 30);
1279 // accept the socket on the server side
1280 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1282 // Loop to ensure the connect has completed
1285 // Make sure we are connected
1286 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1288 // Schedule pending writes, until several write attempts have blocked
1290 memset(buf, 'a', sizeof(buf));
1291 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1292 WriteCallbackVector writeCallbacks;
1294 writeCallbacks.reserve(5);
1295 while (writeCallbacks.size() < 5) {
1296 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1298 socket->write(wcb.get(), buf, sizeof(buf));
1299 if (wcb->state == STATE_SUCCEEDED) {
1300 // Succeeded immediately. Keep performing more writes
1304 // This write is blocked.
1305 // Have the write callback call close() when writeError() is invoked
1306 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1307 writeCallbacks.push_back(wcb);
1310 // Call closeNow() to immediately fail the pending writes
1313 // Make sure writeError() was invoked on all of the pending write callbacks
1314 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1315 it != writeCallbacks.end();
1317 CHECK_EQ((*it)->state, STATE_FAILED);
1320 ASSERT_TRUE(socket->isClosedBySelf());
1321 ASSERT_FALSE(socket->isClosedByPeer());
1324 ///////////////////////////////////////////////////////////////////////////
1325 // ImmediateRead related tests
1326 ///////////////////////////////////////////////////////////////////////////
1328 /* AsyncSocket use to verify immediate read works */
1329 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1331 bool immediateReadCalled = false;
1332 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1334 void checkForImmediateRead() noexcept override {
1335 immediateReadCalled = true;
1336 AsyncSocket::handleRead();
1340 TEST(AsyncSocket, ConnectReadImmediateRead) {
1343 const size_t maxBufferSz = 100;
1344 const size_t maxReadsPerEvent = 1;
1345 const size_t expectedDataSz = maxBufferSz * 3;
1346 char expectedData[expectedDataSz];
1347 memset(expectedData, 'j', expectedDataSz);
1350 ReadCallback rcb(maxBufferSz);
1351 AsyncSocketImmediateRead socket(&evb);
1352 socket.connect(nullptr, server.getAddress(), 30);
1354 evb.loop(); // loop until the socket is connected
1356 socket.setReadCB(&rcb);
1357 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1358 socket.immediateReadCalled = false;
1360 auto acceptedSocket = server.acceptAsync(&evb);
1362 ReadCallback rcbServer;
1363 WriteCallback wcbServer;
1364 rcbServer.dataAvailableCallback = [&]() {
1365 if (rcbServer.dataRead() == expectedDataSz) {
1366 // write back all data read
1367 rcbServer.verifyData(expectedData, expectedDataSz);
1368 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1369 acceptedSocket->close();
1372 acceptedSocket->setReadCB(&rcbServer);
1376 socket.write(&wcb1, expectedData, expectedDataSz);
1378 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1379 rcb.verifyData(expectedData, expectedDataSz);
1380 CHECK_EQ(socket.immediateReadCalled, true);
1382 ASSERT_FALSE(socket.isClosedBySelf());
1383 ASSERT_FALSE(socket.isClosedByPeer());
1386 TEST(AsyncSocket, ConnectReadUninstallRead) {
1389 const size_t maxBufferSz = 100;
1390 const size_t maxReadsPerEvent = 1;
1391 const size_t expectedDataSz = maxBufferSz * 3;
1392 char expectedData[expectedDataSz];
1393 memset(expectedData, 'k', expectedDataSz);
1396 ReadCallback rcb(maxBufferSz);
1397 AsyncSocketImmediateRead socket(&evb);
1398 socket.connect(nullptr, server.getAddress(), 30);
1400 evb.loop(); // loop until the socket is connected
1402 socket.setReadCB(&rcb);
1403 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1404 socket.immediateReadCalled = false;
1406 auto acceptedSocket = server.acceptAsync(&evb);
1408 ReadCallback rcbServer;
1409 WriteCallback wcbServer;
1410 rcbServer.dataAvailableCallback = [&]() {
1411 if (rcbServer.dataRead() == expectedDataSz) {
1412 // write back all data read
1413 rcbServer.verifyData(expectedData, expectedDataSz);
1414 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1415 acceptedSocket->close();
1418 acceptedSocket->setReadCB(&rcbServer);
1420 rcb.dataAvailableCallback = [&]() {
1421 // we read data and reset readCB
1422 socket.setReadCB(nullptr);
1427 socket.write(&wcb, expectedData, expectedDataSz);
1429 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1431 /* we shoud've only read maxBufferSz data since readCallback_
1432 * was reset in dataAvailableCallback */
1433 CHECK_EQ(rcb.dataRead(), maxBufferSz);
1434 CHECK_EQ(socket.immediateReadCalled, false);
1436 ASSERT_FALSE(socket.isClosedBySelf());
1437 ASSERT_FALSE(socket.isClosedByPeer());
1441 // - Test connect() and have the connect callback set the read callback
1442 // - Test connect() and have the connect callback unset the read callback
1443 // - Test reading/writing/closing/destroying the socket in the connect callback
1444 // - Test reading/writing/closing/destroying the socket in the read callback
1445 // - Test reading/writing/closing/destroying the socket in the write callback
1446 // - Test one-way shutdown behavior
1447 // - Test changing the EventBase
1449 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1450 // in connectSuccess(), readDataAvailable(), writeSuccess()
1453 ///////////////////////////////////////////////////////////////////////////
1454 // AsyncServerSocket tests
1455 ///////////////////////////////////////////////////////////////////////////
1458 * Helper ConnectionEventCallback class for the test code.
1459 * It maintains counters protected by a spin lock.
1461 class TestConnectionEventCallback :
1462 public AsyncServerSocket::ConnectionEventCallback {
1464 virtual void onConnectionAccepted(
1466 const SocketAddress& addr) noexcept override {
1467 folly::RWSpinLock::WriteHolder holder(spinLock_);
1468 connectionAccepted_++;
1471 virtual void onConnectionAcceptError(const int err) noexcept override {
1472 folly::RWSpinLock::WriteHolder holder(spinLock_);
1473 connectionAcceptedError_++;
1476 virtual void onConnectionDropped(
1478 const SocketAddress& addr) noexcept override {
1479 folly::RWSpinLock::WriteHolder holder(spinLock_);
1480 connectionDropped_++;
1483 virtual void onConnectionEnqueuedForAcceptorCallback(
1485 const SocketAddress& addr) noexcept override {
1486 folly::RWSpinLock::WriteHolder holder(spinLock_);
1487 connectionEnqueuedForAcceptCallback_++;
1490 virtual void onConnectionDequeuedByAcceptorCallback(
1492 const SocketAddress& addr) noexcept override {
1493 folly::RWSpinLock::WriteHolder holder(spinLock_);
1494 connectionDequeuedByAcceptCallback_++;
1497 virtual void onBackoffStarted() noexcept override {
1498 folly::RWSpinLock::WriteHolder holder(spinLock_);
1502 virtual void onBackoffEnded() noexcept override {
1503 folly::RWSpinLock::WriteHolder holder(spinLock_);
1507 virtual void onBackoffError() noexcept override {
1508 folly::RWSpinLock::WriteHolder holder(spinLock_);
1512 unsigned int getConnectionAccepted() const {
1513 folly::RWSpinLock::ReadHolder holder(spinLock_);
1514 return connectionAccepted_;
1517 unsigned int getConnectionAcceptedError() const {
1518 folly::RWSpinLock::ReadHolder holder(spinLock_);
1519 return connectionAcceptedError_;
1522 unsigned int getConnectionDropped() const {
1523 folly::RWSpinLock::ReadHolder holder(spinLock_);
1524 return connectionDropped_;
1527 unsigned int getConnectionEnqueuedForAcceptCallback() const {
1528 folly::RWSpinLock::ReadHolder holder(spinLock_);
1529 return connectionEnqueuedForAcceptCallback_;
1532 unsigned int getConnectionDequeuedByAcceptCallback() const {
1533 folly::RWSpinLock::ReadHolder holder(spinLock_);
1534 return connectionDequeuedByAcceptCallback_;
1537 unsigned int getBackoffStarted() const {
1538 folly::RWSpinLock::ReadHolder holder(spinLock_);
1539 return backoffStarted_;
1542 unsigned int getBackoffEnded() const {
1543 folly::RWSpinLock::ReadHolder holder(spinLock_);
1544 return backoffEnded_;
1547 unsigned int getBackoffError() const {
1548 folly::RWSpinLock::ReadHolder holder(spinLock_);
1549 return backoffError_;
1553 mutable folly::RWSpinLock spinLock_;
1554 unsigned int connectionAccepted_{0};
1555 unsigned int connectionAcceptedError_{0};
1556 unsigned int connectionDropped_{0};
1557 unsigned int connectionEnqueuedForAcceptCallback_{0};
1558 unsigned int connectionDequeuedByAcceptCallback_{0};
1559 unsigned int backoffStarted_{0};
1560 unsigned int backoffEnded_{0};
1561 unsigned int backoffError_{0};
1565 * Helper AcceptCallback class for the test code
1566 * It records the callbacks that were invoked, and also supports calling
1567 * generic std::function objects in each callback.
1569 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1578 EventInfo(int fd, const folly::SocketAddress& addr)
1579 : type(TYPE_ACCEPT),
1583 explicit EventInfo(const std::string& msg)
1588 explicit EventInfo(EventType et)
1595 int fd; // valid for TYPE_ACCEPT
1596 folly::SocketAddress address; // valid for TYPE_ACCEPT
1597 string errorMsg; // valid for TYPE_ERROR
1599 typedef std::deque<EventInfo> EventList;
1601 TestAcceptCallback()
1602 : connectionAcceptedFn_(),
1607 std::deque<EventInfo>* getEvents() {
1611 void setConnectionAcceptedFn(
1612 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1613 connectionAcceptedFn_ = fn;
1615 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1616 acceptErrorFn_ = fn;
1618 void setAcceptStartedFn(const std::function<void()>& fn) {
1619 acceptStartedFn_ = fn;
1621 void setAcceptStoppedFn(const std::function<void()>& fn) {
1622 acceptStoppedFn_ = fn;
1625 void connectionAccepted(
1626 int fd, const folly::SocketAddress& clientAddr) noexcept override {
1627 events_.emplace_back(fd, clientAddr);
1629 if (connectionAcceptedFn_) {
1630 connectionAcceptedFn_(fd, clientAddr);
1633 void acceptError(const std::exception& ex) noexcept override {
1634 events_.emplace_back(ex.what());
1636 if (acceptErrorFn_) {
1640 void acceptStarted() noexcept override {
1641 events_.emplace_back(TYPE_START);
1643 if (acceptStartedFn_) {
1647 void acceptStopped() noexcept override {
1648 events_.emplace_back(TYPE_STOP);
1650 if (acceptStoppedFn_) {
1656 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1657 std::function<void(const std::exception&)> acceptErrorFn_;
1658 std::function<void()> acceptStartedFn_;
1659 std::function<void()> acceptStoppedFn_;
1661 std::deque<EventInfo> events_;
1666 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1668 TEST(AsyncSocketTest, ServerAcceptOptions) {
1669 EventBase eventBase;
1671 // Create a server socket
1672 std::shared_ptr<AsyncServerSocket> serverSocket(
1673 AsyncServerSocket::newSocket(&eventBase));
1674 serverSocket->bind(0);
1675 serverSocket->listen(16);
1676 folly::SocketAddress serverAddress;
1677 serverSocket->getAddress(&serverAddress);
1679 // Add a callback to accept one connection then stop the loop
1680 TestAcceptCallback acceptCallback;
1681 acceptCallback.setConnectionAcceptedFn(
1682 [&](int fd, const folly::SocketAddress& addr) {
1683 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1685 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1686 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1688 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1689 serverSocket->startAccepting();
1691 // Connect to the server socket
1692 std::shared_ptr<AsyncSocket> socket(
1693 AsyncSocket::newSocket(&eventBase, serverAddress));
1697 // Verify that the server accepted a connection
1698 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1699 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1700 TestAcceptCallback::TYPE_START);
1701 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1702 TestAcceptCallback::TYPE_ACCEPT);
1703 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1704 TestAcceptCallback::TYPE_STOP);
1705 int fd = acceptCallback.getEvents()->at(1).fd;
1707 // The accepted connection should already be in non-blocking mode
1708 int flags = fcntl(fd, F_GETFL, 0);
1709 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1712 // The accepted connection should already have TCP_NODELAY set
1714 socklen_t valueLength = sizeof(value);
1715 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1722 * Test AsyncServerSocket::removeAcceptCallback()
1724 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1725 // Create a new AsyncServerSocket
1726 EventBase eventBase;
1727 std::shared_ptr<AsyncServerSocket> serverSocket(
1728 AsyncServerSocket::newSocket(&eventBase));
1729 serverSocket->bind(0);
1730 serverSocket->listen(16);
1731 folly::SocketAddress serverAddress;
1732 serverSocket->getAddress(&serverAddress);
1734 // Add several accept callbacks
1735 TestAcceptCallback cb1;
1736 TestAcceptCallback cb2;
1737 TestAcceptCallback cb3;
1738 TestAcceptCallback cb4;
1739 TestAcceptCallback cb5;
1740 TestAcceptCallback cb6;
1741 TestAcceptCallback cb7;
1743 // Test having callbacks remove other callbacks before them on the list,
1744 // after them on the list, or removing themselves.
1746 // Have callback 2 remove callback 3 and callback 5 the first time it is
1749 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1750 std::shared_ptr<AsyncSocket> sock2(
1751 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1753 cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1755 cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1756 std::shared_ptr<AsyncSocket> sock3(
1757 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1759 cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1760 std::shared_ptr<AsyncSocket> sock5(
1761 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1764 cb2.setConnectionAcceptedFn(
1765 [&](int fd, const folly::SocketAddress& addr) {
1766 if (cb2Count == 0) {
1767 serverSocket->removeAcceptCallback(&cb3, nullptr);
1768 serverSocket->removeAcceptCallback(&cb5, nullptr);
1772 // Have callback 6 remove callback 4 the first time it is called,
1773 // and destroy the server socket the second time it is called
1775 cb6.setConnectionAcceptedFn(
1776 [&](int fd, const folly::SocketAddress& addr) {
1777 if (cb6Count == 0) {
1778 serverSocket->removeAcceptCallback(&cb4, nullptr);
1779 std::shared_ptr<AsyncSocket> sock6(
1780 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1781 std::shared_ptr<AsyncSocket> sock7(
1782 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1783 std::shared_ptr<AsyncSocket> sock8(
1784 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1787 serverSocket.reset();
1791 // Have callback 7 remove itself
1792 cb7.setConnectionAcceptedFn(
1793 [&](int fd, const folly::SocketAddress& addr) {
1794 serverSocket->removeAcceptCallback(&cb7, nullptr);
1797 serverSocket->addAcceptCallback(&cb1, nullptr);
1798 serverSocket->addAcceptCallback(&cb2, nullptr);
1799 serverSocket->addAcceptCallback(&cb3, nullptr);
1800 serverSocket->addAcceptCallback(&cb4, nullptr);
1801 serverSocket->addAcceptCallback(&cb5, nullptr);
1802 serverSocket->addAcceptCallback(&cb6, nullptr);
1803 serverSocket->addAcceptCallback(&cb7, nullptr);
1804 serverSocket->startAccepting();
1806 // Make several connections to the socket
1807 std::shared_ptr<AsyncSocket> sock1(
1808 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1809 std::shared_ptr<AsyncSocket> sock4(
1810 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1812 // Loop until we are stopped
1815 // Check to make sure that the expected callbacks were invoked.
1817 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1818 // the AcceptCallbacks in round-robin fashion, in the order that they were
1819 // added. The code is implemented this way right now, but the API doesn't
1820 // explicitly require it be done this way. If we change the code not to be
1821 // exactly round robin in the future, we can simplify the test checks here.
1822 // (We'll also need to update the termination code, since we expect cb6 to
1823 // get called twice to terminate the loop.)
1824 CHECK_EQ(cb1.getEvents()->size(), 4);
1825 CHECK_EQ(cb1.getEvents()->at(0).type,
1826 TestAcceptCallback::TYPE_START);
1827 CHECK_EQ(cb1.getEvents()->at(1).type,
1828 TestAcceptCallback::TYPE_ACCEPT);
1829 CHECK_EQ(cb1.getEvents()->at(2).type,
1830 TestAcceptCallback::TYPE_ACCEPT);
1831 CHECK_EQ(cb1.getEvents()->at(3).type,
1832 TestAcceptCallback::TYPE_STOP);
1834 CHECK_EQ(cb2.getEvents()->size(), 4);
1835 CHECK_EQ(cb2.getEvents()->at(0).type,
1836 TestAcceptCallback::TYPE_START);
1837 CHECK_EQ(cb2.getEvents()->at(1).type,
1838 TestAcceptCallback::TYPE_ACCEPT);
1839 CHECK_EQ(cb2.getEvents()->at(2).type,
1840 TestAcceptCallback::TYPE_ACCEPT);
1841 CHECK_EQ(cb2.getEvents()->at(3).type,
1842 TestAcceptCallback::TYPE_STOP);
1844 CHECK_EQ(cb3.getEvents()->size(), 2);
1845 CHECK_EQ(cb3.getEvents()->at(0).type,
1846 TestAcceptCallback::TYPE_START);
1847 CHECK_EQ(cb3.getEvents()->at(1).type,
1848 TestAcceptCallback::TYPE_STOP);
1850 CHECK_EQ(cb4.getEvents()->size(), 3);
1851 CHECK_EQ(cb4.getEvents()->at(0).type,
1852 TestAcceptCallback::TYPE_START);
1853 CHECK_EQ(cb4.getEvents()->at(1).type,
1854 TestAcceptCallback::TYPE_ACCEPT);
1855 CHECK_EQ(cb4.getEvents()->at(2).type,
1856 TestAcceptCallback::TYPE_STOP);
1858 CHECK_EQ(cb5.getEvents()->size(), 2);
1859 CHECK_EQ(cb5.getEvents()->at(0).type,
1860 TestAcceptCallback::TYPE_START);
1861 CHECK_EQ(cb5.getEvents()->at(1).type,
1862 TestAcceptCallback::TYPE_STOP);
1864 CHECK_EQ(cb6.getEvents()->size(), 4);
1865 CHECK_EQ(cb6.getEvents()->at(0).type,
1866 TestAcceptCallback::TYPE_START);
1867 CHECK_EQ(cb6.getEvents()->at(1).type,
1868 TestAcceptCallback::TYPE_ACCEPT);
1869 CHECK_EQ(cb6.getEvents()->at(2).type,
1870 TestAcceptCallback::TYPE_ACCEPT);
1871 CHECK_EQ(cb6.getEvents()->at(3).type,
1872 TestAcceptCallback::TYPE_STOP);
1874 CHECK_EQ(cb7.getEvents()->size(), 3);
1875 CHECK_EQ(cb7.getEvents()->at(0).type,
1876 TestAcceptCallback::TYPE_START);
1877 CHECK_EQ(cb7.getEvents()->at(1).type,
1878 TestAcceptCallback::TYPE_ACCEPT);
1879 CHECK_EQ(cb7.getEvents()->at(2).type,
1880 TestAcceptCallback::TYPE_STOP);
1884 * Test AsyncServerSocket::removeAcceptCallback()
1886 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1887 // Create a new AsyncServerSocket
1888 EventBase eventBase;
1889 std::shared_ptr<AsyncServerSocket> serverSocket(
1890 AsyncServerSocket::newSocket(&eventBase));
1891 serverSocket->bind(0);
1892 serverSocket->listen(16);
1893 folly::SocketAddress serverAddress;
1894 serverSocket->getAddress(&serverAddress);
1896 // Add several accept callbacks
1897 TestAcceptCallback cb1;
1898 auto thread_id = pthread_self();
1899 cb1.setAcceptStartedFn([&](){
1900 CHECK_NE(thread_id, pthread_self());
1901 thread_id = pthread_self();
1903 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1904 CHECK_EQ(thread_id, pthread_self());
1905 serverSocket->removeAcceptCallback(&cb1, nullptr);
1907 cb1.setAcceptStoppedFn([&](){
1908 CHECK_EQ(thread_id, pthread_self());
1911 // Test having callbacks remove other callbacks before them on the list,
1912 serverSocket->addAcceptCallback(&cb1, nullptr);
1913 serverSocket->startAccepting();
1915 // Make several connections to the socket
1916 std::shared_ptr<AsyncSocket> sock1(
1917 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1919 // Loop in another thread
1920 auto other = std::thread([&](){
1925 // Check to make sure that the expected callbacks were invoked.
1927 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1928 // the AcceptCallbacks in round-robin fashion, in the order that they were
1929 // added. The code is implemented this way right now, but the API doesn't
1930 // explicitly require it be done this way. If we change the code not to be
1931 // exactly round robin in the future, we can simplify the test checks here.
1932 // (We'll also need to update the termination code, since we expect cb6 to
1933 // get called twice to terminate the loop.)
1934 CHECK_EQ(cb1.getEvents()->size(), 3);
1935 CHECK_EQ(cb1.getEvents()->at(0).type,
1936 TestAcceptCallback::TYPE_START);
1937 CHECK_EQ(cb1.getEvents()->at(1).type,
1938 TestAcceptCallback::TYPE_ACCEPT);
1939 CHECK_EQ(cb1.getEvents()->at(2).type,
1940 TestAcceptCallback::TYPE_STOP);
1944 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1945 // Add a callback to accept one connection then stop accepting
1946 TestAcceptCallback acceptCallback;
1947 acceptCallback.setConnectionAcceptedFn(
1948 [&](int fd, const folly::SocketAddress& addr) {
1949 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1951 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1952 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1954 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1955 serverSocket->startAccepting();
1957 // Connect to the server socket
1958 EventBase* eventBase = serverSocket->getEventBase();
1959 folly::SocketAddress serverAddress;
1960 serverSocket->getAddress(&serverAddress);
1961 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1963 // Loop to process all events
1966 // Verify that the server accepted a connection
1967 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1968 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1969 TestAcceptCallback::TYPE_START);
1970 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1971 TestAcceptCallback::TYPE_ACCEPT);
1972 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1973 TestAcceptCallback::TYPE_STOP);
1976 /* Verify that we don't leak sockets if we are destroyed()
1977 * and there are still writes pending
1979 * If destroy() only calls close() instead of closeNow(),
1980 * it would shutdown(writes) on the socket, but it would
1981 * never be close()'d, and the socket would leak
1983 TEST(AsyncSocketTest, DestroyCloseTest) {
1989 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1991 socket->connect(&ccb, server.getAddress(), 30);
1993 // Accept the connection
1994 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1996 acceptedSocket->setReadCB(&rcb);
1998 // Write a large buffer to the socket that is larger than kernel buffer
1999 size_t simpleBufLength = 5000000;
2000 char* simpleBuf = new char[simpleBufLength];
2001 memset(simpleBuf, 'a', simpleBufLength);
2004 // Let the reads and writes run to completion
2005 int fd = acceptedSocket->getFd();
2007 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
2009 acceptedSocket.reset();
2011 // Test that server socket was closed
2012 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
2019 * Test AsyncServerSocket::useExistingSocket()
2021 TEST(AsyncSocketTest, ServerExistingSocket) {
2022 EventBase eventBase;
2024 // Test creating a socket, and letting AsyncServerSocket bind and listen
2026 // Manually create a socket
2027 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2030 // Create a server socket
2031 AsyncServerSocket::UniquePtr serverSocket(
2032 new AsyncServerSocket(&eventBase));
2033 serverSocket->useExistingSocket(fd);
2034 folly::SocketAddress address;
2035 serverSocket->getAddress(&address);
2037 serverSocket->bind(address);
2038 serverSocket->listen(16);
2040 // Make sure the socket works
2041 serverSocketSanityTest(serverSocket.get());
2044 // Test creating a socket and binding manually,
2045 // then letting AsyncServerSocket listen
2047 // Manually create a socket
2048 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2051 struct sockaddr_in addr;
2052 addr.sin_family = AF_INET;
2054 addr.sin_addr.s_addr = INADDR_ANY;
2055 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2057 // Look up the address that we bound to
2058 folly::SocketAddress boundAddress;
2059 boundAddress.setFromLocalAddress(fd);
2061 // Create a server socket
2062 AsyncServerSocket::UniquePtr serverSocket(
2063 new AsyncServerSocket(&eventBase));
2064 serverSocket->useExistingSocket(fd);
2065 serverSocket->listen(16);
2067 // Make sure AsyncServerSocket reports the same address that we bound to
2068 folly::SocketAddress serverSocketAddress;
2069 serverSocket->getAddress(&serverSocketAddress);
2070 CHECK_EQ(boundAddress, serverSocketAddress);
2072 // Make sure the socket works
2073 serverSocketSanityTest(serverSocket.get());
2076 // Test creating a socket, binding and listening manually,
2077 // then giving it to AsyncServerSocket
2079 // Manually create a socket
2080 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
2083 struct sockaddr_in addr;
2084 addr.sin_family = AF_INET;
2086 addr.sin_addr.s_addr = INADDR_ANY;
2087 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
2089 // Look up the address that we bound to
2090 folly::SocketAddress boundAddress;
2091 boundAddress.setFromLocalAddress(fd);
2093 CHECK_EQ(listen(fd, 16), 0);
2095 // Create a server socket
2096 AsyncServerSocket::UniquePtr serverSocket(
2097 new AsyncServerSocket(&eventBase));
2098 serverSocket->useExistingSocket(fd);
2100 // Make sure AsyncServerSocket reports the same address that we bound to
2101 folly::SocketAddress serverSocketAddress;
2102 serverSocket->getAddress(&serverSocketAddress);
2103 CHECK_EQ(boundAddress, serverSocketAddress);
2105 // Make sure the socket works
2106 serverSocketSanityTest(serverSocket.get());
2110 TEST(AsyncSocketTest, UnixDomainSocketTest) {
2111 EventBase eventBase;
2113 // Create a server socket
2114 std::shared_ptr<AsyncServerSocket> serverSocket(
2115 AsyncServerSocket::newSocket(&eventBase));
2117 path.append("/anonymous");
2118 folly::SocketAddress serverAddress;
2119 serverAddress.setFromPath(path);
2120 serverSocket->bind(serverAddress);
2121 serverSocket->listen(16);
2123 // Add a callback to accept one connection then stop the loop
2124 TestAcceptCallback acceptCallback;
2125 acceptCallback.setConnectionAcceptedFn(
2126 [&](int fd, const folly::SocketAddress& addr) {
2127 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2129 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
2130 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2132 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2133 serverSocket->startAccepting();
2135 // Connect to the server socket
2136 std::shared_ptr<AsyncSocket> socket(
2137 AsyncSocket::newSocket(&eventBase, serverAddress));
2141 // Verify that the server accepted a connection
2142 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
2143 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
2144 TestAcceptCallback::TYPE_START);
2145 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
2146 TestAcceptCallback::TYPE_ACCEPT);
2147 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
2148 TestAcceptCallback::TYPE_STOP);
2149 int fd = acceptCallback.getEvents()->at(1).fd;
2151 // The accepted connection should already be in non-blocking mode
2152 int flags = fcntl(fd, F_GETFL, 0);
2153 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
2156 TEST(AsyncSocketTest, ConnectionEventCallbackDefault) {
2157 EventBase eventBase;
2158 TestConnectionEventCallback connectionEventCallback;
2160 // Create a server socket
2161 std::shared_ptr<AsyncServerSocket> serverSocket(
2162 AsyncServerSocket::newSocket(&eventBase));
2163 serverSocket->setConnectionEventCallback(&connectionEventCallback);
2164 serverSocket->bind(0);
2165 serverSocket->listen(16);
2166 folly::SocketAddress serverAddress;
2167 serverSocket->getAddress(&serverAddress);
2169 // Add a callback to accept one connection then stop the loop
2170 TestAcceptCallback acceptCallback;
2171 acceptCallback.setConnectionAcceptedFn(
2172 [&](int fd, const folly::SocketAddress& addr) {
2173 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2175 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
2176 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
2178 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
2179 serverSocket->startAccepting();
2181 // Connect to the server socket
2182 std::shared_ptr<AsyncSocket> socket(
2183 AsyncSocket::newSocket(&eventBase, serverAddress));
2187 // Validate the connection event counters
2188 ASSERT_EQ(connectionEventCallback.getConnectionAccepted(), 1);
2189 ASSERT_EQ(connectionEventCallback.getConnectionAcceptedError(), 0);
2190 ASSERT_EQ(connectionEventCallback.getConnectionDropped(), 0);
2192 connectionEventCallback.getConnectionEnqueuedForAcceptCallback(), 1);
2193 ASSERT_EQ(connectionEventCallback.getConnectionDequeuedByAcceptCallback(), 1);
2194 ASSERT_EQ(connectionEventCallback.getBackoffStarted(), 0);
2195 ASSERT_EQ(connectionEventCallback.getBackoffEnded(), 0);
2196 ASSERT_EQ(connectionEventCallback.getBackoffError(), 0);
2200 * Test AsyncServerSocket::getNumPendingMessagesInQueue()
2202 TEST(AsyncSocketTest, NumPendingMessagesInQueue) {
2203 EventBase eventBase;
2205 // Counter of how many connections have been accepted
2208 // Create a server socket
2209 auto serverSocket(AsyncServerSocket::newSocket(&eventBase));
2210 serverSocket->bind(0);
2211 serverSocket->listen(16);
2212 folly::SocketAddress serverAddress;
2213 serverSocket->getAddress(&serverAddress);
2215 // Add a callback to accept connections
2216 TestAcceptCallback acceptCallback;
2217 acceptCallback.setConnectionAcceptedFn(
2218 [&](int fd, const folly::SocketAddress& addr) {
2220 CHECK_EQ(4 - count, serverSocket->getNumPendingMessagesInQueue());
2223 // all messages are processed, remove accept callback
2224 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2227 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
2228 serverSocket->removeAcceptCallback(&acceptCallback, &eventBase);
2230 serverSocket->addAcceptCallback(&acceptCallback, &eventBase);
2231 serverSocket->startAccepting();
2233 // Connect to the server socket, 4 clients, there are 4 connections
2234 auto socket1(AsyncSocket::newSocket(&eventBase, serverAddress));
2235 auto socket2(AsyncSocket::newSocket(&eventBase, serverAddress));
2236 auto socket3(AsyncSocket::newSocket(&eventBase, serverAddress));
2237 auto socket4(AsyncSocket::newSocket(&eventBase, serverAddress));
2242 TEST(AsyncSocketTest, BufferTest) {
2246 AsyncSocket::OptionMap option{{{SOL_SOCKET, SO_SNDBUF}, 128}};
2247 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
2249 socket->connect(&ccb, server.getAddress(), 30, option);
2252 char buf[100 * 1024];
2253 memset(buf, 'c', sizeof(buf));
2256 socket->write(&wcb, buf, sizeof(buf), WriteFlags::NONE, &bcb);
2259 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
2260 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
2262 ASSERT_TRUE(bcb.hasBuffered());
2265 server.verifyConnection(buf, sizeof(buf));
2267 ASSERT_TRUE(socket->isClosedBySelf());
2268 ASSERT_FALSE(socket->isClosedByPeer());