X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2Ftest%2FAsyncSocketTest2.cpp;h=13f23b75d81a6be5fab18944100bb2202f5d3ed4;hp=afe23fa134eb37f37e037373df6f4e627f6fc1e9;hb=5c74326fdc75ccdfc2152c15203625d8588096b6;hpb=16dc0043e4d9ad309b6d66565511181732ff0827 diff --git a/folly/io/async/test/AsyncSocketTest2.cpp b/folly/io/async/test/AsyncSocketTest2.cpp index afe23fa1..13f23b75 100644 --- a/folly/io/async/test/AsyncSocketTest2.cpp +++ b/folly/io/async/test/AsyncSocketTest2.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2016 Facebook, Inc. + * Copyright 2017 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -944,7 +944,7 @@ TEST(AsyncSocketTest, ConnectCallbackWrite) { testConnectOptWrite(100, 200); // Test using a large buffer in the connect callback, that should block - const size_t largeSize = 8*1024*1024; + const size_t largeSize = 32 * 1024 * 1024; testConnectOptWrite(100, largeSize); // Test using a large initial write @@ -1013,8 +1013,12 @@ TEST(AsyncSocketTest, WriteTimeout) { AsyncSocket::newSocket(&evb, server.getAddress(), 30); evb.loop(); // loop until the socket is connected - // write() a large chunk of data, with no-one on the other end reading - size_t writeLength = 8*1024*1024; + // write() a large chunk of data, with no-one on the other end reading. + // Tricky: the kernel caches the connection metrics for recently-used + // routes (see tcp_no_metrics_save) so a freshly opened connection can + // have a send buffer size bigger than wmem_default. This makes the test + // flaky on contbuild if writeLength is < wmem_max (20M on our systems). + size_t writeLength = 32 * 1024 * 1024; uint32_t timeout = 200; socket->setSendTimeout(timeout); scoped_array buf(new char[writeLength]); @@ -1071,7 +1075,7 @@ TEST(AsyncSocketTest, WritePipeError) { acceptedSocket->close(); // write() a large chunk of data - size_t writeLength = 8*1024*1024; + size_t writeLength = 32 * 1024 * 1024; scoped_array buf(new char[writeLength]); memset(buf.get(), 'a', writeLength); WriteCallback wcb; @@ -1090,6 +1094,72 @@ TEST(AsyncSocketTest, WritePipeError) { ASSERT_FALSE(socket->isClosedByPeer()); } +/** + * Test that bytes written is correctly computed in case of write failure + */ +TEST(AsyncSocketTest, WriteErrorCallbackBytesWritten) { + // Send and receive buffer sizes for the sockets. + const int sockBufSize = 8 * 1024; + + TestServer server(false, sockBufSize); + + AsyncSocket::OptionMap options{ + {{SOL_SOCKET, SO_SNDBUF}, sockBufSize}, + {{SOL_SOCKET, SO_RCVBUF}, sockBufSize}, + {{IPPROTO_TCP, TCP_NODELAY}, 1}, + }; + + // The current thread will be used by the receiver - use a separate thread + // for the sender. + EventBase senderEvb; + std::thread senderThread([&]() { senderEvb.loopForever(); }); + + ConnCallback ccb; + std::shared_ptr socket; + + senderEvb.runInEventBaseThreadAndWait([&]() { + socket = AsyncSocket::newSocket(&senderEvb); + socket->connect(&ccb, server.getAddress(), 30, options); + }); + + // accept the socket on the server side + std::shared_ptr acceptedSocket = server.accept(); + + // Send a big (45KB) write so that it is partially written. The first write + // is 16KB (8KB on both sides) and subsequent writes are 8KB each. Reading + // just under 24KB would cause 3-4 writes for the total of 32-40KB in the + // following sequence: 16KB + 8KB + 8KB (+ 8KB). This ensures that not all + // bytes are written when the socket is reset. Having at least 3 writes + // ensures that the total size (45KB) would be exceeed in case of overcounting + // based on the initial write size of 16KB. + constexpr size_t sendSize = 45 * 1024; + auto const sendBuf = std::vector(sendSize, 'a'); + + WriteCallback wcb; + + senderEvb.runInEventBaseThreadAndWait( + [&]() { socket->write(&wcb, sendBuf.data(), sendSize); }); + + // Reading 20KB would cause three additional writes of 8KB, but less + // than 45KB total, so the socket is reset before all bytes are written. + constexpr size_t recvSize = 20 * 1024; + uint8_t recvBuf[recvSize]; + int bytesRead = acceptedSocket->readAll(recvBuf, sizeof(recvBuf)); + + acceptedSocket->closeWithReset(); + + senderEvb.terminateLoopSoon(); + senderThread.join(); + + LOG(INFO) << "Bytes written: " << wcb.bytesWritten; + + ASSERT_EQ(STATE_FAILED, wcb.state); + ASSERT_GE(wcb.bytesWritten, bytesRead); + ASSERT_LE(wcb.bytesWritten, sendSize); + ASSERT_EQ(recvSize, bytesRead); + ASSERT(32 * 1024 == wcb.bytesWritten || 40 * 1024 == wcb.bytesWritten); +} + /** * Test writing a mix of simple buffers and IOBufs */ @@ -1107,6 +1177,13 @@ TEST(AsyncSocketTest, WriteIOBuf) { ReadCallback rcb; acceptedSocket->setReadCB(&rcb); + // Check if EOR tracking flag can be set and reset. + EXPECT_FALSE(socket->isEorTrackingEnabled()); + socket->setEorTracking(true); + EXPECT_TRUE(socket->isEorTrackingEnabled()); + socket->setEorTracking(false); + EXPECT_FALSE(socket->isEorTrackingEnabled()); + // Write a simple buffer to the socket constexpr size_t simpleBufLength = 5; char simpleBuf[simpleBufLength]; @@ -2926,4 +3003,24 @@ TEST(AsyncSocketTest, ConnectTFOWithBigData) { EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded()); } +class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback { + public: + MOCK_METHOD1(evbAttached, void(AsyncSocket*)); + MOCK_METHOD1(evbDetached, void(AsyncSocket*)); +}; + +TEST(AsyncSocketTest, EvbCallbacks) { + auto cb = folly::make_unique(); + EventBase evb; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + + InSequence seq; + EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1); + EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1); + + socket->setEvbChangedCallback(std::move(cb)); + socket->detachEventBase(); + socket->attachEventBase(&evb); +} + #endif