callback->writeSuccess();
}
return;
- } // else { continue writing the next writeReq }
+ } else { // continue writing the next writeReq
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
+ }
mustRegister = true;
}
} else if (!connecting()) {
}
void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
- address->setFromLocalAddress(fd_);
+ if (!localAddr_.isInitialized()) {
+ localAddr_.setFromLocalAddress(fd_);
+ }
+ *address = localAddr_;
}
void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
return 0;
}
+void AsyncSocket::setPersistentCork(bool cork) {
+ if (setCork(cork) == 0) {
+ persistentCork_ = cork;
+ }
+}
+
+int AsyncSocket::setCork(bool cork) {
+#ifdef TCP_CORK
+ if (fd_ < 0) {
+ VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
+ << this << "(stats=" << state_ << ")";
+ return EINVAL;
+ }
+
+ if (corked_ == cork) {
+ return 0;
+ }
+
+ int flag = cork ? 1 : 0;
+ if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
+ int errnoCopy = errno;
+ VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
+ << this << "(fd=" << fd_ << ", state=" << state_ << "):"
+ << folly::errnoStr(errnoCopy);
+ return errnoCopy;
+ }
+ corked_ = cork;
+#endif
+ return 0;
+}
+
void AsyncSocket::ioReady(uint16_t events) noexcept {
VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
<< ", events=" << std::hex << events << ", state=" << state_;
}
}
-ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
+ssize_t AsyncSocket::performRead(void** buf,
+ size_t* buflen,
+ size_t* /* offset */) {
VLOG(5) << "AsyncSocket::performRead() this=" << this
<< ", buf=" << *buf << ", buflen=" << *buflen;
// We'll continue around the loop, trying to write another request
} else {
// Partial write.
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
writeReqHead_->consume();
// Stop after a partial write; it's highly likely that a subsequent write
// attempt will just return EAGAIN.
return;
}
}
+ if (!writeReqHead_ && bufferCallback_) {
+ bufferCallback_->onEgressBufferCleared();
+ }
}
void AsyncSocket::checkForImmediateRead() noexcept {
return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
}
+void AsyncSocket::setBufferCallback(BufferCallback* cb) {
+ bufferCallback_ = cb;
+}
+
} // folly