X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fio%2Fasync%2FAsyncSocket.cpp;h=a8fb13e0425f25fce7b658fa8321e37a464b8617;hb=a1614feea3f3c0beb75fb2dc43ec45b3e5d57223;hp=e7c9c4510499da09cb3a16e0d57c330825bcd8eb;hpb=663d3a7b260b9991e519310fb304c71edcff2498;p=folly.git diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp index e7c9c451..a8fb13e0 100644 --- a/folly/io/async/AsyncSocket.cpp +++ b/folly/io/async/AsyncSocket.cpp @@ -31,6 +31,7 @@ #include #include #include +#include using std::string; using std::unique_ptr; @@ -317,6 +318,10 @@ void AsyncSocket::connect(ConnectCallback* callback, return invalidState(callback); } + connectStartTime_ = std::chrono::steady_clock::now(); + // Make connect end time at least >= connectStartTime. + connectEndTime_ = connectStartTime_; + assert(fd_ == -1); state_ = StateEnum::CONNECTING; connectCallback_ = callback; @@ -462,10 +467,7 @@ void AsyncSocket::connect(ConnectCallback* callback, assert(readCallback_ == nullptr); assert(writeReqHead_ == nullptr); state_ = StateEnum::ESTABLISHED; - if (callback) { - connectCallback_ = nullptr; - callback->connectSuccess(); - } + invokeConnectSuccess(); } void AsyncSocket::connect(ConnectCallback* callback, @@ -610,21 +612,22 @@ void AsyncSocket::write(WriteCallback* callback, iovec op; op.iov_base = const_cast(buf); op.iov_len = bytes; - writeImpl(callback, &op, 1, std::move(unique_ptr()), flags); + writeImpl(callback, &op, 1, unique_ptr(), flags); } void AsyncSocket::writev(WriteCallback* callback, const iovec* vec, size_t count, WriteFlags flags) { - writeImpl(callback, vec, count, std::move(unique_ptr()), flags); + writeImpl(callback, vec, count, unique_ptr(), flags); } void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr&& buf, WriteFlags flags) { + constexpr size_t kSmallSizeMax = 64; size_t count = buf->countChainElements(); - if (count <= 64) { - iovec vec[count]; + if (count <= kSmallSizeMax) { + iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)]; writeChainImpl(callback, vec, count, std::move(buf), flags); } else { iovec* vec = new iovec[count]; @@ -685,7 +688,11 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, callback->writeSuccess(); } return; - } // else { continue writing the next writeReq } + } else { // continue writing the next writeReq + if (bufferCallback_) { + bufferCallback_->onEgressBuffered(); + } + } mustRegister = true; } } else if (!connecting()) { @@ -836,11 +843,7 @@ void AsyncSocket::closeNow() { doClose(); } - if (connectCallback_) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectErr(socketClosedLocallyEx); - } + invokeConnectErr(socketClosedLocallyEx); failAllWrites(socketClosedLocallyEx); @@ -1063,7 +1066,10 @@ bool AsyncSocket::isDetachable() const { } void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const { - address->setFromLocalAddress(fd_); + if (!localAddr_.isInitialized()) { + localAddr_.setFromLocalAddress(fd_); + } + *address = localAddr_; } void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const { @@ -1196,6 +1202,37 @@ int AsyncSocket::setTCPProfile(int profd) { return 0; } +void AsyncSocket::setPersistentCork(bool cork) { + if (setCork(cork) == 0) { + persistentCork_ = cork; + } +} + +int AsyncSocket::setCork(bool cork) { +#ifdef TCP_CORK + if (fd_ < 0) { + VLOG(4) << "AsyncSocket::setCork() called on non-open socket " + << this << "(stats=" << state_ << ")"; + return EINVAL; + } + + if (corked_ == cork) { + return 0; + } + + int flag = cork ? 1 : 0; + if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) { + int errnoCopy = errno; + VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket" + << this << "(fd=" << fd_ << ", state=" << state_ << "):" + << folly::errnoStr(errnoCopy); + return errnoCopy; + } + corked_ = cork; +#endif + return 0; +} + void AsyncSocket::ioReady(uint16_t events) noexcept { VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_ << ", events=" << std::hex << events << ", state=" << state_; @@ -1231,7 +1268,9 @@ void AsyncSocket::ioReady(uint16_t events) noexcept { } } -ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) { +ssize_t AsyncSocket::performRead(void** buf, + size_t* buflen, + size_t* /* offset */) { VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf << ", buflen=" << *buflen; @@ -1345,11 +1384,13 @@ void AsyncSocket::handleRead() noexcept { // No more data to read right now. return; } else if (bytesRead == READ_ERROR) { + readErr_ = READ_ERROR; AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, withAddr("recv() failed"), errno); return failRead(__func__, ex); } else { assert(bytesRead == READ_EOF); + readErr_ = READ_EOF; // EOF shutdownFlags_ |= SHUT_READ; if (!updateEventRegistration(0, EventHandler::READ)) { @@ -1473,6 +1514,9 @@ void AsyncSocket::handleWrite() noexcept { // We'll continue around the loop, trying to write another request } else { // Partial write. + if (bufferCallback_) { + bufferCallback_->onEgressBuffered(); + } writeReqHead_->consume(); // Stop after a partial write; it's highly likely that a subsequent write // attempt will just return EAGAIN. @@ -1496,6 +1540,9 @@ void AsyncSocket::handleWrite() noexcept { return; } } + if (!writeReqHead_ && bufferCallback_) { + bufferCallback_->onEgressBufferCleared(); + } } void AsyncSocket::checkForImmediateRead() noexcept { @@ -1613,13 +1660,7 @@ void AsyncSocket::handleConnect() noexcept { // callbacks (since the callbacks may call detachEventBase()). EventBase* originalEventBase = eventBase_; - // Call the connect callback. - if (connectCallback_) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectSuccess(); - } - + invokeConnectSuccess(); // Note that the connect callback may have changed our state. // (set or unset the read callback, called write(), closed the socket, etc.) // The following code needs to handle these situations correctly. @@ -1801,12 +1842,7 @@ void AsyncSocket::finishFail() { AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, withAddr("socket closing after error")); - if (connectCallback_) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectErr(ex); - } - + invokeConnectErr(ex); failAllWrites(ex); if (readCallback_) { @@ -1832,12 +1868,7 @@ void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) { << ex.what(); startFail(); - if (connectCallback_ != nullptr) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectErr(ex); - } - + invokeConnectErr(ex); finishFail(); } @@ -1927,6 +1958,7 @@ void AsyncSocket::invalidState(ConnectCallback* callback) { AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN, "connect() called with socket in invalid state"); + connectEndTime_ = std::chrono::steady_clock::now(); if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) { if (callback) { callback->connectErr(ex); @@ -1943,6 +1975,24 @@ void AsyncSocket::invalidState(ConnectCallback* callback) { } } +void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) { + connectEndTime_ = std::chrono::steady_clock::now(); + if (connectCallback_) { + ConnectCallback* callback = connectCallback_; + connectCallback_ = nullptr; + callback->connectErr(ex); + } +} + +void AsyncSocket::invokeConnectSuccess() { + connectEndTime_ = std::chrono::steady_clock::now(); + if (connectCallback_) { + ConnectCallback* callback = connectCallback_; + connectCallback_ = nullptr; + callback->connectSuccess(); + } +} + void AsyncSocket::invalidState(ReadCallback* callback) { VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << "): setReadCallback(" << callback @@ -2014,4 +2064,8 @@ std::string AsyncSocket::withAddr(const std::string& s) { return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")"; } +void AsyncSocket::setBufferCallback(BufferCallback* cb) { + bufferCallback_ = cb; +} + } // folly