2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 #include <folly/io/async/AsyncServerSocket.h>
17 #include <folly/io/async/AsyncSocket.h>
18 #include <folly/io/async/AsyncTimeout.h>
19 #include <folly/io/async/EventBase.h>
20 #include <folly/SocketAddress.h>
22 #include <folly/io/IOBuf.h>
23 #include <folly/io/async/test/AsyncSocketTest.h>
24 #include <folly/io/async/test/Util.h>
25 #include <folly/test/SocketAddressTestHelper.h>
27 #include <gtest/gtest.h>
28 #include <boost/scoped_array.hpp>
33 #include <sys/types.h>
34 #include <sys/socket.h>
35 #include <netinet/tcp.h>
38 using namespace boost;
45 using std::unique_ptr;
46 using std::chrono::milliseconds;
47 using boost::scoped_array;
49 using namespace folly;
51 class DelayedWrite: public AsyncTimeout {
53 DelayedWrite(const std::shared_ptr<AsyncSocket>& socket,
54 unique_ptr<IOBuf>&& bufs, AsyncTransportWrapper::WriteCallback* wcb,
55 bool cork, bool lastWrite = false):
56 AsyncTimeout(socket->getEventBase()),
58 bufs_(std::move(bufs)),
61 lastWrite_(lastWrite) {}
64 void timeoutExpired() noexcept override {
65 WriteFlags flags = cork_ ? WriteFlags::CORK : WriteFlags::NONE;
66 socket_->writeChain(wcb_, std::move(bufs_), flags);
68 socket_->shutdownWrite();
72 std::shared_ptr<AsyncSocket> socket_;
73 unique_ptr<IOBuf> bufs_;
74 AsyncTransportWrapper::WriteCallback* wcb_;
79 ///////////////////////////////////////////////////////////////////////////
81 ///////////////////////////////////////////////////////////////////////////
84 * Test connecting to a server
86 TEST(AsyncSocketTest, Connect) {
87 // Start listening on a local port
90 // Connect using a AsyncSocket
92 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
94 socket->connect(&cb, server.getAddress(), 30);
98 CHECK_EQ(cb.state, STATE_SUCCEEDED);
102 * Test connecting to a server that isn't listening
104 TEST(AsyncSocketTest, ConnectRefused) {
107 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
109 // Hopefully nothing is actually listening on this address
110 folly::SocketAddress addr("127.0.0.1", 65535);
112 socket->connect(&cb, addr, 30);
116 CHECK_EQ(cb.state, STATE_FAILED);
117 CHECK_EQ(cb.exception.getType(), AsyncSocketException::NOT_OPEN);
121 * Test connection timeout
123 TEST(AsyncSocketTest, ConnectTimeout) {
126 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
128 // Try connecting to server that won't respond.
130 // This depends somewhat on the network where this test is run.
131 // Hopefully this IP will be routable but unresponsive.
132 // (Alternatively, we could try listening on a local raw socket, but that
133 // normally requires root privileges.)
135 SocketAddressTestHelper::isIPv6Enabled() ?
136 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv6 :
137 SocketAddressTestHelper::isIPv4Enabled() ?
138 SocketAddressTestHelper::kGooglePublicDnsAAddrIPv4 :
140 SocketAddress addr(host, 65535);
142 socket->connect(&cb, addr, 1); // also set a ridiculously small timeout
146 CHECK_EQ(cb.state, STATE_FAILED);
147 CHECK_EQ(cb.exception.getType(), AsyncSocketException::TIMED_OUT);
149 // Verify that we can still get the peer address after a timeout.
150 // Use case is if the client was created from a client pool, and we want
151 // to log which peer failed.
152 folly::SocketAddress peer;
153 socket->getPeerAddress(&peer);
154 CHECK_EQ(peer, addr);
158 * Test writing immediately after connecting, without waiting for connect
161 TEST(AsyncSocketTest, ConnectAndWrite) {
166 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
168 socket->connect(&ccb, server.getAddress(), 30);
172 memset(buf, 'a', sizeof(buf));
174 socket->write(&wcb, buf, sizeof(buf));
176 // Loop. We don't bother accepting on the server socket yet.
177 // The kernel should be able to buffer the write request so it can succeed.
180 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
181 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
183 // Make sure the server got a connection and received the data
185 server.verifyConnection(buf, sizeof(buf));
189 * Test connecting using a nullptr connect callback.
191 TEST(AsyncSocketTest, ConnectNullCallback) {
196 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
197 socket->connect(nullptr, server.getAddress(), 30);
199 // write some data, just so we have some way of verifing
200 // that the socket works correctly after connecting
202 memset(buf, 'a', sizeof(buf));
204 socket->write(&wcb, buf, sizeof(buf));
208 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
210 // Make sure the server got a connection and received the data
212 server.verifyConnection(buf, sizeof(buf));
216 * Test calling both write() and close() immediately after connecting, without
217 * waiting for connect to finish.
219 * This exercises the STATE_CONNECTING_CLOSING code.
221 TEST(AsyncSocketTest, ConnectWriteAndClose) {
226 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
228 socket->connect(&ccb, server.getAddress(), 30);
232 memset(buf, 'a', sizeof(buf));
234 socket->write(&wcb, buf, sizeof(buf));
239 // Loop. We don't bother accepting on the server socket yet.
240 // The kernel should be able to buffer the write request so it can succeed.
243 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
244 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
246 // Make sure the server got a connection and received the data
247 server.verifyConnection(buf, sizeof(buf));
251 * Test calling close() immediately after connect()
253 TEST(AsyncSocketTest, ConnectAndClose) {
256 // Connect using a AsyncSocket
258 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
260 socket->connect(&ccb, server.getAddress(), 30);
262 // Hopefully the connect didn't succeed immediately.
263 // If it did, we can't exercise the close-while-connecting code path.
264 if (ccb.state == STATE_SUCCEEDED) {
265 LOG(INFO) << "connect() succeeded immediately; aborting test "
266 "of close-during-connect behavior";
272 // Loop, although there shouldn't be anything to do.
275 // Make sure the connection was aborted
276 CHECK_EQ(ccb.state, STATE_FAILED);
280 * Test calling closeNow() immediately after connect()
282 * This should be identical to the normal close behavior.
284 TEST(AsyncSocketTest, ConnectAndCloseNow) {
287 // Connect using a AsyncSocket
289 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
291 socket->connect(&ccb, server.getAddress(), 30);
293 // Hopefully the connect didn't succeed immediately.
294 // If it did, we can't exercise the close-while-connecting code path.
295 if (ccb.state == STATE_SUCCEEDED) {
296 LOG(INFO) << "connect() succeeded immediately; aborting test "
297 "of closeNow()-during-connect behavior";
303 // Loop, although there shouldn't be anything to do.
306 // Make sure the connection was aborted
307 CHECK_EQ(ccb.state, STATE_FAILED);
311 * Test calling both write() and closeNow() immediately after connecting,
312 * without waiting for connect to finish.
314 * This should abort the pending write.
316 TEST(AsyncSocketTest, ConnectWriteAndCloseNow) {
321 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
323 socket->connect(&ccb, server.getAddress(), 30);
325 // Hopefully the connect didn't succeed immediately.
326 // If it did, we can't exercise the close-while-connecting code path.
327 if (ccb.state == STATE_SUCCEEDED) {
328 LOG(INFO) << "connect() succeeded immediately; aborting test "
329 "of write-during-connect behavior";
335 memset(buf, 'a', sizeof(buf));
337 socket->write(&wcb, buf, sizeof(buf));
342 // Loop, although there shouldn't be anything to do.
345 CHECK_EQ(ccb.state, STATE_FAILED);
346 CHECK_EQ(wcb.state, STATE_FAILED);
350 * Test installing a read callback immediately, before connect() finishes.
352 TEST(AsyncSocketTest, ConnectAndRead) {
357 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
359 socket->connect(&ccb, server.getAddress(), 30);
362 socket->setReadCB(&rcb);
364 // Even though we haven't looped yet, we should be able to accept
365 // the connection and send data to it.
366 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
368 memset(buf, 'a', sizeof(buf));
369 acceptedSocket->write(buf, sizeof(buf));
370 acceptedSocket->flush();
371 acceptedSocket->close();
373 // Loop, although there shouldn't be anything to do.
376 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
377 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
378 CHECK_EQ(rcb.buffers.size(), 1);
379 CHECK_EQ(rcb.buffers[0].length, sizeof(buf));
380 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf, sizeof(buf)), 0);
384 * Test installing a read callback and then closing immediately before the
385 * connect attempt finishes.
387 TEST(AsyncSocketTest, ConnectReadAndClose) {
392 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
394 socket->connect(&ccb, server.getAddress(), 30);
396 // Hopefully the connect didn't succeed immediately.
397 // If it did, we can't exercise the close-while-connecting code path.
398 if (ccb.state == STATE_SUCCEEDED) {
399 LOG(INFO) << "connect() succeeded immediately; aborting test "
400 "of read-during-connect behavior";
405 socket->setReadCB(&rcb);
410 // Loop, although there shouldn't be anything to do.
413 CHECK_EQ(ccb.state, STATE_FAILED); // we aborted the close attempt
414 CHECK_EQ(rcb.buffers.size(), 0);
415 CHECK_EQ(rcb.state, STATE_SUCCEEDED); // this indicates EOF
419 * Test both writing and installing a read callback immediately,
420 * before connect() finishes.
422 TEST(AsyncSocketTest, ConnectWriteAndRead) {
427 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
429 socket->connect(&ccb, server.getAddress(), 30);
433 memset(buf1, 'a', sizeof(buf1));
435 socket->write(&wcb, buf1, sizeof(buf1));
437 // set a read callback
439 socket->setReadCB(&rcb);
441 // Even though we haven't looped yet, we should be able to accept
442 // the connection and send data to it.
443 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
445 memset(buf2, 'b', sizeof(buf2));
446 acceptedSocket->write(buf2, sizeof(buf2));
447 acceptedSocket->flush();
449 // shut down the write half of acceptedSocket, so that the AsyncSocket
450 // will stop reading and we can break out of the event loop.
451 shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
456 // Make sure the connect succeeded
457 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
459 // Make sure the AsyncSocket read the data written by the accepted socket
460 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
461 CHECK_EQ(rcb.buffers.size(), 1);
462 CHECK_EQ(rcb.buffers[0].length, sizeof(buf2));
463 CHECK_EQ(memcmp(rcb.buffers[0].buffer, buf2, sizeof(buf2)), 0);
465 // Close the AsyncSocket so we'll see EOF on acceptedSocket
468 // Make sure the accepted socket saw the data written by the AsyncSocket
469 uint8_t readbuf[sizeof(buf1)];
470 acceptedSocket->readAll(readbuf, sizeof(readbuf));
471 CHECK_EQ(memcmp(buf1, readbuf, sizeof(buf1)), 0);
472 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
473 CHECK_EQ(bytesRead, 0);
477 * Test writing to the socket then shutting down writes before the connect
480 TEST(AsyncSocketTest, ConnectWriteAndShutdownWrite) {
485 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
487 socket->connect(&ccb, server.getAddress(), 30);
489 // Hopefully the connect didn't succeed immediately.
490 // If it did, we can't exercise the write-while-connecting code path.
491 if (ccb.state == STATE_SUCCEEDED) {
492 LOG(INFO) << "connect() succeeded immediately; skipping test";
496 // Ask to write some data
498 memset(wbuf, 'a', sizeof(wbuf));
500 socket->write(&wcb, wbuf, sizeof(wbuf));
501 socket->shutdownWrite();
504 socket->shutdownWrite();
506 // Even though we haven't looped yet, we should be able to accept
508 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
510 // Since the connection is still in progress, there should be no data to
511 // read yet. Verify that the accepted socket is not readable.
512 struct pollfd fds[1];
513 fds[0].fd = acceptedSocket->getSocketFD();
514 fds[0].events = POLLIN;
516 int rc = poll(fds, 1, 0);
519 // Write data to the accepted socket
520 uint8_t acceptedWbuf[192];
521 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
522 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
523 acceptedSocket->flush();
528 // The loop should have completed the connection, written the queued data,
529 // and shutdown writes on the socket.
531 // Check that the connection was completed successfully and that the write
532 // callback succeeded.
533 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
534 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
536 // Check that we can read the data that was written to the socket, and that
537 // we see an EOF, since its socket was half-shutdown.
538 uint8_t readbuf[sizeof(wbuf)];
539 acceptedSocket->readAll(readbuf, sizeof(readbuf));
540 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
541 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
542 CHECK_EQ(bytesRead, 0);
544 // Close the accepted socket. This will cause it to see EOF
545 // and uninstall the read callback when we loop next.
546 acceptedSocket->close();
548 // Install a read callback, then loop again.
550 socket->setReadCB(&rcb);
553 // This loop should have read the data and seen the EOF
554 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
555 CHECK_EQ(rcb.buffers.size(), 1);
556 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
557 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
558 acceptedWbuf, sizeof(acceptedWbuf)), 0);
562 * Test reading, writing, and shutting down writes before the connect attempt
565 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWrite) {
570 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
572 socket->connect(&ccb, server.getAddress(), 30);
574 // Hopefully the connect didn't succeed immediately.
575 // If it did, we can't exercise the write-while-connecting code path.
576 if (ccb.state == STATE_SUCCEEDED) {
577 LOG(INFO) << "connect() succeeded immediately; skipping test";
581 // Install a read callback
583 socket->setReadCB(&rcb);
585 // Ask to write some data
587 memset(wbuf, 'a', sizeof(wbuf));
589 socket->write(&wcb, wbuf, sizeof(wbuf));
592 socket->shutdownWrite();
594 // Even though we haven't looped yet, we should be able to accept
596 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
598 // Since the connection is still in progress, there should be no data to
599 // read yet. Verify that the accepted socket is not readable.
600 struct pollfd fds[1];
601 fds[0].fd = acceptedSocket->getSocketFD();
602 fds[0].events = POLLIN;
604 int rc = poll(fds, 1, 0);
607 // Write data to the accepted socket
608 uint8_t acceptedWbuf[192];
609 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
610 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
611 acceptedSocket->flush();
612 // Shutdown writes to the accepted socket. This will cause it to see EOF
613 // and uninstall the read callback.
614 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
619 // The loop should have completed the connection, written the queued data,
620 // shutdown writes on the socket, read the data we wrote to it, and see the
623 // Check that the connection was completed successfully and that the read
624 // and write callbacks were invoked as expected.
625 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
626 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
627 CHECK_EQ(rcb.buffers.size(), 1);
628 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
629 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
630 acceptedWbuf, sizeof(acceptedWbuf)), 0);
631 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
633 // Check that we can read the data that was written to the socket, and that
634 // we see an EOF, since its socket was half-shutdown.
635 uint8_t readbuf[sizeof(wbuf)];
636 acceptedSocket->readAll(readbuf, sizeof(readbuf));
637 CHECK_EQ(memcmp(wbuf, readbuf, sizeof(wbuf)), 0);
638 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
639 CHECK_EQ(bytesRead, 0);
641 // Fully close both sockets
642 acceptedSocket->close();
647 * Test reading, writing, and calling shutdownWriteNow() before the
648 * connect attempt finishes.
650 TEST(AsyncSocketTest, ConnectReadWriteAndShutdownWriteNow) {
655 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
657 socket->connect(&ccb, server.getAddress(), 30);
659 // Hopefully the connect didn't succeed immediately.
660 // If it did, we can't exercise the write-while-connecting code path.
661 if (ccb.state == STATE_SUCCEEDED) {
662 LOG(INFO) << "connect() succeeded immediately; skipping test";
666 // Install a read callback
668 socket->setReadCB(&rcb);
670 // Ask to write some data
672 memset(wbuf, 'a', sizeof(wbuf));
674 socket->write(&wcb, wbuf, sizeof(wbuf));
676 // Shutdown writes immediately.
677 // This should immediately discard the data that we just tried to write.
678 socket->shutdownWriteNow();
680 // Verify that writeError() was invoked on the write callback.
681 CHECK_EQ(wcb.state, STATE_FAILED);
682 CHECK_EQ(wcb.bytesWritten, 0);
684 // Even though we haven't looped yet, we should be able to accept
686 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
688 // Since the connection is still in progress, there should be no data to
689 // read yet. Verify that the accepted socket is not readable.
690 struct pollfd fds[1];
691 fds[0].fd = acceptedSocket->getSocketFD();
692 fds[0].events = POLLIN;
694 int rc = poll(fds, 1, 0);
697 // Write data to the accepted socket
698 uint8_t acceptedWbuf[192];
699 memset(acceptedWbuf, 'b', sizeof(acceptedWbuf));
700 acceptedSocket->write(acceptedWbuf, sizeof(acceptedWbuf));
701 acceptedSocket->flush();
702 // Shutdown writes to the accepted socket. This will cause it to see EOF
703 // and uninstall the read callback.
704 ::shutdown(acceptedSocket->getSocketFD(), SHUT_WR);
709 // The loop should have completed the connection, written the queued data,
710 // shutdown writes on the socket, read the data we wrote to it, and see the
713 // Check that the connection was completed successfully and that the read
714 // callback was invoked as expected.
715 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
716 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
717 CHECK_EQ(rcb.buffers.size(), 1);
718 CHECK_EQ(rcb.buffers[0].length, sizeof(acceptedWbuf));
719 CHECK_EQ(memcmp(rcb.buffers[0].buffer,
720 acceptedWbuf, sizeof(acceptedWbuf)), 0);
722 // Since we used shutdownWriteNow(), it should have discarded all pending
723 // write data. Verify we see an immediate EOF when reading from the accepted
725 uint8_t readbuf[sizeof(wbuf)];
726 uint32_t bytesRead = acceptedSocket->read(readbuf, sizeof(readbuf));
727 CHECK_EQ(bytesRead, 0);
729 // Fully close both sockets
730 acceptedSocket->close();
734 // Helper function for use in testConnectOptWrite()
735 // Temporarily disable the read callback
736 void tmpDisableReads(AsyncSocket* socket, ReadCallback* rcb) {
737 // Uninstall the read callback
738 socket->setReadCB(nullptr);
739 // Schedule the read callback to be reinstalled after 1ms
740 socket->getEventBase()->runInLoop(
741 std::bind(&AsyncSocket::setReadCB, socket, rcb));
745 * Test connect+write, then have the connect callback perform another write.
747 * This tests interaction of the optimistic writing after connect with
748 * additional write attempts that occur in the connect callback.
750 void testConnectOptWrite(size_t size1, size_t size2, bool close = false) {
753 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
757 socket->connect(&ccb, server.getAddress(), 30);
759 // Hopefully the connect didn't succeed immediately.
760 // If it did, we can't exercise the optimistic write code path.
761 if (ccb.state == STATE_SUCCEEDED) {
762 LOG(INFO) << "connect() succeeded immediately; aborting test "
763 "of optimistic write behavior";
767 // Tell the connect callback to perform a write when the connect succeeds
769 scoped_array<char> buf2(new char[size2]);
770 memset(buf2.get(), 'b', size2);
772 ccb.successCallback = [&] { socket->write(&wcb2, buf2.get(), size2); };
773 // Tell the second write callback to close the connection when it is done
774 wcb2.successCallback = [&] { socket->closeNow(); };
777 // Schedule one write() immediately, before the connect finishes
778 scoped_array<char> buf1(new char[size1]);
779 memset(buf1.get(), 'a', size1);
782 socket->write(&wcb1, buf1.get(), size1);
786 // immediately perform a close, before connect() completes
790 // Start reading from the other endpoint after 10ms.
791 // If we're using large buffers, we have to read so that the writes don't
793 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
795 rcb.dataAvailableCallback = std::bind(tmpDisableReads,
796 acceptedSocket.get(), &rcb);
797 socket->getEventBase()->tryRunAfterDelay(
798 std::bind(&AsyncSocket::setReadCB, acceptedSocket.get(), &rcb),
801 // Loop. We don't bother accepting on the server socket yet.
802 // The kernel should be able to buffer the write request so it can succeed.
805 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
807 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
810 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
815 // Make sure the read callback received all of the data
816 size_t bytesRead = 0;
817 for (vector<ReadCallback::Buffer>::const_iterator it = rcb.buffers.begin();
818 it != rcb.buffers.end();
820 size_t start = bytesRead;
821 bytesRead += it->length;
822 size_t end = bytesRead;
824 size_t cmpLen = min(size1, end) - start;
825 CHECK_EQ(memcmp(it->buffer, buf1.get() + start, cmpLen), 0);
827 if (end > size1 && end <= size1 + size2) {
831 if (start >= size1) {
833 buf2Offset = start - size1;
834 cmpLen = end - start;
836 itOffset = size1 - start;
838 cmpLen = end - size1;
840 CHECK_EQ(memcmp(it->buffer + itOffset, buf2.get() + buf2Offset,
845 CHECK_EQ(bytesRead, size1 + size2);
848 TEST(AsyncSocketTest, ConnectCallbackWrite) {
849 // Test using small writes that should both succeed immediately
850 testConnectOptWrite(100, 200);
852 // Test using a large buffer in the connect callback, that should block
853 const size_t largeSize = 8*1024*1024;
854 testConnectOptWrite(100, largeSize);
856 // Test using a large initial write
857 testConnectOptWrite(largeSize, 100);
859 // Test using two large buffers
860 testConnectOptWrite(largeSize, largeSize);
862 // Test a small write in the connect callback,
863 // but no immediate write before connect completes
864 testConnectOptWrite(0, 64);
866 // Test a large write in the connect callback,
867 // but no immediate write before connect completes
868 testConnectOptWrite(0, largeSize);
870 // Test connect, a small write, then immediately call close() before connect
872 testConnectOptWrite(211, 0, true);
874 // Test connect, a large immediate write (that will block), then immediately
875 // call close() before connect completes
876 testConnectOptWrite(largeSize, 0, true);
879 ///////////////////////////////////////////////////////////////////////////
880 // write() related tests
881 ///////////////////////////////////////////////////////////////////////////
884 * Test writing using a nullptr callback
886 TEST(AsyncSocketTest, WriteNullCallback) {
891 std::shared_ptr<AsyncSocket> socket =
892 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
893 evb.loop(); // loop until the socket is connected
895 // write() with a nullptr callback
897 memset(buf, 'a', sizeof(buf));
898 socket->write(nullptr, buf, sizeof(buf));
900 evb.loop(); // loop until the data is sent
902 // Make sure the server got a connection and received the data
904 server.verifyConnection(buf, sizeof(buf));
908 * Test writing with a send timeout
910 TEST(AsyncSocketTest, WriteTimeout) {
915 std::shared_ptr<AsyncSocket> socket =
916 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
917 evb.loop(); // loop until the socket is connected
919 // write() a large chunk of data, with no-one on the other end reading
920 size_t writeLength = 8*1024*1024;
921 uint32_t timeout = 200;
922 socket->setSendTimeout(timeout);
923 scoped_array<char> buf(new char[writeLength]);
924 memset(buf.get(), 'a', writeLength);
926 socket->write(&wcb, buf.get(), writeLength);
932 // Make sure the write attempt timed out as requested
933 CHECK_EQ(wcb.state, STATE_FAILED);
934 CHECK_EQ(wcb.exception.getType(), AsyncSocketException::TIMED_OUT);
936 // Check that the write timed out within a reasonable period of time.
937 // We don't check for exactly the specified timeout, since AsyncSocket only
938 // times out when it hasn't made progress for that period of time.
940 // On linux, the first write sends a few hundred kb of data, then blocks for
941 // writability, and then unblocks again after 40ms and is able to write
942 // another smaller of data before blocking permanently. Therefore it doesn't
943 // time out until 40ms + timeout.
945 // I haven't fully verified the cause of this, but I believe it probably
946 // occurs because the receiving end delays sending an ack for up to 40ms.
947 // (This is the default value for TCP_DELACK_MIN.) Once the sender receives
948 // the ack, it can send some more data. However, after that point the
949 // receiver's kernel buffer is full. This 40ms delay happens even with
950 // TCP_NODELAY and TCP_QUICKACK enabled on both endpoints. However, the
951 // kernel may be automatically disabling TCP_QUICKACK after receiving some
954 // For now, we simply check that the timeout occurred within 160ms of
955 // the requested value.
956 T_CHECK_TIMEOUT(start, end, milliseconds(timeout), milliseconds(160));
960 * Test writing to a socket that the remote endpoint has closed
962 TEST(AsyncSocketTest, WritePipeError) {
967 std::shared_ptr<AsyncSocket> socket =
968 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
969 socket->setSendTimeout(1000);
970 evb.loop(); // loop until the socket is connected
972 // accept and immediately close the socket
973 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
974 acceptedSocket.reset();
976 // write() a large chunk of data
977 size_t writeLength = 8*1024*1024;
978 scoped_array<char> buf(new char[writeLength]);
979 memset(buf.get(), 'a', writeLength);
981 socket->write(&wcb, buf.get(), writeLength);
985 // Make sure the write failed.
986 // It would be nice if AsyncSocketException could convey the errno value,
987 // so that we could check for EPIPE
988 CHECK_EQ(wcb.state, STATE_FAILED);
989 CHECK_EQ(wcb.exception.getType(),
990 AsyncSocketException::INTERNAL_ERROR);
994 * Test writing a mix of simple buffers and IOBufs
996 TEST(AsyncSocketTest, WriteIOBuf) {
1001 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1003 socket->connect(&ccb, server.getAddress(), 30);
1005 // Accept the connection
1006 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1008 acceptedSocket->setReadCB(&rcb);
1010 // Write a simple buffer to the socket
1011 size_t simpleBufLength = 5;
1012 char simpleBuf[simpleBufLength];
1013 memset(simpleBuf, 'a', simpleBufLength);
1015 socket->write(&wcb, simpleBuf, simpleBufLength);
1017 // Write a single-element IOBuf chain
1018 size_t buf1Length = 7;
1019 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1020 memset(buf1->writableData(), 'b', buf1Length);
1021 buf1->append(buf1Length);
1022 unique_ptr<IOBuf> buf1Copy(buf1->clone());
1024 socket->writeChain(&wcb2, std::move(buf1));
1026 // Write a multiple-element IOBuf chain
1027 size_t buf2Length = 11;
1028 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1029 memset(buf2->writableData(), 'c', buf2Length);
1030 buf2->append(buf2Length);
1031 size_t buf3Length = 13;
1032 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1033 memset(buf3->writableData(), 'd', buf3Length);
1034 buf3->append(buf3Length);
1035 buf2->appendChain(std::move(buf3));
1036 unique_ptr<IOBuf> buf2Copy(buf2->clone());
1037 buf2Copy->coalesce();
1039 socket->writeChain(&wcb3, std::move(buf2));
1040 socket->shutdownWrite();
1042 // Let the reads and writes run to completion
1045 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1046 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1047 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1049 // Make sure the reader got the right data in the right order
1050 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1051 CHECK_EQ(rcb.buffers.size(), 1);
1052 CHECK_EQ(rcb.buffers[0].length,
1053 simpleBufLength + buf1Length + buf2Length + buf3Length);
1055 memcmp(rcb.buffers[0].buffer, simpleBuf, simpleBufLength), 0);
1057 memcmp(rcb.buffers[0].buffer + simpleBufLength,
1058 buf1Copy->data(), buf1Copy->length()), 0);
1060 memcmp(rcb.buffers[0].buffer + simpleBufLength + buf1Length,
1061 buf2Copy->data(), buf2Copy->length()), 0);
1063 acceptedSocket->close();
1067 TEST(AsyncSocketTest, WriteIOBufCorked) {
1072 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1074 socket->connect(&ccb, server.getAddress(), 30);
1076 // Accept the connection
1077 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&evb);
1079 acceptedSocket->setReadCB(&rcb);
1081 // Do three writes, 100ms apart, with the "cork" flag set
1082 // on the second write. The reader should see the first write
1083 // arrive by itself, followed by the second and third writes
1084 // arriving together.
1085 size_t buf1Length = 5;
1086 unique_ptr<IOBuf> buf1(IOBuf::create(buf1Length));
1087 memset(buf1->writableData(), 'a', buf1Length);
1088 buf1->append(buf1Length);
1089 size_t buf2Length = 7;
1090 unique_ptr<IOBuf> buf2(IOBuf::create(buf2Length));
1091 memset(buf2->writableData(), 'b', buf2Length);
1092 buf2->append(buf2Length);
1093 size_t buf3Length = 11;
1094 unique_ptr<IOBuf> buf3(IOBuf::create(buf3Length));
1095 memset(buf3->writableData(), 'c', buf3Length);
1096 buf3->append(buf3Length);
1098 socket->writeChain(&wcb1, std::move(buf1));
1100 DelayedWrite write2(socket, std::move(buf2), &wcb2, true);
1101 write2.scheduleTimeout(100);
1103 DelayedWrite write3(socket, std::move(buf3), &wcb3, false, true);
1104 write3.scheduleTimeout(200);
1107 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1108 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1109 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1110 if (wcb3.state != STATE_SUCCEEDED) {
1111 throw(wcb3.exception);
1113 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1115 // Make sure the reader got the data with the right grouping
1116 CHECK_EQ(rcb.state, STATE_SUCCEEDED);
1117 CHECK_EQ(rcb.buffers.size(), 2);
1118 CHECK_EQ(rcb.buffers[0].length, buf1Length);
1119 CHECK_EQ(rcb.buffers[1].length, buf2Length + buf3Length);
1121 acceptedSocket->close();
1126 * Test performing a zero-length write
1128 TEST(AsyncSocketTest, ZeroLengthWrite) {
1133 std::shared_ptr<AsyncSocket> socket =
1134 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1135 evb.loop(); // loop until the socket is connected
1137 auto acceptedSocket = server.acceptAsync(&evb);
1139 acceptedSocket->setReadCB(&rcb);
1141 size_t len1 = 1024*1024;
1142 size_t len2 = 1024*1024;
1143 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1144 memset(buf.get(), 'a', len1);
1145 memset(buf.get(), 'b', len2);
1151 socket->write(&wcb1, buf.get(), 0);
1152 socket->write(&wcb2, buf.get(), len1);
1153 socket->write(&wcb3, buf.get() + len1, 0);
1154 socket->write(&wcb4, buf.get() + len1, len2);
1157 evb.loop(); // loop until the data is sent
1159 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1160 CHECK_EQ(wcb2.state, STATE_SUCCEEDED);
1161 CHECK_EQ(wcb3.state, STATE_SUCCEEDED);
1162 CHECK_EQ(wcb4.state, STATE_SUCCEEDED);
1163 rcb.verifyData(buf.get(), len1 + len2);
1166 TEST(AsyncSocketTest, ZeroLengthWritev) {
1171 std::shared_ptr<AsyncSocket> socket =
1172 AsyncSocket::newSocket(&evb, server.getAddress(), 30);
1173 evb.loop(); // loop until the socket is connected
1175 auto acceptedSocket = server.acceptAsync(&evb);
1177 acceptedSocket->setReadCB(&rcb);
1179 size_t len1 = 1024*1024;
1180 size_t len2 = 1024*1024;
1181 std::unique_ptr<char[]> buf(new char[len1 + len2]);
1182 memset(buf.get(), 'a', len1);
1183 memset(buf.get(), 'b', len2);
1186 size_t iovCount = 4;
1187 struct iovec iov[iovCount];
1188 iov[0].iov_base = buf.get();
1189 iov[0].iov_len = len1;
1190 iov[1].iov_base = buf.get() + len1;
1192 iov[2].iov_base = buf.get() + len1;
1193 iov[2].iov_len = len2;
1194 iov[3].iov_base = buf.get() + len1 + len2;
1197 socket->writev(&wcb, iov, iovCount);
1199 evb.loop(); // loop until the data is sent
1201 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1202 rcb.verifyData(buf.get(), len1 + len2);
1205 ///////////////////////////////////////////////////////////////////////////
1206 // close() related tests
1207 ///////////////////////////////////////////////////////////////////////////
1210 * Test calling close() with pending writes when the socket is already closing.
1212 TEST(AsyncSocketTest, ClosePendingWritesWhileClosing) {
1217 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
1219 socket->connect(&ccb, server.getAddress(), 30);
1221 // accept the socket on the server side
1222 std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
1224 // Loop to ensure the connect has completed
1227 // Make sure we are connected
1228 CHECK_EQ(ccb.state, STATE_SUCCEEDED);
1230 // Schedule pending writes, until several write attempts have blocked
1232 memset(buf, 'a', sizeof(buf));
1233 typedef vector< std::shared_ptr<WriteCallback> > WriteCallbackVector;
1234 WriteCallbackVector writeCallbacks;
1236 writeCallbacks.reserve(5);
1237 while (writeCallbacks.size() < 5) {
1238 std::shared_ptr<WriteCallback> wcb(new WriteCallback);
1240 socket->write(wcb.get(), buf, sizeof(buf));
1241 if (wcb->state == STATE_SUCCEEDED) {
1242 // Succeeded immediately. Keep performing more writes
1246 // This write is blocked.
1247 // Have the write callback call close() when writeError() is invoked
1248 wcb->errorCallback = std::bind(&AsyncSocket::close, socket.get());
1249 writeCallbacks.push_back(wcb);
1252 // Call closeNow() to immediately fail the pending writes
1255 // Make sure writeError() was invoked on all of the pending write callbacks
1256 for (WriteCallbackVector::const_iterator it = writeCallbacks.begin();
1257 it != writeCallbacks.end();
1259 CHECK_EQ((*it)->state, STATE_FAILED);
1263 ///////////////////////////////////////////////////////////////////////////
1264 // ImmediateRead related tests
1265 ///////////////////////////////////////////////////////////////////////////
1267 /* AsyncSocket use to verify immediate read works */
1268 class AsyncSocketImmediateRead : public folly::AsyncSocket {
1270 bool immediateReadCalled = false;
1271 explicit AsyncSocketImmediateRead(folly::EventBase* evb) : AsyncSocket(evb) {}
1273 void checkForImmediateRead() noexcept override {
1274 immediateReadCalled = true;
1275 AsyncSocket::handleRead();
1279 TEST(AsyncSocket, ConnectReadImmediateRead) {
1282 const size_t maxBufferSz = 100;
1283 const size_t maxReadsPerEvent = 1;
1284 const size_t expectedDataSz = maxBufferSz * 3;
1285 char expectedData[expectedDataSz];
1286 memset(expectedData, 'j', expectedDataSz);
1289 ReadCallback rcb(maxBufferSz);
1290 AsyncSocketImmediateRead socket(&evb);
1291 socket.connect(nullptr, server.getAddress(), 30);
1293 evb.loop(); // loop until the socket is connected
1295 socket.setReadCB(&rcb);
1296 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1297 socket.immediateReadCalled = false;
1299 auto acceptedSocket = server.acceptAsync(&evb);
1301 ReadCallback rcbServer;
1302 WriteCallback wcbServer;
1303 rcbServer.dataAvailableCallback = [&]() {
1304 if (rcbServer.dataRead() == expectedDataSz) {
1305 // write back all data read
1306 rcbServer.verifyData(expectedData, expectedDataSz);
1307 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1308 acceptedSocket->close();
1311 acceptedSocket->setReadCB(&rcbServer);
1315 socket.write(&wcb1, expectedData, expectedDataSz);
1317 CHECK_EQ(wcb1.state, STATE_SUCCEEDED);
1318 rcb.verifyData(expectedData, expectedDataSz);
1319 CHECK_EQ(socket.immediateReadCalled, true);
1322 TEST(AsyncSocket, ConnectReadUninstallRead) {
1325 const size_t maxBufferSz = 100;
1326 const size_t maxReadsPerEvent = 1;
1327 const size_t expectedDataSz = maxBufferSz * 3;
1328 char expectedData[expectedDataSz];
1329 memset(expectedData, 'k', expectedDataSz);
1332 ReadCallback rcb(maxBufferSz);
1333 AsyncSocketImmediateRead socket(&evb);
1334 socket.connect(nullptr, server.getAddress(), 30);
1336 evb.loop(); // loop until the socket is connected
1338 socket.setReadCB(&rcb);
1339 socket.setMaxReadsPerEvent(maxReadsPerEvent);
1340 socket.immediateReadCalled = false;
1342 auto acceptedSocket = server.acceptAsync(&evb);
1344 ReadCallback rcbServer;
1345 WriteCallback wcbServer;
1346 rcbServer.dataAvailableCallback = [&]() {
1347 if (rcbServer.dataRead() == expectedDataSz) {
1348 // write back all data read
1349 rcbServer.verifyData(expectedData, expectedDataSz);
1350 acceptedSocket->write(&wcbServer, expectedData, expectedDataSz);
1351 acceptedSocket->close();
1354 acceptedSocket->setReadCB(&rcbServer);
1356 rcb.dataAvailableCallback = [&]() {
1357 // we read data and reset readCB
1358 socket.setReadCB(nullptr);
1363 socket.write(&wcb, expectedData, expectedDataSz);
1365 CHECK_EQ(wcb.state, STATE_SUCCEEDED);
1367 /* we shoud've only read maxBufferSz data since readCallback_
1368 * was reset in dataAvailableCallback */
1369 CHECK_EQ(rcb.dataRead(), maxBufferSz);
1370 CHECK_EQ(socket.immediateReadCalled, false);
1374 // - Test connect() and have the connect callback set the read callback
1375 // - Test connect() and have the connect callback unset the read callback
1376 // - Test reading/writing/closing/destroying the socket in the connect callback
1377 // - Test reading/writing/closing/destroying the socket in the read callback
1378 // - Test reading/writing/closing/destroying the socket in the write callback
1379 // - Test one-way shutdown behavior
1380 // - Test changing the EventBase
1382 // - TODO: test multiple threads sharing a AsyncSocket, and detaching from it
1383 // in connectSuccess(), readDataAvailable(), writeSuccess()
1386 ///////////////////////////////////////////////////////////////////////////
1387 // AsyncServerSocket tests
1388 ///////////////////////////////////////////////////////////////////////////
1391 * Helper AcceptCallback class for the test code
1392 * It records the callbacks that were invoked, and also supports calling
1393 * generic std::function objects in each callback.
1395 class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
1404 EventInfo(int fd, const folly::SocketAddress& addr)
1405 : type(TYPE_ACCEPT),
1409 explicit EventInfo(const std::string& msg)
1414 explicit EventInfo(EventType et)
1421 int fd; // valid for TYPE_ACCEPT
1422 folly::SocketAddress address; // valid for TYPE_ACCEPT
1423 string errorMsg; // valid for TYPE_ERROR
1425 typedef std::deque<EventInfo> EventList;
1427 TestAcceptCallback()
1428 : connectionAcceptedFn_(),
1433 std::deque<EventInfo>* getEvents() {
1437 void setConnectionAcceptedFn(
1438 const std::function<void(int, const folly::SocketAddress&)>& fn) {
1439 connectionAcceptedFn_ = fn;
1441 void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
1442 acceptErrorFn_ = fn;
1444 void setAcceptStartedFn(const std::function<void()>& fn) {
1445 acceptStartedFn_ = fn;
1447 void setAcceptStoppedFn(const std::function<void()>& fn) {
1448 acceptStoppedFn_ = fn;
1451 void connectionAccepted(
1452 int fd, const folly::SocketAddress& clientAddr) noexcept override {
1453 events_.emplace_back(fd, clientAddr);
1455 if (connectionAcceptedFn_) {
1456 connectionAcceptedFn_(fd, clientAddr);
1459 void acceptError(const std::exception& ex) noexcept override {
1460 events_.emplace_back(ex.what());
1462 if (acceptErrorFn_) {
1466 void acceptStarted() noexcept override {
1467 events_.emplace_back(TYPE_START);
1469 if (acceptStartedFn_) {
1473 void acceptStopped() noexcept override {
1474 events_.emplace_back(TYPE_STOP);
1476 if (acceptStoppedFn_) {
1482 std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
1483 std::function<void(const std::exception&)> acceptErrorFn_;
1484 std::function<void()> acceptStartedFn_;
1485 std::function<void()> acceptStoppedFn_;
1487 std::deque<EventInfo> events_;
1491 * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
1493 TEST(AsyncSocketTest, ServerAcceptOptions) {
1494 EventBase eventBase;
1496 // Create a server socket
1497 std::shared_ptr<AsyncServerSocket> serverSocket(
1498 AsyncServerSocket::newSocket(&eventBase));
1499 serverSocket->bind(0);
1500 serverSocket->listen(16);
1501 folly::SocketAddress serverAddress;
1502 serverSocket->getAddress(&serverAddress);
1504 // Add a callback to accept one connection then stop the loop
1505 TestAcceptCallback acceptCallback;
1506 acceptCallback.setConnectionAcceptedFn(
1507 [&](int fd, const folly::SocketAddress& addr) {
1508 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1510 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1511 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1513 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1514 serverSocket->startAccepting();
1516 // Connect to the server socket
1517 std::shared_ptr<AsyncSocket> socket(
1518 AsyncSocket::newSocket(&eventBase, serverAddress));
1522 // Verify that the server accepted a connection
1523 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1524 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1525 TestAcceptCallback::TYPE_START);
1526 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1527 TestAcceptCallback::TYPE_ACCEPT);
1528 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1529 TestAcceptCallback::TYPE_STOP);
1530 int fd = acceptCallback.getEvents()->at(1).fd;
1532 // The accepted connection should already be in non-blocking mode
1533 int flags = fcntl(fd, F_GETFL, 0);
1534 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);
1537 // The accepted connection should already have TCP_NODELAY set
1539 socklen_t valueLength = sizeof(value);
1540 int rc = getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &value, &valueLength);
1547 * Test AsyncServerSocket::removeAcceptCallback()
1549 TEST(AsyncSocketTest, RemoveAcceptCallback) {
1550 // Create a new AsyncServerSocket
1551 EventBase eventBase;
1552 std::shared_ptr<AsyncServerSocket> serverSocket(
1553 AsyncServerSocket::newSocket(&eventBase));
1554 serverSocket->bind(0);
1555 serverSocket->listen(16);
1556 folly::SocketAddress serverAddress;
1557 serverSocket->getAddress(&serverAddress);
1559 // Add several accept callbacks
1560 TestAcceptCallback cb1;
1561 TestAcceptCallback cb2;
1562 TestAcceptCallback cb3;
1563 TestAcceptCallback cb4;
1564 TestAcceptCallback cb5;
1565 TestAcceptCallback cb6;
1566 TestAcceptCallback cb7;
1568 // Test having callbacks remove other callbacks before them on the list,
1569 // after them on the list, or removing themselves.
1571 // Have callback 2 remove callback 3 and callback 5 the first time it is
1574 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1575 std::shared_ptr<AsyncSocket> sock2(
1576 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2: -cb3 -cb5
1578 cb3.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1580 cb4.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1581 std::shared_ptr<AsyncSocket> sock3(
1582 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb4
1584 cb5.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1585 std::shared_ptr<AsyncSocket> sock5(
1586 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb7: -cb7
1589 cb2.setConnectionAcceptedFn(
1590 [&](int fd, const folly::SocketAddress& addr) {
1591 if (cb2Count == 0) {
1592 serverSocket->removeAcceptCallback(&cb3, nullptr);
1593 serverSocket->removeAcceptCallback(&cb5, nullptr);
1597 // Have callback 6 remove callback 4 the first time it is called,
1598 // and destroy the server socket the second time it is called
1600 cb6.setConnectionAcceptedFn(
1601 [&](int fd, const folly::SocketAddress& addr) {
1602 if (cb6Count == 0) {
1603 serverSocket->removeAcceptCallback(&cb4, nullptr);
1604 std::shared_ptr<AsyncSocket> sock6(
1605 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1606 std::shared_ptr<AsyncSocket> sock7(
1607 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb2
1608 std::shared_ptr<AsyncSocket> sock8(
1609 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: stop
1612 serverSocket.reset();
1616 // Have callback 7 remove itself
1617 cb7.setConnectionAcceptedFn(
1618 [&](int fd, const folly::SocketAddress& addr) {
1619 serverSocket->removeAcceptCallback(&cb7, nullptr);
1622 serverSocket->addAcceptCallback(&cb1, nullptr);
1623 serverSocket->addAcceptCallback(&cb2, nullptr);
1624 serverSocket->addAcceptCallback(&cb3, nullptr);
1625 serverSocket->addAcceptCallback(&cb4, nullptr);
1626 serverSocket->addAcceptCallback(&cb5, nullptr);
1627 serverSocket->addAcceptCallback(&cb6, nullptr);
1628 serverSocket->addAcceptCallback(&cb7, nullptr);
1629 serverSocket->startAccepting();
1631 // Make several connections to the socket
1632 std::shared_ptr<AsyncSocket> sock1(
1633 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1634 std::shared_ptr<AsyncSocket> sock4(
1635 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb6: -cb4
1637 // Loop until we are stopped
1640 // Check to make sure that the expected callbacks were invoked.
1642 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1643 // the AcceptCallbacks in round-robin fashion, in the order that they were
1644 // added. The code is implemented this way right now, but the API doesn't
1645 // explicitly require it be done this way. If we change the code not to be
1646 // exactly round robin in the future, we can simplify the test checks here.
1647 // (We'll also need to update the termination code, since we expect cb6 to
1648 // get called twice to terminate the loop.)
1649 CHECK_EQ(cb1.getEvents()->size(), 4);
1650 CHECK_EQ(cb1.getEvents()->at(0).type,
1651 TestAcceptCallback::TYPE_START);
1652 CHECK_EQ(cb1.getEvents()->at(1).type,
1653 TestAcceptCallback::TYPE_ACCEPT);
1654 CHECK_EQ(cb1.getEvents()->at(2).type,
1655 TestAcceptCallback::TYPE_ACCEPT);
1656 CHECK_EQ(cb1.getEvents()->at(3).type,
1657 TestAcceptCallback::TYPE_STOP);
1659 CHECK_EQ(cb2.getEvents()->size(), 4);
1660 CHECK_EQ(cb2.getEvents()->at(0).type,
1661 TestAcceptCallback::TYPE_START);
1662 CHECK_EQ(cb2.getEvents()->at(1).type,
1663 TestAcceptCallback::TYPE_ACCEPT);
1664 CHECK_EQ(cb2.getEvents()->at(2).type,
1665 TestAcceptCallback::TYPE_ACCEPT);
1666 CHECK_EQ(cb2.getEvents()->at(3).type,
1667 TestAcceptCallback::TYPE_STOP);
1669 CHECK_EQ(cb3.getEvents()->size(), 2);
1670 CHECK_EQ(cb3.getEvents()->at(0).type,
1671 TestAcceptCallback::TYPE_START);
1672 CHECK_EQ(cb3.getEvents()->at(1).type,
1673 TestAcceptCallback::TYPE_STOP);
1675 CHECK_EQ(cb4.getEvents()->size(), 3);
1676 CHECK_EQ(cb4.getEvents()->at(0).type,
1677 TestAcceptCallback::TYPE_START);
1678 CHECK_EQ(cb4.getEvents()->at(1).type,
1679 TestAcceptCallback::TYPE_ACCEPT);
1680 CHECK_EQ(cb4.getEvents()->at(2).type,
1681 TestAcceptCallback::TYPE_STOP);
1683 CHECK_EQ(cb5.getEvents()->size(), 2);
1684 CHECK_EQ(cb5.getEvents()->at(0).type,
1685 TestAcceptCallback::TYPE_START);
1686 CHECK_EQ(cb5.getEvents()->at(1).type,
1687 TestAcceptCallback::TYPE_STOP);
1689 CHECK_EQ(cb6.getEvents()->size(), 4);
1690 CHECK_EQ(cb6.getEvents()->at(0).type,
1691 TestAcceptCallback::TYPE_START);
1692 CHECK_EQ(cb6.getEvents()->at(1).type,
1693 TestAcceptCallback::TYPE_ACCEPT);
1694 CHECK_EQ(cb6.getEvents()->at(2).type,
1695 TestAcceptCallback::TYPE_ACCEPT);
1696 CHECK_EQ(cb6.getEvents()->at(3).type,
1697 TestAcceptCallback::TYPE_STOP);
1699 CHECK_EQ(cb7.getEvents()->size(), 3);
1700 CHECK_EQ(cb7.getEvents()->at(0).type,
1701 TestAcceptCallback::TYPE_START);
1702 CHECK_EQ(cb7.getEvents()->at(1).type,
1703 TestAcceptCallback::TYPE_ACCEPT);
1704 CHECK_EQ(cb7.getEvents()->at(2).type,
1705 TestAcceptCallback::TYPE_STOP);
1709 * Test AsyncServerSocket::removeAcceptCallback()
1711 TEST(AsyncSocketTest, OtherThreadAcceptCallback) {
1712 // Create a new AsyncServerSocket
1713 EventBase eventBase;
1714 std::shared_ptr<AsyncServerSocket> serverSocket(
1715 AsyncServerSocket::newSocket(&eventBase));
1716 serverSocket->bind(0);
1717 serverSocket->listen(16);
1718 folly::SocketAddress serverAddress;
1719 serverSocket->getAddress(&serverAddress);
1721 // Add several accept callbacks
1722 TestAcceptCallback cb1;
1723 auto thread_id = pthread_self();
1724 cb1.setAcceptStartedFn([&](){
1725 CHECK_NE(thread_id, pthread_self());
1726 thread_id = pthread_self();
1728 cb1.setConnectionAcceptedFn([&](int fd, const folly::SocketAddress& addr){
1729 CHECK_EQ(thread_id, pthread_self());
1730 serverSocket->removeAcceptCallback(&cb1, nullptr);
1732 cb1.setAcceptStoppedFn([&](){
1733 CHECK_EQ(thread_id, pthread_self());
1736 // Test having callbacks remove other callbacks before them on the list,
1737 serverSocket->addAcceptCallback(&cb1, nullptr);
1738 serverSocket->startAccepting();
1740 // Make several connections to the socket
1741 std::shared_ptr<AsyncSocket> sock1(
1742 AsyncSocket::newSocket(&eventBase, serverAddress)); // cb1
1744 // Loop in another thread
1745 auto other = std::thread([&](){
1750 // Check to make sure that the expected callbacks were invoked.
1752 // NOTE: This code depends on the AsyncServerSocket operating calling all of
1753 // the AcceptCallbacks in round-robin fashion, in the order that they were
1754 // added. The code is implemented this way right now, but the API doesn't
1755 // explicitly require it be done this way. If we change the code not to be
1756 // exactly round robin in the future, we can simplify the test checks here.
1757 // (We'll also need to update the termination code, since we expect cb6 to
1758 // get called twice to terminate the loop.)
1759 CHECK_EQ(cb1.getEvents()->size(), 3);
1760 CHECK_EQ(cb1.getEvents()->at(0).type,
1761 TestAcceptCallback::TYPE_START);
1762 CHECK_EQ(cb1.getEvents()->at(1).type,
1763 TestAcceptCallback::TYPE_ACCEPT);
1764 CHECK_EQ(cb1.getEvents()->at(2).type,
1765 TestAcceptCallback::TYPE_STOP);
1769 void serverSocketSanityTest(AsyncServerSocket* serverSocket) {
1770 // Add a callback to accept one connection then stop accepting
1771 TestAcceptCallback acceptCallback;
1772 acceptCallback.setConnectionAcceptedFn(
1773 [&](int fd, const folly::SocketAddress& addr) {
1774 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1776 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1777 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1779 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1780 serverSocket->startAccepting();
1782 // Connect to the server socket
1783 EventBase* eventBase = serverSocket->getEventBase();
1784 folly::SocketAddress serverAddress;
1785 serverSocket->getAddress(&serverAddress);
1786 AsyncSocket::UniquePtr socket(new AsyncSocket(eventBase, serverAddress));
1788 // Loop to process all events
1791 // Verify that the server accepted a connection
1792 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1793 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1794 TestAcceptCallback::TYPE_START);
1795 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1796 TestAcceptCallback::TYPE_ACCEPT);
1797 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1798 TestAcceptCallback::TYPE_STOP);
1801 /* Verify that we don't leak sockets if we are destroyed()
1802 * and there are still writes pending
1804 * If destroy() only calls close() instead of closeNow(),
1805 * it would shutdown(writes) on the socket, but it would
1806 * never be close()'d, and the socket would leak
1808 TEST(AsyncSocketTest, DestroyCloseTest) {
1814 std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&clientEB);
1816 socket->connect(&ccb, server.getAddress(), 30);
1818 // Accept the connection
1819 std::shared_ptr<AsyncSocket> acceptedSocket = server.acceptAsync(&serverEB);
1821 acceptedSocket->setReadCB(&rcb);
1823 // Write a large buffer to the socket that is larger than kernel buffer
1824 size_t simpleBufLength = 5000000;
1825 char* simpleBuf = new char[simpleBufLength];
1826 memset(simpleBuf, 'a', simpleBufLength);
1829 // Let the reads and writes run to completion
1830 int fd = acceptedSocket->getFd();
1832 acceptedSocket->write(&wcb, simpleBuf, simpleBufLength);
1834 acceptedSocket.reset();
1836 // Test that server socket was closed
1837 ssize_t sz = read(fd, simpleBuf, simpleBufLength);
1844 * Test AsyncServerSocket::useExistingSocket()
1846 TEST(AsyncSocketTest, ServerExistingSocket) {
1847 EventBase eventBase;
1849 // Test creating a socket, and letting AsyncServerSocket bind and listen
1851 // Manually create a socket
1852 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1855 // Create a server socket
1856 AsyncServerSocket::UniquePtr serverSocket(
1857 new AsyncServerSocket(&eventBase));
1858 serverSocket->useExistingSocket(fd);
1859 folly::SocketAddress address;
1860 serverSocket->getAddress(&address);
1862 serverSocket->bind(address);
1863 serverSocket->listen(16);
1865 // Make sure the socket works
1866 serverSocketSanityTest(serverSocket.get());
1869 // Test creating a socket and binding manually,
1870 // then letting AsyncServerSocket listen
1872 // Manually create a socket
1873 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1876 struct sockaddr_in addr;
1877 addr.sin_family = AF_INET;
1879 addr.sin_addr.s_addr = INADDR_ANY;
1880 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1882 // Look up the address that we bound to
1883 folly::SocketAddress boundAddress;
1884 boundAddress.setFromLocalAddress(fd);
1886 // Create a server socket
1887 AsyncServerSocket::UniquePtr serverSocket(
1888 new AsyncServerSocket(&eventBase));
1889 serverSocket->useExistingSocket(fd);
1890 serverSocket->listen(16);
1892 // Make sure AsyncServerSocket reports the same address that we bound to
1893 folly::SocketAddress serverSocketAddress;
1894 serverSocket->getAddress(&serverSocketAddress);
1895 CHECK_EQ(boundAddress, serverSocketAddress);
1897 // Make sure the socket works
1898 serverSocketSanityTest(serverSocket.get());
1901 // Test creating a socket, binding and listening manually,
1902 // then giving it to AsyncServerSocket
1904 // Manually create a socket
1905 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
1908 struct sockaddr_in addr;
1909 addr.sin_family = AF_INET;
1911 addr.sin_addr.s_addr = INADDR_ANY;
1912 CHECK_EQ(bind(fd, reinterpret_cast<struct sockaddr*>(&addr),
1914 // Look up the address that we bound to
1915 folly::SocketAddress boundAddress;
1916 boundAddress.setFromLocalAddress(fd);
1918 CHECK_EQ(listen(fd, 16), 0);
1920 // Create a server socket
1921 AsyncServerSocket::UniquePtr serverSocket(
1922 new AsyncServerSocket(&eventBase));
1923 serverSocket->useExistingSocket(fd);
1925 // Make sure AsyncServerSocket reports the same address that we bound to
1926 folly::SocketAddress serverSocketAddress;
1927 serverSocket->getAddress(&serverSocketAddress);
1928 CHECK_EQ(boundAddress, serverSocketAddress);
1930 // Make sure the socket works
1931 serverSocketSanityTest(serverSocket.get());
1935 TEST(AsyncSocketTest, UnixDomainSocketTest) {
1936 EventBase eventBase;
1938 // Create a server socket
1939 std::shared_ptr<AsyncServerSocket> serverSocket(
1940 AsyncServerSocket::newSocket(&eventBase));
1942 path.append("/anonymous");
1943 folly::SocketAddress serverAddress;
1944 serverAddress.setFromPath(path);
1945 serverSocket->bind(serverAddress);
1946 serverSocket->listen(16);
1948 // Add a callback to accept one connection then stop the loop
1949 TestAcceptCallback acceptCallback;
1950 acceptCallback.setConnectionAcceptedFn(
1951 [&](int fd, const folly::SocketAddress& addr) {
1952 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1954 acceptCallback.setAcceptErrorFn([&](const std::exception& ex) {
1955 serverSocket->removeAcceptCallback(&acceptCallback, nullptr);
1957 serverSocket->addAcceptCallback(&acceptCallback, nullptr);
1958 serverSocket->startAccepting();
1960 // Connect to the server socket
1961 std::shared_ptr<AsyncSocket> socket(
1962 AsyncSocket::newSocket(&eventBase, serverAddress));
1966 // Verify that the server accepted a connection
1967 CHECK_EQ(acceptCallback.getEvents()->size(), 3);
1968 CHECK_EQ(acceptCallback.getEvents()->at(0).type,
1969 TestAcceptCallback::TYPE_START);
1970 CHECK_EQ(acceptCallback.getEvents()->at(1).type,
1971 TestAcceptCallback::TYPE_ACCEPT);
1972 CHECK_EQ(acceptCallback.getEvents()->at(2).type,
1973 TestAcceptCallback::TYPE_STOP);
1974 int fd = acceptCallback.getEvents()->at(1).fd;
1976 // The accepted connection should already be in non-blocking mode
1977 int flags = fcntl(fd, F_GETFL, 0);
1978 CHECK_EQ(flags & O_NONBLOCK, O_NONBLOCK);