/*
- * Copyright 2017 Facebook, Inc.
+ * Copyright 2017-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#include <folly/io/async/AsyncSocket.h>
#include <folly/ExceptionWrapper.h>
+#include <folly/Format.h>
#include <folly/Portability.h>
#include <folly/SocketAddress.h>
#include <folly/io/Cursor.h>
namespace folly {
static constexpr bool msgErrQueueSupported =
-#ifdef MSG_ERRQUEUE
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
true;
#else
false;
-#endif // MSG_ERRQUEUE
+#endif // FOLLY_HAVE_MSG_ERRQUEUE
// static members initializers
const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
if (getNext() != nullptr) {
writeFlags |= WriteFlags::CORK;
}
+
+ socket_->adjustZeroCopyFlags(writeFlags);
+
auto writeResult = socket_->performWrite(
getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
+ if (bytesWritten_) {
+ if (socket_->isZeroCopyRequest(writeFlags)) {
+ if (isComplete()) {
+ socket_->addZeroCopyBuf(std::move(ioBuf_));
+ } else {
+ socket_->addZeroCopyBuf(ioBuf_.get());
+ }
+ } else {
+ // this happens if at least one of the prev requests were sent
+ // with zero copy but not the last one
+ if (isComplete() && socket_->getZeroCopy() &&
+ socket_->containsZeroCopyBuf(ioBuf_.get())) {
+ socket_->setZeroCopyBuf(std::move(ioBuf_));
+ }
+ }
+ }
return writeResult;
}
opIndex_ += opsWritten_;
assert(opIndex_ < opCount_);
- // If we've finished writing any IOBufs, release them
- if (ioBuf_) {
- for (uint32_t i = opsWritten_; i != 0; --i) {
- assert(ioBuf_);
- ioBuf_ = ioBuf_->pop();
+ if (!socket_->isZeroCopyRequest(flags_)) {
+ // If we've finished writing any IOBufs, release them
+ if (ioBuf_) {
+ for (uint32_t i = opsWritten_; i != 0; --i) {
+ assert(ioBuf_);
+ ioBuf_ = ioBuf_->pop();
+ }
}
}
struct iovec writeOps_[]; ///< write operation(s) list
};
-int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(folly::WriteFlags flags)
- noexcept {
+int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(
+ folly::WriteFlags flags,
+ bool zeroCopyEnabled) noexcept {
int msg_flags = MSG_DONTWAIT;
#ifdef MSG_NOSIGNAL // Linux-only
msg_flags |= MSG_EOR;
}
+ if (zeroCopyEnabled && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY)) {
+ msg_flags |= MSG_ZEROCOPY;
+ }
+
return msg_flags;
}
namespace {
static AsyncSocket::SendMsgParamsCallback defaultSendMsgParamsCallback;
-}
+} // namespace
AsyncSocket::AsyncSocket()
: eventBase_(nullptr),
connect(nullptr, ip, port, connectTimeout);
}
-AsyncSocket::AsyncSocket(EventBase* evb, int fd)
- : eventBase_(evb),
+AsyncSocket::AsyncSocket(EventBase* evb, int fd, uint32_t zeroCopyBufId)
+ : zeroCopyBufId_(zeroCopyBufId),
+ eventBase_(evb),
writeTimeout_(this, evb),
ioHandler_(this, evb, fd),
immediateReadHandler_(this) {
- VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
- << fd << ")";
+ VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd=" << fd
+ << ", zeroCopyBufId=" << zeroCopyBufId << ")";
init();
fd_ = fd;
setCloseOnExec();
}
AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket)
- : AsyncSocket(oldAsyncSocket->getEventBase(), oldAsyncSocket->detachFd()) {
+ : AsyncSocket(
+ oldAsyncSocket->getEventBase(),
+ oldAsyncSocket->detachFd(),
+ oldAsyncSocket->getZeroCopyBufId()) {
preReceivedData_ = std::move(oldAsyncSocket->preReceivedData_);
}
readCallback_ = nullptr;
writeReqHead_ = nullptr;
writeReqTail_ = nullptr;
- shutdownSocketSet_ = nullptr;
+ wShutdownSocketSet_.reset();
appBytesWritten_ = 0;
appBytesReceived_ = 0;
sendMsgParamCallback_ = &defaultSendMsgParamsCallback;
<< ", events=" << std::hex << eventFlags_ << ")";
// Extract the fd, and set fd_ to -1 first, so closeNow() won't
// actually close the descriptor.
- if (shutdownSocketSet_) {
- shutdownSocketSet_->remove(fd_);
+ if (const auto socketSet = wShutdownSocketSet_.lock()) {
+ socketSet->remove(fd_);
}
int fd = fd_;
fd_ = -1;
return anyAddress;
}
-void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
- if (shutdownSocketSet_ == newSS) {
+void AsyncSocket::setShutdownSocketSet(
+ const std::weak_ptr<ShutdownSocketSet>& wNewSS) {
+ const auto newSS = wNewSS.lock();
+ const auto shutdownSocketSet = wShutdownSocketSet_.lock();
+
+ if (newSS == shutdownSocketSet) {
return;
}
- if (shutdownSocketSet_ && fd_ != -1) {
- shutdownSocketSet_->remove(fd_);
+
+ if (shutdownSocketSet && fd_ != -1) {
+ shutdownSocketSet->remove(fd_);
}
- shutdownSocketSet_ = newSS;
- if (shutdownSocketSet_ && fd_ != -1) {
- shutdownSocketSet_->add(fd_);
+
+ if (newSS && fd_ != -1) {
+ newSS->add(fd_);
}
+
+ wShutdownSocketSet_ = wNewSS;
}
void AsyncSocket::setCloseOnExec() {
withAddr("failed to create socket"),
errnoCopy);
}
- if (shutdownSocketSet_) {
- shutdownSocketSet_->add(fd_);
+ if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->add(fd_);
}
ioHandler_.changeHandlerFD(fd_);
// By default, turn on TCP_NODELAY
// If setNoDelay() fails, we continue anyway; this isn't a fatal error.
// setNoDelay() will log an error message if it fails.
+ // Also set the cached zeroCopyVal_ since it cannot be set earlier if the fd
+ // is not created
if (address.getFamily() != AF_UNIX) {
(void)setNoDelay(true);
+ setZeroCopy(zeroCopyVal_);
}
VLOG(5) << "AsyncSocket::connect(this=" << this << ", evb=" << eventBase_
<< ", fd=" << fd_ << ", callback=" << callback
<< ", state=" << state_;
+ // In the latest stable kernel 4.14.3 as of 2017-12-04, unix domain
+ // socket does not support MSG_ERRQUEUE. So recvmsg(MSG_ERRQUEUE)
+ // will read application data from unix doamin socket as error
+ // message, which breaks the message flow in application. Feel free
+ // to remove the next code block if MSG_ERRQUEUE is added for unix
+ // domain socket in the future.
+ if (callback != nullptr) {
+ cacheLocalAddress();
+ if (localAddr_.getFamily() == AF_UNIX) {
+ LOG(ERROR) << "Failed to set ErrMessageCallback=" << callback
+ << " for Unix Doamin Socket where MSG_ERRQUEUE is unsupported,"
+ << " fd=" << fd_;
+ return;
+ }
+ }
+
// Short circuit if callback is the same as the existing errMessageCallback_.
if (callback == errMessageCallback_) {
return;
return readCallback_;
}
+bool AsyncSocket::setZeroCopy(bool enable) {
+ if (msgErrQueueSupported) {
+ zeroCopyVal_ = enable;
+
+ if (fd_ < 0) {
+ return false;
+ }
+
+ int val = enable ? 1 : 0;
+ int ret = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, sizeof(val));
+
+ // if enable == false, set zeroCopyEnabled_ = false regardless
+ // if SO_ZEROCOPY is set or not
+ if (!enable) {
+ zeroCopyEnabled_ = enable;
+ return true;
+ }
+
+ /* if the setsockopt failed, try to see if the socket inherited the flag
+ * since we cannot set SO_ZEROCOPY on a socket s = accept
+ */
+ if (ret) {
+ val = 0;
+ socklen_t optlen = sizeof(val);
+ ret = getsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &val, &optlen);
+
+ if (!ret) {
+ enable = val ? true : false;
+ }
+ }
+
+ if (!ret) {
+ zeroCopyEnabled_ = enable;
+
+ return true;
+ }
+ }
+
+ return false;
+}
+
+bool AsyncSocket::isZeroCopyRequest(WriteFlags flags) {
+ return (zeroCopyEnabled_ && isSet(flags, WriteFlags::WRITE_MSG_ZEROCOPY));
+}
+
+void AsyncSocket::adjustZeroCopyFlags(folly::WriteFlags& flags) {
+ if (!zeroCopyEnabled_) {
+ flags = unSet(flags, folly::WriteFlags::WRITE_MSG_ZEROCOPY);
+ }
+}
+
+void AsyncSocket::addZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
+ uint32_t id = getNextZeroCopyBufId();
+ folly::IOBuf* ptr = buf.get();
+
+ idZeroCopyBufPtrMap_[id] = ptr;
+ auto& p = idZeroCopyBufInfoMap_[ptr];
+ p.count_++;
+ CHECK(p.buf_.get() == nullptr);
+ p.buf_ = std::move(buf);
+}
+
+void AsyncSocket::addZeroCopyBuf(folly::IOBuf* ptr) {
+ uint32_t id = getNextZeroCopyBufId();
+ idZeroCopyBufPtrMap_[id] = ptr;
+
+ idZeroCopyBufInfoMap_[ptr].count_++;
+}
+
+void AsyncSocket::releaseZeroCopyBuf(uint32_t id) {
+ auto iter = idZeroCopyBufPtrMap_.find(id);
+ CHECK(iter != idZeroCopyBufPtrMap_.end());
+ auto ptr = iter->second;
+ auto iter1 = idZeroCopyBufInfoMap_.find(ptr);
+ CHECK(iter1 != idZeroCopyBufInfoMap_.end());
+ if (0 == --iter1->second.count_) {
+ idZeroCopyBufInfoMap_.erase(iter1);
+ }
+
+ idZeroCopyBufPtrMap_.erase(iter);
+}
+
+void AsyncSocket::setZeroCopyBuf(std::unique_ptr<folly::IOBuf>&& buf) {
+ folly::IOBuf* ptr = buf.get();
+ auto& p = idZeroCopyBufInfoMap_[ptr];
+ CHECK(p.buf_.get() == nullptr);
+
+ p.buf_ = std::move(buf);
+}
+
+bool AsyncSocket::containsZeroCopyBuf(folly::IOBuf* ptr) {
+ return (idZeroCopyBufInfoMap_.find(ptr) != idZeroCopyBufInfoMap_.end());
+}
+
+bool AsyncSocket::isZeroCopyMsg(const cmsghdr& cmsg) const {
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
+ if (zeroCopyEnabled_ &&
+ ((cmsg.cmsg_level == SOL_IP && cmsg.cmsg_type == IP_RECVERR) ||
+ (cmsg.cmsg_level == SOL_IPV6 && cmsg.cmsg_type == IPV6_RECVERR))) {
+ const struct sock_extended_err* serr =
+ reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
+ return (
+ (serr->ee_errno == 0) && (serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY));
+ }
+#endif
+ return false;
+}
+
+void AsyncSocket::processZeroCopyMsg(const cmsghdr& cmsg) {
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
+ const struct sock_extended_err* serr =
+ reinterpret_cast<const struct sock_extended_err*>(CMSG_DATA(&cmsg));
+ uint32_t hi = serr->ee_data;
+ uint32_t lo = serr->ee_info;
+ // disable zero copy if the buffer was actually copied
+ if ((serr->ee_code & SO_EE_CODE_ZEROCOPY_COPIED) && zeroCopyEnabled_) {
+ VLOG(2) << "AsyncSocket::processZeroCopyMsg(): setting "
+ << "zeroCopyEnabled_ = false due to SO_EE_CODE_ZEROCOPY_COPIED "
+ << "on " << fd_;
+ zeroCopyEnabled_ = false;
+ }
+
+ for (uint32_t i = lo; i <= hi; i++) {
+ releaseZeroCopyBuf(i);
+ }
+#endif
+}
+
void AsyncSocket::write(WriteCallback* callback,
const void* buf, size_t bytes, WriteFlags flags) {
iovec op;
void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
WriteFlags flags) {
+ adjustZeroCopyFlags(flags);
+
constexpr size_t kSmallSizeMax = 64;
size_t count = buf->countChainElements();
if (count <= kSmallSizeMax) {
errnoCopy);
return failWrite(__func__, callback, 0, ex);
} else if (countWritten == count) {
+ // done, add the whole buffer
+ if (countWritten && isZeroCopyRequest(flags)) {
+ addZeroCopyBuf(std::move(ioBuf));
+ }
// We successfully wrote everything.
// Invoke the callback and return.
if (callback) {
}
return;
} else { // continue writing the next writeReq
+ // add just the ptr
+ if (bytesWritten && isZeroCopyRequest(flags)) {
+ addZeroCopyBuf(ioBuf.get());
+ }
if (bufferCallback_) {
bufferCallback_->onEgressBuffered();
}
eventBase_ = eventBase;
ioHandler_.attachEventBase(eventBase);
+
+ updateEventRegistration();
+
writeTimeout_.attachEventBase(eventBase);
if (evbChangeCb_) {
evbChangeCb_->evbAttached(this);
eventBase_->dcheckIsInEventBaseThread();
eventBase_ = nullptr;
+
+ ioHandler_.unregisterHandler();
+
ioHandler_.detachEventBase();
writeTimeout_.detachEventBase();
if (evbChangeCb_) {
DCHECK(eventBase_ != nullptr);
eventBase_->dcheckIsInEventBaseThread();
- return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
+ return !writeTimeout_.isScheduled();
}
void AsyncSocket::cacheAddresses() {
}
}
+bool AsyncSocket::isZeroCopyWriteInProgress() const noexcept {
+ eventBase_->dcheckIsInEventBaseThread();
+ return (!idZeroCopyBufPtrMap_.empty());
+}
+
void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
cacheLocalAddress();
*address = localAddr_;
// EventHandler::WRITE is set. Any of these flags can
// indicate that there are messages available in the socket
// error message queue.
- handleErrMessages();
+ // Return if we handle any error messages - this is to avoid
+ // unnecessary read/write calls
+ if (handleErrMessages()) {
+ return;
+ }
// Return now if handleErrMessages() detached us from our EventBase
if (eventBase_ != originalEventBase) {
readCallback_->getReadBuffer(buf, buflen);
}
-void AsyncSocket::handleErrMessages() noexcept {
+size_t AsyncSocket::handleErrMessages() noexcept {
// This method has non-empty implementation only for platforms
// supporting per-socket error queues.
VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_
<< ", state=" << state_;
- if (errMessageCallback_ == nullptr) {
+ if (errMessageCallback_ == nullptr && idZeroCopyBufPtrMap_.empty()) {
VLOG(7) << "AsyncSocket::handleErrMessages(): "
<< "no callback installed - exiting.";
- return;
+ return 0;
}
-#ifdef MSG_ERRQUEUE
+#ifdef FOLLY_HAVE_MSG_ERRQUEUE
uint8_t ctrl[1024];
unsigned char data;
struct msghdr msg;
msg.msg_flags = 0;
int ret;
+ size_t num = 0;
while (true) {
ret = recvmsg(fd_, &msg, MSG_ERRQUEUE);
VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
errnoCopy);
failErrMessageRead(__func__, ex);
}
- return;
+
+ return num;
}
for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
- cmsg != nullptr &&
- cmsg->cmsg_len != 0 &&
- errMessageCallback_ != nullptr;
+ cmsg != nullptr && cmsg->cmsg_len != 0;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
- errMessageCallback_->errMessage(*cmsg);
+ ++num;
+ if (isZeroCopyMsg(*cmsg)) {
+ processZeroCopyMsg(*cmsg);
+ } else {
+ if (errMessageCallback_) {
+ errMessageCallback_->errMessage(*cmsg);
+ }
+ }
}
}
-#endif //MSG_ERRQUEUE
+#else
+ return 0;
+#endif // FOLLY_HAVE_MSG_ERRQUEUE
+}
+
+bool AsyncSocket::processZeroCopyWriteInProgress() noexcept {
+ eventBase_->dcheckIsInEventBaseThread();
+ if (idZeroCopyBufPtrMap_.empty()) {
+ return true;
+ }
+
+ handleErrMessages();
+
+ return idZeroCopyBufPtrMap_.empty();
}
void AsyncSocket::handleRead() noexcept {
} else {
msg.msg_control = nullptr;
}
- int msg_flags = sendMsgParamCallback_->getFlags(flags);
+ int msg_flags = sendMsgParamCallback_->getFlags(flags, zeroCopyEnabled_);
auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
auto totalWritten = writeResult.writeReturn;
// this bug is fixed.
tryAgain |= (errno == ENOTCONN);
#endif
+
+ // workaround for running with zerocopy enabled but without a proper
+ // memlock value - see ulimit -l
+ if (zeroCopyEnabled_ && (errno == ENOBUFS)) {
+ tryAgain = true;
+ zeroCopyEnabled_ = false;
+ }
+
if (!writeResult.exception && tryAgain) {
// TCP buffer is full; we can't write any more data right now.
*countWritten = 0;
}
void AsyncSocket::doClose() {
- if (fd_ == -1) return;
- if (shutdownSocketSet_) {
- shutdownSocketSet_->close(fd_);
+ if (fd_ == -1) {
+ return;
+ }
+ if (const auto shutdownSocketSet = wShutdownSocketSet_.lock()) {
+ shutdownSocketSet->close(fd_);
} else {
::close(fd_);
}
bufferCallback_ = cb;
}
-} // folly
+} // namespace folly