return invalidState(callback);
}
+ connectStartTime_ = std::chrono::steady_clock::now();
+ // Make connect end time at least >= connectStartTime.
+ connectEndTime_ = connectStartTime_;
+
assert(fd_ == -1);
state_ = StateEnum::CONNECTING;
connectCallback_ = callback;
assert(readCallback_ == nullptr);
assert(writeReqHead_ == nullptr);
state_ = StateEnum::ESTABLISHED;
- if (callback) {
- connectCallback_ = nullptr;
- callback->connectSuccess();
- }
+ invokeConnectSuccess();
}
void AsyncSocket::connect(ConnectCallback* callback,
callback->writeSuccess();
}
return;
- } // else { continue writing the next writeReq }
+ } else { // continue writing the next writeReq
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
+ }
mustRegister = true;
}
} else if (!connecting()) {
doClose();
}
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(socketClosedLocallyEx);
- }
+ invokeConnectErr(socketClosedLocallyEx);
failAllWrites(socketClosedLocallyEx);
}
void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
- address->setFromLocalAddress(fd_);
+ if (!localAddr_.isInitialized()) {
+ localAddr_.setFromLocalAddress(fd_);
+ }
+ *address = localAddr_;
}
void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
return 0;
}
+void AsyncSocket::setPersistentCork(bool cork) {
+ if (setCork(cork) == 0) {
+ persistentCork_ = cork;
+ }
+}
+
+int AsyncSocket::setCork(bool cork) {
+#ifdef TCP_CORK
+ if (fd_ < 0) {
+ VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
+ << this << "(stats=" << state_ << ")";
+ return EINVAL;
+ }
+
+ if (corked_ == cork) {
+ return 0;
+ }
+
+ int flag = cork ? 1 : 0;
+ if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
+ int errnoCopy = errno;
+ VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
+ << this << "(fd=" << fd_ << ", state=" << state_ << "):"
+ << folly::errnoStr(errnoCopy);
+ return errnoCopy;
+ }
+ corked_ = cork;
+#endif
+ return 0;
+}
+
void AsyncSocket::ioReady(uint16_t events) noexcept {
VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
<< ", events=" << std::hex << events << ", state=" << state_;
}
}
-ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
+ssize_t AsyncSocket::performRead(void** buf,
+ size_t* buflen,
+ size_t* /* offset */) {
VLOG(5) << "AsyncSocket::performRead() this=" << this
<< ", buf=" << *buf << ", buflen=" << *buflen;
// We'll continue around the loop, trying to write another request
} else {
// Partial write.
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
writeReqHead_->consume();
// Stop after a partial write; it's highly likely that a subsequent write
// attempt will just return EAGAIN.
return;
}
}
+ if (!writeReqHead_ && bufferCallback_) {
+ bufferCallback_->onEgressBufferCleared();
+ }
}
void AsyncSocket::checkForImmediateRead() noexcept {
// callbacks (since the callbacks may call detachEventBase()).
EventBase* originalEventBase = eventBase_;
- // Call the connect callback.
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectSuccess();
- }
-
+ invokeConnectSuccess();
// Note that the connect callback may have changed our state.
// (set or unset the read callback, called write(), closed the socket, etc.)
// The following code needs to handle these situations correctly.
AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
withAddr("socket closing after error"));
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(ex);
- }
-
+ invokeConnectErr(ex);
failAllWrites(ex);
if (readCallback_) {
<< ex.what();
startFail();
- if (connectCallback_ != nullptr) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(ex);
- }
-
+ invokeConnectErr(ex);
finishFail();
}
AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
"connect() called with socket in invalid state");
+ connectEndTime_ = std::chrono::steady_clock::now();
if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
if (callback) {
callback->connectErr(ex);
}
}
+void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
+ connectEndTime_ = std::chrono::steady_clock::now();
+ if (connectCallback_) {
+ ConnectCallback* callback = connectCallback_;
+ connectCallback_ = nullptr;
+ callback->connectErr(ex);
+ }
+}
+
+void AsyncSocket::invokeConnectSuccess() {
+ connectEndTime_ = std::chrono::steady_clock::now();
+ if (connectCallback_) {
+ ConnectCallback* callback = connectCallback_;
+ connectCallback_ = nullptr;
+ callback->connectSuccess();
+ }
+}
+
void AsyncSocket::invalidState(ReadCallback* callback) {
VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
<< "): setReadCallback(" << callback
return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
}
+void AsyncSocket::setBufferCallback(BufferCallback* cb) {
+ bufferCallback_ = cb;
+}
+
} // folly