X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fio%2Fasync%2FAsyncSocket.cpp;h=a8fb13e0425f25fce7b658fa8321e37a464b8617;hp=4ab1cdea0f74e9de696b172ca7fd82cf0fef2681;hb=a1614feea3f3c0beb75fb2dc43ec45b3e5d57223;hpb=7f22ad99b5c4583d8f579911b8907c7758a4151e diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp index 4ab1cdea..a8fb13e0 100644 --- a/folly/io/async/AsyncSocket.cpp +++ b/folly/io/async/AsyncSocket.cpp @@ -31,6 +31,7 @@ #include #include #include +#include using std::string; using std::unique_ptr; @@ -317,6 +318,10 @@ void AsyncSocket::connect(ConnectCallback* callback, return invalidState(callback); } + connectStartTime_ = std::chrono::steady_clock::now(); + // Make connect end time at least >= connectStartTime. + connectEndTime_ = connectStartTime_; + assert(fd_ == -1); state_ = StateEnum::CONNECTING; connectCallback_ = callback; @@ -462,10 +467,7 @@ void AsyncSocket::connect(ConnectCallback* callback, assert(readCallback_ == nullptr); assert(writeReqHead_ == nullptr); state_ = StateEnum::ESTABLISHED; - if (callback) { - connectCallback_ = nullptr; - callback->connectSuccess(); - } + invokeConnectSuccess(); } void AsyncSocket::connect(ConnectCallback* callback, @@ -610,21 +612,22 @@ void AsyncSocket::write(WriteCallback* callback, iovec op; op.iov_base = const_cast(buf); op.iov_len = bytes; - writeImpl(callback, &op, 1, std::move(unique_ptr()), flags); + writeImpl(callback, &op, 1, unique_ptr(), flags); } void AsyncSocket::writev(WriteCallback* callback, const iovec* vec, size_t count, WriteFlags flags) { - writeImpl(callback, vec, count, std::move(unique_ptr()), flags); + writeImpl(callback, vec, count, unique_ptr(), flags); } void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr&& buf, WriteFlags flags) { + constexpr size_t kSmallSizeMax = 64; size_t count = buf->countChainElements(); - if (count <= 64) { - iovec vec[count]; + if (count <= kSmallSizeMax) { + iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)]; writeChainImpl(callback, vec, count, std::move(buf), flags); } else { iovec* vec = new iovec[count]; @@ -685,7 +688,11 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec, callback->writeSuccess(); } return; - } // else { continue writing the next writeReq } + } else { // continue writing the next writeReq + if (bufferCallback_) { + bufferCallback_->onEgressBuffered(); + } + } mustRegister = true; } } else if (!connecting()) { @@ -836,11 +843,7 @@ void AsyncSocket::closeNow() { doClose(); } - if (connectCallback_) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectErr(socketClosedLocallyEx); - } + invokeConnectErr(socketClosedLocallyEx); failAllWrites(socketClosedLocallyEx); @@ -1063,7 +1066,10 @@ bool AsyncSocket::isDetachable() const { } void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const { - address->setFromLocalAddress(fd_); + if (!localAddr_.isInitialized()) { + localAddr_.setFromLocalAddress(fd_); + } + *address = localAddr_; } void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const { @@ -1196,6 +1202,37 @@ int AsyncSocket::setTCPProfile(int profd) { return 0; } +void AsyncSocket::setPersistentCork(bool cork) { + if (setCork(cork) == 0) { + persistentCork_ = cork; + } +} + +int AsyncSocket::setCork(bool cork) { +#ifdef TCP_CORK + if (fd_ < 0) { + VLOG(4) << "AsyncSocket::setCork() called on non-open socket " + << this << "(stats=" << state_ << ")"; + return EINVAL; + } + + if (corked_ == cork) { + return 0; + } + + int flag = cork ? 1 : 0; + if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) { + int errnoCopy = errno; + VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket" + << this << "(fd=" << fd_ << ", state=" << state_ << "):" + << folly::errnoStr(errnoCopy); + return errnoCopy; + } + corked_ = cork; +#endif + return 0; +} + void AsyncSocket::ioReady(uint16_t events) noexcept { VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_ << ", events=" << std::hex << events << ", state=" << state_; @@ -1231,8 +1268,18 @@ void AsyncSocket::ioReady(uint16_t events) noexcept { } } -ssize_t AsyncSocket::performRead(void* buf, size_t buflen) { - ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT); +ssize_t AsyncSocket::performRead(void** buf, + size_t* buflen, + size_t* /* offset */) { + VLOG(5) << "AsyncSocket::performRead() this=" << this + << ", buf=" << *buf << ", buflen=" << *buflen; + + int recvFlags = 0; + if (peek_) { + recvFlags |= MSG_PEEK; + } + + ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags); if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { // No more data to read right now. @@ -1246,6 +1293,12 @@ ssize_t AsyncSocket::performRead(void* buf, size_t buflen) { } } +void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept { + // no matter what, buffer should be preapared for non-ssl socket + CHECK(readCallback_); + readCallback_->getReadBuffer(buf, buflen); +} + void AsyncSocket::handleRead() noexcept { VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_ << ", state=" << state_; @@ -1277,9 +1330,10 @@ void AsyncSocket::handleRead() noexcept { while (readCallback_ && eventBase_ == originalEventBase) { // Get the buffer to read into. void* buf = nullptr; - size_t buflen = 0; + size_t buflen = 0, offset = 0; try { - readCallback_->getReadBuffer(&buf, &buflen); + prepareReadBuffer(&buf, &buflen); + VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen; } catch (const AsyncSocketException& ex) { return failRead(__func__, ex); } catch (const std::exception& ex) { @@ -1294,7 +1348,7 @@ void AsyncSocket::handleRead() noexcept { "non-exception type"); return failRead(__func__, ex); } - if (buf == nullptr || buflen == 0) { + if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) { AsyncSocketException ex(AsyncSocketException::BAD_ARGS, "ReadCallback::getReadBuffer() returned " "empty buffer"); @@ -1302,9 +1356,23 @@ void AsyncSocket::handleRead() noexcept { } // Perform the read - ssize_t bytesRead = performRead(buf, buflen); + ssize_t bytesRead = performRead(&buf, &buflen, &offset); + VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got " + << bytesRead << " bytes"; if (bytesRead > 0) { - readCallback_->readDataAvailable(bytesRead); + if (!isBufferMovable_) { + readCallback_->readDataAvailable(bytesRead); + } else { + CHECK(kOpenSslModeMoveBufferOwnership); + VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got " + << "buf=" << buf << ", " << bytesRead << "/" << buflen + << ", offset=" << offset; + auto readBuf = folly::IOBuf::takeOwnership(buf, buflen); + readBuf->trimStart(offset); + readBuf->trimEnd(buflen - offset - bytesRead); + readCallback_->readBufferAvailable(std::move(readBuf)); + } + // Fall through and continue around the loop if the read // completely filled the available buffer. // Note that readCallback_ may have been uninstalled or changed inside @@ -1316,11 +1384,13 @@ void AsyncSocket::handleRead() noexcept { // No more data to read right now. return; } else if (bytesRead == READ_ERROR) { + readErr_ = READ_ERROR; AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, withAddr("recv() failed"), errno); return failRead(__func__, ex); } else { assert(bytesRead == READ_EOF); + readErr_ = READ_EOF; // EOF shutdownFlags_ |= SHUT_READ; if (!updateEventRegistration(0, EventHandler::READ)) { @@ -1444,6 +1514,9 @@ void AsyncSocket::handleWrite() noexcept { // We'll continue around the loop, trying to write another request } else { // Partial write. + if (bufferCallback_) { + bufferCallback_->onEgressBuffered(); + } writeReqHead_->consume(); // Stop after a partial write; it's highly likely that a subsequent write // attempt will just return EAGAIN. @@ -1467,6 +1540,9 @@ void AsyncSocket::handleWrite() noexcept { return; } } + if (!writeReqHead_ && bufferCallback_) { + bufferCallback_->onEgressBufferCleared(); + } } void AsyncSocket::checkForImmediateRead() noexcept { @@ -1584,13 +1660,7 @@ void AsyncSocket::handleConnect() noexcept { // callbacks (since the callbacks may call detachEventBase()). EventBase* originalEventBase = eventBase_; - // Call the connect callback. - if (connectCallback_) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectSuccess(); - } - + invokeConnectSuccess(); // Note that the connect callback may have changed our state. // (set or unset the read callback, called write(), closed the socket, etc.) // The following code needs to handle these situations correctly. @@ -1772,12 +1842,7 @@ void AsyncSocket::finishFail() { AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR, withAddr("socket closing after error")); - if (connectCallback_) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectErr(ex); - } - + invokeConnectErr(ex); failAllWrites(ex); if (readCallback_) { @@ -1803,12 +1868,7 @@ void AsyncSocket::failConnect(const char* fn, const AsyncSocketException& ex) { << ex.what(); startFail(); - if (connectCallback_ != nullptr) { - ConnectCallback* callback = connectCallback_; - connectCallback_ = nullptr; - callback->connectErr(ex); - } - + invokeConnectErr(ex); finishFail(); } @@ -1898,6 +1958,7 @@ void AsyncSocket::invalidState(ConnectCallback* callback) { AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN, "connect() called with socket in invalid state"); + connectEndTime_ = std::chrono::steady_clock::now(); if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) { if (callback) { callback->connectErr(ex); @@ -1914,6 +1975,24 @@ void AsyncSocket::invalidState(ConnectCallback* callback) { } } +void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) { + connectEndTime_ = std::chrono::steady_clock::now(); + if (connectCallback_) { + ConnectCallback* callback = connectCallback_; + connectCallback_ = nullptr; + callback->connectErr(ex); + } +} + +void AsyncSocket::invokeConnectSuccess() { + connectEndTime_ = std::chrono::steady_clock::now(); + if (connectCallback_) { + ConnectCallback* callback = connectCallback_; + connectCallback_ = nullptr; + callback->connectSuccess(); + } +} + void AsyncSocket::invalidState(ReadCallback* callback) { VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << "): setReadCallback(" << callback @@ -1985,4 +2064,8 @@ std::string AsyncSocket::withAddr(const std::string& s) { return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")"; } +void AsyncSocket::setBufferCallback(BufferCallback* cb) { + bufferCallback_ = cb; +} + } // folly