#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
+#include <boost/preprocessor/control/if.hpp>
using std::string;
using std::unique_ptr;
return invalidState(callback);
}
+ connectStartTime_ = std::chrono::steady_clock::now();
+ // Make connect end time at least >= connectStartTime.
+ connectEndTime_ = connectStartTime_;
+
assert(fd_ == -1);
state_ = StateEnum::CONNECTING;
connectCallback_ = callback;
assert(readCallback_ == nullptr);
assert(writeReqHead_ == nullptr);
state_ = StateEnum::ESTABLISHED;
- if (callback) {
- connectCallback_ = nullptr;
- callback->connectSuccess();
- }
+ invokeConnectSuccess();
}
void AsyncSocket::connect(ConnectCallback* callback,
iovec op;
op.iov_base = const_cast<void*>(buf);
op.iov_len = bytes;
- writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
+ writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
}
void AsyncSocket::writev(WriteCallback* callback,
const iovec* vec,
size_t count,
WriteFlags flags) {
- writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
+ writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
}
void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
WriteFlags flags) {
+ constexpr size_t kSmallSizeMax = 64;
size_t count = buf->countChainElements();
- if (count <= 64) {
- iovec vec[count];
+ if (count <= kSmallSizeMax) {
+ iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
writeChainImpl(callback, vec, count, std::move(buf), flags);
} else {
iovec* vec = new iovec[count];
callback->writeSuccess();
}
return;
- } // else { continue writing the next writeReq }
+ } else { // continue writing the next writeReq
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
+ }
mustRegister = true;
}
} else if (!connecting()) {
doClose();
}
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(socketClosedLocallyEx);
- }
+ invokeConnectErr(socketClosedLocallyEx);
failAllWrites(socketClosedLocallyEx);
}
void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
- address->setFromLocalAddress(fd_);
+ if (!localAddr_.isInitialized()) {
+ localAddr_.setFromLocalAddress(fd_);
+ }
+ *address = localAddr_;
}
void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
return 0;
}
+void AsyncSocket::setPersistentCork(bool cork) {
+ if (setCork(cork) == 0) {
+ persistentCork_ = cork;
+ }
+}
+
+int AsyncSocket::setCork(bool cork) {
+#ifdef TCP_CORK
+ if (fd_ < 0) {
+ VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
+ << this << "(stats=" << state_ << ")";
+ return EINVAL;
+ }
+
+ if (corked_ == cork) {
+ return 0;
+ }
+
+ int flag = cork ? 1 : 0;
+ if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
+ int errnoCopy = errno;
+ VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
+ << this << "(fd=" << fd_ << ", state=" << state_ << "):"
+ << folly::errnoStr(errnoCopy);
+ return errnoCopy;
+ }
+ corked_ = cork;
+#endif
+ return 0;
+}
+
void AsyncSocket::ioReady(uint16_t events) noexcept {
VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
<< ", events=" << std::hex << events << ", state=" << state_;
}
}
-ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
+ssize_t AsyncSocket::performRead(void** buf,
+ size_t* buflen,
+ size_t* /* offset */) {
VLOG(5) << "AsyncSocket::performRead() this=" << this
<< ", buf=" << *buf << ", buflen=" << *buflen;
- ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT);
+ int recvFlags = 0;
+ if (peek_) {
+ recvFlags |= MSG_PEEK;
+ }
+
+ ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// No more data to read right now.
// No more data to read right now.
return;
} else if (bytesRead == READ_ERROR) {
+ readErr_ = READ_ERROR;
AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
withAddr("recv() failed"), errno);
return failRead(__func__, ex);
} else {
assert(bytesRead == READ_EOF);
+ readErr_ = READ_EOF;
// EOF
shutdownFlags_ |= SHUT_READ;
if (!updateEventRegistration(0, EventHandler::READ)) {
// We'll continue around the loop, trying to write another request
} else {
// Partial write.
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
writeReqHead_->consume();
// Stop after a partial write; it's highly likely that a subsequent write
// attempt will just return EAGAIN.
return;
}
}
+ if (!writeReqHead_ && bufferCallback_) {
+ bufferCallback_->onEgressBufferCleared();
+ }
}
void AsyncSocket::checkForImmediateRead() noexcept {
// callbacks (since the callbacks may call detachEventBase()).
EventBase* originalEventBase = eventBase_;
- // Call the connect callback.
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectSuccess();
- }
-
+ invokeConnectSuccess();
// Note that the connect callback may have changed our state.
// (set or unset the read callback, called write(), closed the socket, etc.)
// The following code needs to handle these situations correctly.
AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
withAddr("socket closing after error"));
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(ex);
- }
-
+ invokeConnectErr(ex);
failAllWrites(ex);
if (readCallback_) {
<< ex.what();
startFail();
- if (connectCallback_ != nullptr) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(ex);
- }
-
+ invokeConnectErr(ex);
finishFail();
}
AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
"connect() called with socket in invalid state");
+ connectEndTime_ = std::chrono::steady_clock::now();
if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
if (callback) {
callback->connectErr(ex);
}
}
+void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
+ connectEndTime_ = std::chrono::steady_clock::now();
+ if (connectCallback_) {
+ ConnectCallback* callback = connectCallback_;
+ connectCallback_ = nullptr;
+ callback->connectErr(ex);
+ }
+}
+
+void AsyncSocket::invokeConnectSuccess() {
+ connectEndTime_ = std::chrono::steady_clock::now();
+ if (connectCallback_) {
+ ConnectCallback* callback = connectCallback_;
+ connectCallback_ = nullptr;
+ callback->connectSuccess();
+ }
+}
+
void AsyncSocket::invalidState(ReadCallback* callback) {
VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
<< "): setReadCallback(" << callback
return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
}
+void AsyncSocket::setBufferCallback(BufferCallback* cb) {
+ bufferCallback_ = cb;
+}
+
} // folly