#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
+#include <boost/preprocessor/control/if.hpp>
using std::string;
using std::unique_ptr;
}
// private destructor, to ensure callers use destroy()
- virtual ~BytesWriteRequest() {}
+ ~BytesWriteRequest() override = default;
const struct iovec* getOps() const {
assert(opCount_ > opIndex_);
return invalidState(callback);
}
+ connectStartTime_ = std::chrono::steady_clock::now();
+ // Make connect end time at least >= connectStartTime.
+ connectEndTime_ = connectStartTime_;
+
assert(fd_ == -1);
state_ = StateEnum::CONNECTING;
connectCallback_ = callback;
assert(readCallback_ == nullptr);
assert(writeReqHead_ == nullptr);
state_ = StateEnum::ESTABLISHED;
- if (callback) {
- connectCallback_ = nullptr;
- callback->connectSuccess();
- }
+ invokeConnectSuccess();
}
void AsyncSocket::connect(ConnectCallback* callback,
return;
}
+ /* We are removing a read callback */
+ if (callback == nullptr &&
+ immediateReadHandler_.isLoopCallbackScheduled()) {
+ immediateReadHandler_.cancelLoopCallback();
+ }
+
if (shutdownFlags_ & SHUT_READ) {
// Reads have already been shut down on this socket.
//
iovec op;
op.iov_base = const_cast<void*>(buf);
op.iov_len = bytes;
- writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
+ writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
}
void AsyncSocket::writev(WriteCallback* callback,
const iovec* vec,
size_t count,
WriteFlags flags) {
- writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
+ writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
}
void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
WriteFlags flags) {
+ constexpr size_t kSmallSizeMax = 64;
size_t count = buf->countChainElements();
- if (count <= 64) {
- iovec vec[count];
+ if (count <= kSmallSizeMax) {
+ iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
writeChainImpl(callback, vec, count, std::move(buf), flags);
} else {
iovec* vec = new iovec[count];
callback->writeSuccess();
}
return;
- } // else { continue writing the next writeReq }
+ } else { // continue writing the next writeReq
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
+ }
mustRegister = true;
}
} else if (!connecting()) {
doClose();
}
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(socketClosedLocallyEx);
- }
+ invokeConnectErr(socketClosedLocallyEx);
failAllWrites(socketClosedLocallyEx);
}
void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
- address->setFromLocalAddress(fd_);
+ if (!localAddr_.isInitialized()) {
+ localAddr_.setFromLocalAddress(fd_);
+ }
+ *address = localAddr_;
}
void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
return 0;
}
+void AsyncSocket::setPersistentCork(bool cork) {
+ if (setCork(cork) == 0) {
+ persistentCork_ = cork;
+ }
+}
+
+int AsyncSocket::setCork(bool cork) {
+#ifdef TCP_CORK
+ if (fd_ < 0) {
+ VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
+ << this << "(stats=" << state_ << ")";
+ return EINVAL;
+ }
+
+ if (corked_ == cork) {
+ return 0;
+ }
+
+ int flag = cork ? 1 : 0;
+ if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
+ int errnoCopy = errno;
+ VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
+ << this << "(fd=" << fd_ << ", state=" << state_ << "):"
+ << folly::errnoStr(errnoCopy);
+ return errnoCopy;
+ }
+ corked_ = cork;
+#endif
+ return 0;
+}
+
void AsyncSocket::ioReady(uint16_t events) noexcept {
VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
<< ", events=" << std::hex << events << ", state=" << state_;
}
}
-ssize_t AsyncSocket::performRead(void* buf, size_t buflen) {
- ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT);
+ssize_t AsyncSocket::performRead(void** buf,
+ size_t* buflen,
+ size_t* /* offset */) {
+ VLOG(5) << "AsyncSocket::performRead() this=" << this
+ << ", buf=" << *buf << ", buflen=" << *buflen;
+
+ int recvFlags = 0;
+ if (peek_) {
+ recvFlags |= MSG_PEEK;
+ }
+
+ ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// No more data to read right now.
}
}
+void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
+ // no matter what, buffer should be preapared for non-ssl socket
+ CHECK(readCallback_);
+ readCallback_->getReadBuffer(buf, buflen);
+}
+
void AsyncSocket::handleRead() noexcept {
VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
<< ", state=" << state_;
while (readCallback_ && eventBase_ == originalEventBase) {
// Get the buffer to read into.
void* buf = nullptr;
- size_t buflen = 0;
+ size_t buflen = 0, offset = 0;
try {
- readCallback_->getReadBuffer(&buf, &buflen);
+ prepareReadBuffer(&buf, &buflen);
+ VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
} catch (const AsyncSocketException& ex) {
return failRead(__func__, ex);
} catch (const std::exception& ex) {
"non-exception type");
return failRead(__func__, ex);
}
- if (buf == nullptr || buflen == 0) {
+ if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
"ReadCallback::getReadBuffer() returned "
"empty buffer");
}
// Perform the read
- ssize_t bytesRead = performRead(buf, buflen);
+ ssize_t bytesRead = performRead(&buf, &buflen, &offset);
+ VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
+ << bytesRead << " bytes";
if (bytesRead > 0) {
- readCallback_->readDataAvailable(bytesRead);
+ if (!isBufferMovable_) {
+ readCallback_->readDataAvailable(bytesRead);
+ } else {
+ CHECK(kOpenSslModeMoveBufferOwnership);
+ VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
+ << "buf=" << buf << ", " << bytesRead << "/" << buflen
+ << ", offset=" << offset;
+ auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
+ readBuf->trimStart(offset);
+ readBuf->trimEnd(buflen - offset - bytesRead);
+ readCallback_->readBufferAvailable(std::move(readBuf));
+ }
+
// Fall through and continue around the loop if the read
// completely filled the available buffer.
// Note that readCallback_ may have been uninstalled or changed inside
// No more data to read right now.
return;
} else if (bytesRead == READ_ERROR) {
+ readErr_ = READ_ERROR;
AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
withAddr("recv() failed"), errno);
return failRead(__func__, ex);
} else {
assert(bytesRead == READ_EOF);
+ readErr_ = READ_EOF;
// EOF
shutdownFlags_ |= SHUT_READ;
if (!updateEventRegistration(0, EventHandler::READ)) {
return;
}
if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
- // We might still have data in the socket.
- // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
- scheduleImmediateRead();
+ if (readCallback_ != nullptr) {
+ // We might still have data in the socket.
+ // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
+ scheduleImmediateRead();
+ }
return;
}
}
// We'll continue around the loop, trying to write another request
} else {
// Partial write.
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
writeReqHead_->consume();
// Stop after a partial write; it's highly likely that a subsequent write
// attempt will just return EAGAIN.
return;
}
}
+ if (!writeReqHead_ && bufferCallback_) {
+ bufferCallback_->onEgressBufferCleared();
+ }
}
void AsyncSocket::checkForImmediateRead() noexcept {
// callbacks (since the callbacks may call detachEventBase()).
EventBase* originalEventBase = eventBase_;
- // Call the connect callback.
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectSuccess();
- }
-
+ invokeConnectSuccess();
// Note that the connect callback may have changed our state.
// (set or unset the read callback, called write(), closed the socket, etc.)
// The following code needs to handle these situations correctly.
AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
withAddr("socket closing after error"));
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(ex);
- }
-
+ invokeConnectErr(ex);
failAllWrites(ex);
if (readCallback_) {
<< ex.what();
startFail();
- if (connectCallback_ != nullptr) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(ex);
- }
-
+ invokeConnectErr(ex);
finishFail();
}
AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
"connect() called with socket in invalid state");
+ connectEndTime_ = std::chrono::steady_clock::now();
if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
if (callback) {
callback->connectErr(ex);
}
}
+void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
+ connectEndTime_ = std::chrono::steady_clock::now();
+ if (connectCallback_) {
+ ConnectCallback* callback = connectCallback_;
+ connectCallback_ = nullptr;
+ callback->connectErr(ex);
+ }
+}
+
+void AsyncSocket::invokeConnectSuccess() {
+ connectEndTime_ = std::chrono::steady_clock::now();
+ if (connectCallback_) {
+ ConnectCallback* callback = connectCallback_;
+ connectCallback_ = nullptr;
+ callback->connectSuccess();
+ }
+}
+
void AsyncSocket::invalidState(ReadCallback* callback) {
VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
<< "): setReadCallback(" << callback
return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
}
+void AsyncSocket::setBufferCallback(BufferCallback* cb) {
+ bufferCallback_ = cb;
+}
+
} // folly