/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
WriteResult performWrite() override {
WriteFlags writeFlags = flags_;
if (getNext() != nullptr) {
- writeFlags = writeFlags | WriteFlags::CORK;
+ writeFlags |= WriteFlags::CORK;
}
- return socket_->performWrite(
+ auto writeResult = socket_->performWrite(
getOps(), getOpCount(), writeFlags, &opsWritten_, &partialBytes_);
+ bytesWritten_ = writeResult.writeReturn > 0 ? writeResult.writeReturn : 0;
+ return writeResult;
}
bool isComplete() override {
currentOp->iov_len -= partialBytes_;
// Increment the totalBytesWritten_ count by bytesWritten_;
- totalBytesWritten_ += bytesWritten_;
+ assert(bytesWritten_ >= 0);
+ totalBytesWritten_ += uint32_t(bytesWritten_);
}
private:
};
AsyncSocket::AsyncSocket()
- : eventBase_(nullptr)
- , writeTimeout_(this, nullptr)
- , ioHandler_(this, nullptr)
- , immediateReadHandler_(this) {
+ : eventBase_(nullptr),
+ writeTimeout_(this, nullptr),
+ ioHandler_(this, nullptr),
+ immediateReadHandler_(this) {
VLOG(5) << "new AsyncSocket()";
init();
}
AsyncSocket::AsyncSocket(EventBase* evb)
- : eventBase_(evb)
- , writeTimeout_(this, evb)
- , ioHandler_(this, evb)
- , immediateReadHandler_(this) {
+ : eventBase_(evb),
+ writeTimeout_(this, evb),
+ ioHandler_(this, evb),
+ immediateReadHandler_(this) {
VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
init();
}
}
AsyncSocket::AsyncSocket(EventBase* evb, int fd)
- : eventBase_(evb)
- , writeTimeout_(this, evb)
- , ioHandler_(this, evb, fd)
- , immediateReadHandler_(this) {
+ : eventBase_(evb),
+ writeTimeout_(this, evb),
+ ioHandler_(this, evb, fd),
+ immediateReadHandler_(this) {
VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
<< fd << ")";
init();
// Apply the additional options if any.
for (const auto& opt: options) {
- int rv = opt.first.apply(fd_, opt.second);
+ rv = opt.first.apply(fd_, opt.second);
if (rv != 0) {
auto errnoCopy = errno;
throw AsyncSocketException(
}
int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
- int rv = ::connect(fd_, saddr, len);
+#if __linux__
+ if (noTransparentTls_) {
+ // Ignore return value, errors are ok
+ setsockopt(fd_, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
+ }
+#endif
+ int rv = fsp::connect(fd_, saddr, len);
if (rv < 0) {
auto errnoCopy = errno;
if (errnoCopy == EINPROGRESS) {
- scheduleConnectTimeoutAndRegisterForEvents();
+ scheduleConnectTimeout();
+ registerForConnectEvents();
} else {
throw AsyncSocketException(
AsyncSocketException::NOT_OPEN,
return rv;
}
-void AsyncSocket::scheduleConnectTimeoutAndRegisterForEvents() {
+void AsyncSocket::scheduleConnectTimeout() {
// Connection in progress.
- int timeout = connectTimeout_.count();
+ auto timeout = connectTimeout_.count();
if (timeout > 0) {
// Start a timer in case the connection takes too long.
- if (!writeTimeout_.scheduleTimeout(timeout)) {
+ if (!writeTimeout_.scheduleTimeout(uint32_t(timeout))) {
throw AsyncSocketException(
AsyncSocketException::INTERNAL_ERROR,
withAddr("failed to schedule AsyncSocket connect timeout"));
}
}
+}
+void AsyncSocket::registerForConnectEvents() {
// Register for write events, so we'll
// be notified when the connection finishes/fails.
// Note that we don't register for a persistent event here.
uint32_t countWritten = 0;
uint32_t partialWritten = 0;
- int bytesWritten = 0;
+ ssize_t bytesWritten = 0;
bool mustRegister = false;
if ((state_ == StateEnum::ESTABLISHED || state_ == StateEnum::FAST_OPEN) &&
!connecting()) {
assert(writeReqTail_ == nullptr);
assert((eventFlags_ & EventHandler::WRITE) == 0);
- auto writeResult =
- performWrite(vec, count, flags, &countWritten, &partialWritten);
+ auto writeResult = performWrite(
+ vec, uint32_t(count), flags, &countWritten, &partialWritten);
bytesWritten = writeResult.writeReturn;
if (bytesWritten < 0) {
auto errnoCopy = errno;
// Create a new WriteRequest to add to the queue
WriteRequest* req;
try {
- req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
- count - countWritten, partialWritten,
- bytesWritten, std::move(ioBuf), flags);
+ req = BytesWriteRequest::newRequest(
+ this,
+ callback,
+ vec + countWritten,
+ uint32_t(count - countWritten),
+ partialWritten,
+ uint32_t(bytesWritten),
+ std::move(ioBuf),
+ flags);
} catch (const std::exception& ex) {
// we mainly expect to catch std::bad_alloc here
AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
withAddr(string("failed to append new WriteRequest: ") + ex.what()));
- return failWrite(__func__, callback, bytesWritten, tex);
+ return failWrite(__func__, callback, size_t(bytesWritten), tex);
}
req->consume();
if (writeReqTail_ == nullptr) {
eventBase_ = eventBase;
ioHandler_.attachEventBase(eventBase);
writeTimeout_.attachEventBase(eventBase);
+ if (evbChangeCb_) {
+ evbChangeCb_->evbAttached(this);
+ }
}
void AsyncSocket::detachEventBase() {
eventBase_ = nullptr;
ioHandler_.detachEventBase();
writeTimeout_.detachEventBase();
+ if (evbChangeCb_) {
+ evbChangeCb_->evbDetached(this);
+ }
}
bool AsyncSocket::isDetachable() const {
*address = addr_;
}
+bool AsyncSocket::getTFOSucceded() const {
+ return detail::tfo_succeeded(fd_);
+}
+
int AsyncSocket::setNoDelay(bool noDelay) {
if (fd_ < 0) {
VLOG(4) << "AsyncSocket::setNoDelay() called on non-open socket "
}
- if (setsockopt(fd_, IPPROTO_TCP, TCP_CONGESTION, cname.c_str(),
- cname.length() + 1) != 0) {
+ if (setsockopt(
+ fd_,
+ IPPROTO_TCP,
+ TCP_CONGESTION,
+ cname.c_str(),
+ socklen_t(cname.length() + 1)) != 0) {
int errnoCopy = errno;
VLOG(2) << "failed to update TCP_CONGESTION option on AsyncSocket "
<< this << "(fd=" << fd_ << ", state=" << state_ << "): "
}
}
-void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
+void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {
// no matter what, buffer should be preapared for non-ssl socket
CHECK(readCallback_);
readCallback_->getReadBuffer(buf, buflen);
// one here just to make sure, in case one of our calling code paths ever
// changes.
DestructorGuard dg(this);
-
// If we have a readCallback_, make sure we enable read events. We
// may already be registered for reads if connectSuccess() set
// the read calback.
if (totalWritten >= 0) {
tfoFinished_ = true;
state_ = StateEnum::ESTABLISHED;
- handleInitialReadWrite();
+ // We schedule this asynchrously so that we don't end up
+ // invoking initial read or write while a write is in progress.
+ scheduleInitialReadWrite();
} else if (errno == EINPROGRESS) {
VLOG(4) << "TFO falling back to connecting";
// A normal sendmsg doesn't return EINPROGRESS, however
// cookie.
state_ = StateEnum::CONNECTING;
try {
- scheduleConnectTimeoutAndRegisterForEvents();
+ scheduleConnectTimeout();
+ registerForConnectEvents();
} catch (const AsyncSocketException& ex) {
return WriteResult(
WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
errno = EAGAIN;
totalWritten = -1;
} else if (errno == EOPNOTSUPP) {
- VLOG(4) << "TFO not supported";
// Try falling back to connecting.
+ VLOG(4) << "TFO not supported";
state_ = StateEnum::CONNECTING;
try {
int ret = socketConnect((const sockaddr*)&addr, len);
// connect succeeded immediately
// Treat this like no data was written.
state_ = StateEnum::ESTABLISHED;
- handleInitialReadWrite();
+ scheduleInitialReadWrite();
}
// If there was no exception during connections,
// we would return that no bytes were written.
auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
auto totalWritten = writeResult.writeReturn;
if (totalWritten < 0) {
- if (!writeResult.exception && errno == EAGAIN) {
+ bool tryAgain = (errno == EAGAIN);
+#ifdef __APPLE__
+ // Apple has a bug where doing a second write on a socket which we
+ // have opened with TFO causes an ENOTCONN to be thrown. However the
+ // socket is really connected, so treat ENOTCONN as a EAGAIN until
+ // this bug is fixed.
+ tryAgain |= (errno == ENOTCONN);
+#endif
+ if (!writeResult.exception && tryAgain) {
// TCP buffer is full; we can't write any more data right now.
*countWritten = 0;
*partialWritten = 0;
uint32_t bytesWritten;
uint32_t n;
- for (bytesWritten = totalWritten, n = 0; n < count; ++n) {
+ for (bytesWritten = uint32_t(totalWritten), n = 0; n < count; ++n) {
const iovec* v = vec + n;
if (v->iov_len > bytesWritten) {
// Partial write finished in the middle of this iovec
return WriteResult(totalWritten);
}
- bytesWritten -= v->iov_len;
+ bytesWritten -= uint32_t(v->iov_len);
}
assert(bytesWritten == 0);
}
}
-void AsyncSocket::finishFail() {
- assert(state_ == StateEnum::ERROR);
- assert(getDestructorGuardCount() > 0);
-
- AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
- withAddr("socket closing after error"));
+void AsyncSocket::invokeAllErrors(const AsyncSocketException& ex) {
invokeConnectErr(ex);
failAllWrites(ex);
}
}
+void AsyncSocket::finishFail() {
+ assert(state_ == StateEnum::ERROR);
+ assert(getDestructorGuardCount() > 0);
+
+ AsyncSocketException ex(
+ AsyncSocketException::INTERNAL_ERROR,
+ withAddr("socket closing after error"));
+ invokeAllErrors(ex);
+}
+
+void AsyncSocket::finishFail(const AsyncSocketException& ex) {
+ assert(state_ == StateEnum::ERROR);
+ assert(getDestructorGuardCount() > 0);
+ invokeAllErrors(ex);
+}
+
void AsyncSocket::fail(const char* fn, const AsyncSocketException& ex) {
VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
<< state_ << " host=" << addr_.describe()
startFail();
invokeConnectErr(ex);
- finishFail();
+ finishFail(ex);
}
void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {