/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#include <folly/io/async/AsyncSocket.h>
#include <folly/ExceptionWrapper.h>
writeFlags |= WriteFlags::CORK;
}
- socket_->adjustZeroCopyFlags(getOps(), getOpCount(), writeFlags);
+ socket_->adjustZeroCopyFlags(writeFlags);
auto writeResult = socket_->performWrite(
getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
connect(nullptr, ip, port, connectTimeout);
}
-AsyncSocket::AsyncSocket(EventBase* evb, int fd)
- : eventBase_(evb),
+AsyncSocket::AsyncSocket(EventBase* evb, int fd, uint32_t zeroCopyBufId)
+ : zeroCopyBufId_(zeroCopyBufId),
+ eventBase_(evb),
writeTimeout_(this, evb),
ioHandler_(this, evb, fd),
immediateReadHandler_(this) {
- VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
- << fd << ")";
+ VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd=" << fd
+ << ", zeroCopyBufId=" << zeroCopyBufId << ")";
init();
fd_ = fd;
setCloseOnExec();
}
AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket)
- : AsyncSocket(oldAsyncSocket->getEventBase(), oldAsyncSocket->detachFd()) {
+ : AsyncSocket(
+ oldAsyncSocket->getEventBase(),
+ oldAsyncSocket->detachFd(),
+ oldAsyncSocket->getZeroCopyBufId()) {
preReceivedData_ = std::move(oldAsyncSocket->preReceivedData_);
}
<< ", fd=" << fd_ << ", callback=" << callback
<< ", state=" << state_;
+ // In the latest stable kernel 4.14.3 as of 2017-12-04, unix domain
+ // socket does not support MSG_ERRQUEUE. So recvmsg(MSG_ERRQUEUE)
+ // will read application data from unix doamin socket as error
+ // message, which breaks the message flow in application. Feel free
+ // to remove the next code block if MSG_ERRQUEUE is added for unix
+ // domain socket in the future.
+ if (callback != nullptr) {
+ cacheLocalAddress();
+ if (localAddr_.getFamily() == AF_UNIX) {
+ LOG(ERROR) << "Failed to set ErrMessageCallback=" << callback
+ << " for Unix Doamin Socket where MSG_ERRQUEUE is unsupported,"
+ << " fd=" << fd_;
+ return;
+ }
+ }
+
// Short circuit if callback is the same as the existing errMessageCallback_.
if (callback == errMessageCallback_) {
return;
return false;
}
-void AsyncSocket::setZeroCopyWriteChainThreshold(size_t threshold) {
- zeroCopyWriteChainThreshold_ = threshold;
-}
-
bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) {
return (zeroCopyEnabled_ && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY));
}
-void AsyncSocket::adjustZeroCopyFlags(
- folly::IOBuf* buf,
- folly::WriteFlags& flags) {
- if (zeroCopyEnabled_ && zeroCopyWriteChainThreshold_ && buf &&
- buf->isManaged()) {
- if (buf->computeChainDataLength() >= zeroCopyWriteChainThreshold_) {
- flags |= folly::WriteFlags::WRITE_MSG_ZEROCOPY;
- } else {
- flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
- }
- }
-}
-
-void AsyncSocket::adjustZeroCopyFlags(
- const iovec* vec,
- uint32_t count,
- folly::WriteFlags& flags) {
- if (zeroCopyEnabled_ && zeroCopyWriteChainThreshold_) {
- count = std::min<uint32_t>(count, kIovMax);
- size_t sum = 0;
- for (uint32_t i = 0; i < count; ++i) {
- const iovec* v = vec + i;
- sum += v->iov_len;
- }
-
- if (sum >= zeroCopyWriteChainThreshold_) {
- flags |= folly::WriteFlags::WRITE_MSG_ZEROCOPY;
- } else {
- flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
- }
+void AsyncSocket::adjustZeroCopyFlags(folly::WriteFlags& flags) {
+ if (!zeroCopyEnabled_) {
+ flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
}
}
void AsyncSocket::addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
- uint32_t id = getNextZeroCopyBuffId();
+ uint32_t id = getNextZeroCopyBufId();
folly::IOBuf* ptr = buf.get();
idZeroCopyBufPtrMap_[id] = ptr;
}
void AsyncSocket::addZeroCopyBuf(folly::IOBuf* ptr) {
- uint32_t id = getNextZeroCopyBuffId();
+ uint32_t id = getNextZeroCopyBufId();
idZeroCopyBufPtrMap_[id] = ptr;
idZeroCopyBufInfoMap_[ptr].count_++;
if (0 == --iter1->second.count_) {
idZeroCopyBufInfoMap_.erase(iter1);
}
+
+ idZeroCopyBufPtrMap_.erase(iter);
}
void AsyncSocket::setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
WriteFlags flags) {
- adjustZeroCopyFlags(buf.get(), flags);
+ adjustZeroCopyFlags(flags);
constexpr size_t kSmallSizeMax = 64;
size_t count = buf->countChainElements();
return failWrite(__func__, callback, 0, ex);
} else if (countWritten == count) {
// done, add the whole buffer
- if (isZeroCopyRequest(flags)) {
+ if (countWritten && isZeroCopyRequest(flags)) {
addZeroCopyBuf(std::move(ioBuf));
}
// We successfully wrote everything.
return;
} else { // continue writing the next writeReq
// add just the ptr
- if (isZeroCopyRequest(flags)) {
+ if (bytesWritten && isZeroCopyRequest(flags)) {
addZeroCopyBuf(ioBuf.get());
}
if (bufferCallback_) {
eventBase_ = eventBase;
ioHandler_.attachEventBase(eventBase);
+
+ updateEventRegistration();
+
writeTimeout_.attachEventBase(eventBase);
if (evbChangeCb_) {
evbChangeCb_->evbAttached(this);
eventBase_->dcheckIsInEventBaseThread();
eventBase_ = nullptr;
+
+ ioHandler_.unregisterHandler();
+
ioHandler_.detachEventBase();
writeTimeout_.detachEventBase();
if (evbChangeCb_) {
DCHECK(eventBase_ != nullptr);
eventBase_->dcheckIsInEventBaseThread();
- return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
+ return !writeTimeout_.isScheduled();
}
void AsyncSocket::cacheAddresses() {
// EventHandler::WRITE is set. Any of these flags can
// indicate that there are messages available in the socket
// error message queue.
- handleErrMessages();
+ // Return if we handle any error messages - this is to avoid
+ // unnecessary read/write calls
+ if (handleErrMessages()) {
+ return;
+ }
// Return now if handleErrMessages() detached us from our EventBase
if (eventBase_ != originalEventBase) {
readCallback_->getReadBuffer(buf, buflen);
}
-void AsyncSocket::handleErrMessages() noexcept {
+size_t AsyncSocket::handleErrMessages() noexcept {
// This method has non-empty implementation only for platforms
// supporting per-socket error queues.
VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_
<< ", state=" << state_;
- if (errMessageCallback_ == nullptr &&
- (!zeroCopyEnabled_ || idZeroCopyBufPtrMap_.empty())) {
+ if (errMessageCallback_ == nullptr && idZeroCopyBufPtrMap_.empty()) {
VLOG(7) << "AsyncSocket::handleErrMessages(): "
<< "no callback installed - exiting.";
- return;
+ return 0;
}
#ifdef FOLLY_HAVE_MSG_ERRQUEUE
msg.msg_flags = 0;
int ret;
+ size_t num = 0;
while (true) {
ret = recvmsg(fd_, &msg, MSG_ERRQUEUE);
VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
errnoCopy);
failErrMessageRead(__func__, ex);
}
- return;
+
+ return num;
}
for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
cmsg != nullptr && cmsg->cmsg_len != 0;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ ++num;
if (isZeroCopyMsg(*cmsg)) {
processZeroCopyMsg(*cmsg);
} else {
}
}
}
+#else
+ return 0;
#endif // FOLLY_HAVE_MSG_ERRQUEUE
}
+bool AsyncSocket::processZeroCopyWriteInProgress() noexcept {
+ eventBase_->dcheckIsInEventBaseThread();
+ if (idZeroCopyBufPtrMap_.empty()) {
+ return true;
+ }
+
+ handleErrMessages();
+
+ return idZeroCopyBufPtrMap_.empty();
+}
+
void AsyncSocket::handleRead() noexcept {
VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
<< ", state=" << state_;
// this bug is fixed.
tryAgain |= (errno == ENOTCONN);
#endif
+
+ // workaround for running with zerocopy enabled but without a proper
+ // memlock value - see ulimit -l
+ if (zeroCopyEnabled_ && (errno == ENOBUFS)) {
+ tryAgain = true;
+ zeroCopyEnabled_ = false;
+ }
+
if (!writeResult.exception && tryAgain) {
// TCP buffer is full; we can't write any more data right now.
*countWritten = 0;