*/
class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
public:
- static BytesWriteRequest* newRequest(
- AsyncSocket* socket,
- WriteCallback* callback,
- const iovec* ops,
- uint32_t opCount,
- uint32_t partialWritten,
- uint32_t bytesWritten,
- unique_ptr<IOBuf>&& ioBuf,
- WriteFlags flags,
- BufferCallback* bufferCallback = nullptr) {
+ static BytesWriteRequest* newRequest(AsyncSocket* socket,
+ WriteCallback* callback,
+ const iovec* ops,
+ uint32_t opCount,
+ uint32_t partialWritten,
+ uint32_t bytesWritten,
+ unique_ptr<IOBuf>&& ioBuf,
+ WriteFlags flags) {
assert(opCount > 0);
// Since we put a variable size iovec array at the end
// of each BytesWriteRequest, we have to manually allocate the memory.
return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
partialWritten, bytesWritten,
- std::move(ioBuf), flags, bufferCallback);
+ std::move(ioBuf), flags);
}
void destroy() override {
uint32_t partialBytes,
uint32_t bytesWritten,
unique_ptr<IOBuf>&& ioBuf,
- WriteFlags flags,
- BufferCallback* bufferCallback = nullptr)
- : AsyncSocket::WriteRequest(socket, callback, bufferCallback)
+ WriteFlags flags)
+ : AsyncSocket::WriteRequest(socket, callback)
, opCount_(opCount)
, opIndex_(0)
, flags_(flags)
}
void AsyncSocket::write(WriteCallback* callback,
- const void* buf, size_t bytes, WriteFlags flags,
- BufferCallback* bufCallback) {
+ const void* buf, size_t bytes, WriteFlags flags) {
iovec op;
op.iov_base = const_cast<void*>(buf);
op.iov_len = bytes;
- writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags, bufCallback);
+ writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
}
void AsyncSocket::writev(WriteCallback* callback,
const iovec* vec,
size_t count,
- WriteFlags flags,
- BufferCallback* bufCallback) {
- writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags, bufCallback);
+ WriteFlags flags) {
+ writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
}
void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
- WriteFlags flags, BufferCallback* bufCallback) {
+ WriteFlags flags) {
constexpr size_t kSmallSizeMax = 64;
size_t count = buf->countChainElements();
if (count <= kSmallSizeMax) {
iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
- writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback);
+ writeChainImpl(callback, vec, count, std::move(buf), flags);
} else {
iovec* vec = new iovec[count];
- writeChainImpl(callback, vec, count, std::move(buf), flags, bufCallback);
+ writeChainImpl(callback, vec, count, std::move(buf), flags);
delete[] vec;
}
}
void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
- size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags,
- BufferCallback* bufCallback) {
+ size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
size_t veclen = buf->fillIov(vec, count);
- writeImpl(callback, vec, veclen, std::move(buf), flags, bufCallback);
+ writeImpl(callback, vec, veclen, std::move(buf), flags);
}
void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
size_t count, unique_ptr<IOBuf>&& buf,
- WriteFlags flags, BufferCallback* bufCallback) {
+ WriteFlags flags) {
VLOG(6) << "AsyncSocket::writev() this=" << this << ", fd=" << fd_
<< ", callback=" << callback << ", count=" << count
<< ", state=" << state_;
}
return;
} else { // continue writing the next writeReq
- if (bufCallback) {
- bufCallback->onEgressBuffered();
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
}
}
mustRegister = true;
try {
req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
count - countWritten, partialWritten,
- bytesWritten, std::move(ioBuf), flags,
- bufCallback);
+ bytesWritten, std::move(ioBuf), flags);
} catch (const std::exception& ex) {
// we mainly expect to catch std::bad_alloc here
AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
}
void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
- address->setFromLocalAddress(fd_);
+ if (!localAddr_.isInitialized()) {
+ localAddr_.setFromLocalAddress(fd_);
+ }
+ *address = localAddr_;
}
void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
}
}
-ssize_t AsyncSocket::performRead(void** buf, size_t* buflen, size_t* offset) {
+ssize_t AsyncSocket::performRead(void** buf,
+ size_t* buflen,
+ size_t* /* offset */) {
VLOG(5) << "AsyncSocket::performRead() this=" << this
<< ", buf=" << *buf << ", buflen=" << *buflen;
}
// We'll continue around the loop, trying to write another request
} else {
- // Notify BufferCallback:
- BufferCallback* bufferCallback = writeReqHead_->getBufferCallback();
- if (bufferCallback) {
- bufferCallback->onEgressBuffered();
- }
// Partial write.
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
writeReqHead_->consume();
// Stop after a partial write; it's highly likely that a subsequent write
// attempt will just return EAGAIN.
return;
}
}
+ if (!writeReqHead_ && bufferCallback_) {
+ bufferCallback_->onEgressBufferCleared();
+ }
}
void AsyncSocket::checkForImmediateRead() noexcept {
return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
}
+void AsyncSocket::setBufferCallback(BufferCallback* cb) {
+ bufferCallback_ = cb;
+}
+
} // folly