#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/EventBase.h>
+#include <folly/io/async/EventHandler.h>
#include <folly/SocketAddress.h>
#include <folly/io/IOBuf.h>
#include <errno.h>
#include <limits.h>
#include <unistd.h>
+#include <thread>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
+#include <boost/preprocessor/control/if.hpp>
using std::string;
using std::unique_ptr;
// static members initializers
const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
-const folly::SocketAddress AsyncSocket::anyAddress =
- folly::SocketAddress("0.0.0.0", 0);
const AsyncSocketException socketClosedLocallyEx(
AsyncSocketException::END_OF_FILE, "socket closed locally");
const AsyncSocketException socketShutdownForWritesEx(
AsyncSocketException::END_OF_FILE, "socket shutdown for writes");
-// TODO: It might help performance to provide a version of WriteRequest that
+// TODO: It might help performance to provide a version of BytesWriteRequest that
// users could derive from, so we can avoid the extra allocation for each call
// to write()/writev(). We could templatize TFramedAsyncChannel just like the
// protocols are currently templatized for transports.
// storage space, and only our internal version would allocate it at the end of
// the WriteRequest.
-/**
- * A WriteRequest object tracks information about a pending write() or writev()
- * operation.
+/* The default WriteRequest implementation, used for write(), writev() and
+ * writeChain()
*
- * A new WriteRequest operation is allocated on the heap for all write
+ * A new BytesWriteRequest operation is allocated on the heap for all write
* operations that cannot be completed immediately.
*/
-class AsyncSocket::WriteRequest {
+class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
public:
- static WriteRequest* newRequest(WriteCallback* callback,
- const iovec* ops,
- uint32_t opCount,
- unique_ptr<IOBuf>&& ioBuf,
- WriteFlags flags) {
+ static BytesWriteRequest* newRequest(AsyncSocket* socket,
+ WriteCallback* callback,
+ const iovec* ops,
+ uint32_t opCount,
+ uint32_t partialWritten,
+ uint32_t bytesWritten,
+ unique_ptr<IOBuf>&& ioBuf,
+ WriteFlags flags) {
assert(opCount > 0);
// Since we put a variable size iovec array at the end
- // of each WriteRequest, we have to manually allocate the memory.
- void* buf = malloc(sizeof(WriteRequest) +
+ // of each BytesWriteRequest, we have to manually allocate the memory.
+ void* buf = malloc(sizeof(BytesWriteRequest) +
(opCount * sizeof(struct iovec)));
if (buf == nullptr) {
throw std::bad_alloc();
}
- return new(buf) WriteRequest(callback, ops, opCount, std::move(ioBuf),
- flags);
+ return new(buf) BytesWriteRequest(socket, callback, ops, opCount,
+ partialWritten, bytesWritten,
+ std::move(ioBuf), flags);
}
- void destroy() {
- this->~WriteRequest();
+ void destroy() override {
+ this->~BytesWriteRequest();
free(this);
}
- bool cork() const {
- return isSet(flags_, WriteFlags::CORK);
- }
-
- WriteFlags flags() const {
- return flags_;
- }
-
- WriteRequest* getNext() const {
- return next_;
- }
-
- WriteCallback* getCallback() const {
- return callback_;
- }
-
- uint32_t getBytesWritten() const {
- return bytesWritten_;
- }
-
- const struct iovec* getOps() const {
- assert(opCount_ > opIndex_);
- return writeOps_ + opIndex_;
+ bool performWrite() override {
+ WriteFlags writeFlags = flags_;
+ if (getNext() != nullptr) {
+ writeFlags = writeFlags | WriteFlags::CORK;
+ }
+ bytesWritten_ = socket_->performWrite(getOps(), getOpCount(), writeFlags,
+ &opsWritten_, &partialBytes_);
+ return bytesWritten_ >= 0;
}
- uint32_t getOpCount() const {
- assert(opCount_ > opIndex_);
- return opCount_ - opIndex_;
+ bool isComplete() override {
+ return opsWritten_ == getOpCount();
}
- void consume(uint32_t wholeOps, uint32_t partialBytes,
- uint32_t totalBytesWritten) {
- // Advance opIndex_ forward by wholeOps
- opIndex_ += wholeOps;
+ void consume() override {
+ // Advance opIndex_ forward by opsWritten_
+ opIndex_ += opsWritten_;
assert(opIndex_ < opCount_);
// If we've finished writing any IOBufs, release them
if (ioBuf_) {
- for (uint32_t i = wholeOps; i != 0; --i) {
+ for (uint32_t i = opsWritten_; i != 0; --i) {
assert(ioBuf_);
ioBuf_ = ioBuf_->pop();
}
}
- // Move partialBytes forward into the current iovec buffer
+ // Move partialBytes_ forward into the current iovec buffer
struct iovec* currentOp = writeOps_ + opIndex_;
- assert((partialBytes < currentOp->iov_len) || (currentOp->iov_len == 0));
+ assert((partialBytes_ < currentOp->iov_len) || (currentOp->iov_len == 0));
currentOp->iov_base =
- reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes;
- currentOp->iov_len -= partialBytes;
+ reinterpret_cast<uint8_t*>(currentOp->iov_base) + partialBytes_;
+ currentOp->iov_len -= partialBytes_;
- // Increment the bytesWritten_ count by totalBytesWritten
- bytesWritten_ += totalBytesWritten;
- }
-
- void append(WriteRequest* next) {
- assert(next_ == nullptr);
- next_ = next;
+ // Increment the totalBytesWritten_ count by bytesWritten_;
+ totalBytesWritten_ += bytesWritten_;
}
private:
- WriteRequest(WriteCallback* callback,
- const struct iovec* ops,
- uint32_t opCount,
- unique_ptr<IOBuf>&& ioBuf,
- WriteFlags flags)
- : next_(nullptr)
- , callback_(callback)
- , bytesWritten_(0)
+ BytesWriteRequest(AsyncSocket* socket,
+ WriteCallback* callback,
+ const struct iovec* ops,
+ uint32_t opCount,
+ uint32_t partialBytes,
+ uint32_t bytesWritten,
+ unique_ptr<IOBuf>&& ioBuf,
+ WriteFlags flags)
+ : AsyncSocket::WriteRequest(socket, callback)
, opCount_(opCount)
, opIndex_(0)
, flags_(flags)
- , ioBuf_(std::move(ioBuf)) {
+ , ioBuf_(std::move(ioBuf))
+ , opsWritten_(0)
+ , partialBytes_(partialBytes)
+ , bytesWritten_(bytesWritten) {
memcpy(writeOps_, ops, sizeof(*ops) * opCount_);
}
- // Private destructor, to ensure callers use destroy()
- ~WriteRequest() {}
+ // private destructor, to ensure callers use destroy()
+ ~BytesWriteRequest() override = default;
+
+ const struct iovec* getOps() const {
+ assert(opCount_ > opIndex_);
+ return writeOps_ + opIndex_;
+ }
+
+ uint32_t getOpCount() const {
+ assert(opCount_ > opIndex_);
+ return opCount_ - opIndex_;
+ }
- WriteRequest* next_; ///< pointer to next WriteRequest
- WriteCallback* callback_; ///< completion callback
- uint32_t bytesWritten_; ///< bytes written
uint32_t opCount_; ///< number of entries in writeOps_
uint32_t opIndex_; ///< current index into writeOps_
WriteFlags flags_; ///< set for WriteFlags
unique_ptr<IOBuf> ioBuf_; ///< underlying IOBuf, or nullptr if N/A
+
+ // for consume(), how much we wrote on the last write
+ uint32_t opsWritten_; ///< complete ops written
+ uint32_t partialBytes_; ///< partial bytes of incomplete op written
+ ssize_t bytesWritten_; ///< bytes written altogether
+
struct iovec writeOps_[]; ///< write operation(s) list
};
AsyncSocket::AsyncSocket()
: eventBase_(nullptr)
, writeTimeout_(this, nullptr)
- , ioHandler_(this, nullptr) {
+ , ioHandler_(this, nullptr)
+ , immediateReadHandler_(this) {
VLOG(5) << "new AsyncSocket()";
init();
}
AsyncSocket::AsyncSocket(EventBase* evb)
: eventBase_(evb)
, writeTimeout_(this, evb)
- , ioHandler_(this, evb) {
+ , ioHandler_(this, evb)
+ , immediateReadHandler_(this) {
VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ")";
init();
}
AsyncSocket::AsyncSocket(EventBase* evb, int fd)
: eventBase_(evb)
, writeTimeout_(this, evb)
- , ioHandler_(this, evb, fd) {
+ , ioHandler_(this, evb, fd)
+ , immediateReadHandler_(this) {
VLOG(5) << "new AsyncSocket(" << this << ", evb=" << evb << ", fd="
<< fd << ")";
init();
return fd;
}
+const folly::SocketAddress& AsyncSocket::anyAddress() {
+ static const folly::SocketAddress anyAddress =
+ folly::SocketAddress("0.0.0.0", 0);
+ return anyAddress;
+}
+
void AsyncSocket::setShutdownSocketSet(ShutdownSocketSet* newSS) {
if (shutdownSocketSet_ == newSS) {
return;
return invalidState(callback);
}
+ connectStartTime_ = std::chrono::steady_clock::now();
+ // Make connect end time at least >= connectStartTime.
+ connectEndTime_ = connectStartTime_;
+
assert(fd_ == -1);
state_ = StateEnum::CONNECTING;
connectCallback_ = callback;
<< ", fd=" << fd_ << ", host=" << address.describe().c_str();
// bind the socket
- if (bindAddr != anyAddress) {
+ if (bindAddr != anyAddress()) {
int one = 1;
if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) {
doClose();
assert(readCallback_ == nullptr);
assert(writeReqHead_ == nullptr);
state_ = StateEnum::ESTABLISHED;
- if (callback) {
- connectCallback_ = nullptr;
- callback->connectSuccess();
- }
+ invokeConnectSuccess();
}
void AsyncSocket::connect(ConnectCallback* callback,
return;
}
+ /* We are removing a read callback */
+ if (callback == nullptr &&
+ immediateReadHandler_.isLoopCallbackScheduled()) {
+ immediateReadHandler_.cancelLoopCallback();
+ }
+
if (shutdownFlags_ & SHUT_READ) {
// Reads have already been shut down on this socket.
//
iovec op;
op.iov_base = const_cast<void*>(buf);
op.iov_len = bytes;
- writeImpl(callback, &op, 1, std::move(unique_ptr<IOBuf>()), flags);
+ writeImpl(callback, &op, 1, unique_ptr<IOBuf>(), flags);
}
void AsyncSocket::writev(WriteCallback* callback,
const iovec* vec,
size_t count,
WriteFlags flags) {
- writeImpl(callback, vec, count, std::move(unique_ptr<IOBuf>()), flags);
+ writeImpl(callback, vec, count, unique_ptr<IOBuf>(), flags);
}
void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
WriteFlags flags) {
+ constexpr size_t kSmallSizeMax = 64;
size_t count = buf->countChainElements();
- if (count <= 64) {
- iovec vec[count];
+ if (count <= kSmallSizeMax) {
+ iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
writeChainImpl(callback, vec, count, std::move(buf), flags);
} else {
iovec* vec = new iovec[count];
void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
- const IOBuf* head = buf.get();
- const IOBuf* next = head;
- unsigned i = 0;
- do {
- vec[i].iov_base = const_cast<uint8_t *>(next->data());
- vec[i].iov_len = next->length();
- // IOBuf can get confused by empty iovec buffers, so increment the
- // output pointer only if the iovec buffer is non-empty. We could
- // end the loop with i < count, but that's ok.
- if (vec[i].iov_len != 0) {
- i++;
- }
- next = next->next();
- } while (next != head);
- writeImpl(callback, vec, i, std::move(buf), flags);
+ size_t veclen = buf->fillIov(vec, count);
+ writeImpl(callback, vec, veclen, std::move(buf), flags);
}
void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
callback->writeSuccess();
}
return;
- } // else { continue writing the next writeReq }
+ } else { // continue writing the next writeReq
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
+ }
mustRegister = true;
}
} else if (!connecting()) {
// Create a new WriteRequest to add to the queue
WriteRequest* req;
try {
- req = WriteRequest::newRequest(callback, vec + countWritten,
- count - countWritten, std::move(ioBuf),
- flags);
+ req = BytesWriteRequest::newRequest(this, callback, vec + countWritten,
+ count - countWritten, partialWritten,
+ bytesWritten, std::move(ioBuf), flags);
} catch (const std::exception& ex) {
// we mainly expect to catch std::bad_alloc here
AsyncSocketException tex(AsyncSocketException::INTERNAL_ERROR,
withAddr(string("failed to append new WriteRequest: ") + ex.what()));
return failWrite(__func__, callback, bytesWritten, tex);
}
- req->consume(0, partialWritten, bytesWritten);
+ req->consume();
if (writeReqTail_ == nullptr) {
assert(writeReqHead_ == nullptr);
writeReqHead_ = writeReqTail_ = req;
}
}
+void AsyncSocket::writeRequest(WriteRequest* req) {
+ if (writeReqTail_ == nullptr) {
+ assert(writeReqHead_ == nullptr);
+ writeReqHead_ = writeReqTail_ = req;
+ req->start();
+ } else {
+ writeReqTail_->append(req);
+ writeReqTail_ = req;
+ }
+}
+
void AsyncSocket::close() {
VLOG(5) << "AsyncSocket::close(): this=" << this << ", fd_=" << fd_
<< ", state=" << state_ << ", shutdownFlags="
}
}
+ if (immediateReadHandler_.isLoopCallbackScheduled()) {
+ immediateReadHandler_.cancelLoopCallback();
+ }
+
if (fd_ >= 0) {
ioHandler_.changeHandlerFD(-1);
doClose();
}
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(socketClosedLocallyEx);
- }
+ invokeConnectErr(socketClosedLocallyEx);
failAllWrites(socketClosedLocallyEx);
}
void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
- address->setFromLocalAddress(fd_);
+ if (!localAddr_.isInitialized()) {
+ localAddr_.setFromLocalAddress(fd_);
+ }
+ *address = localAddr_;
}
void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
return 0;
}
+void AsyncSocket::setPersistentCork(bool cork) {
+ if (setCork(cork) == 0) {
+ persistentCork_ = cork;
+ }
+}
+
+int AsyncSocket::setCork(bool cork) {
+#ifdef TCP_CORK
+ if (fd_ < 0) {
+ VLOG(4) << "AsyncSocket::setCork() called on non-open socket "
+ << this << "(stats=" << state_ << ")";
+ return EINVAL;
+ }
+
+ if (corked_ == cork) {
+ return 0;
+ }
+
+ int flag = cork ? 1 : 0;
+ if (setsockopt(fd_, IPPROTO_TCP, TCP_CORK, &flag, sizeof(flag)) != 0) {
+ int errnoCopy = errno;
+ VLOG(2) << "faield to turn on TCP_CORK option on AsyncSocket"
+ << this << "(fd=" << fd_ << ", state=" << state_ << "):"
+ << folly::errnoStr(errnoCopy);
+ return errnoCopy;
+ }
+ corked_ = cork;
+#endif
+ return 0;
+}
+
void AsyncSocket::ioReady(uint16_t events) noexcept {
VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
<< ", events=" << std::hex << events << ", state=" << state_;
}
}
-ssize_t AsyncSocket::performRead(void* buf, size_t buflen) {
- ssize_t bytes = recv(fd_, buf, buflen, MSG_DONTWAIT);
+ssize_t AsyncSocket::performRead(void** buf,
+ size_t* buflen,
+ size_t* /* offset */) {
+ VLOG(5) << "AsyncSocket::performRead() this=" << this
+ << ", buf=" << *buf << ", buflen=" << *buflen;
+
+ int recvFlags = 0;
+ if (peek_) {
+ recvFlags |= MSG_PEEK;
+ }
+
+ ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// No more data to read right now.
}
}
+void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) noexcept {
+ // no matter what, buffer should be preapared for non-ssl socket
+ CHECK(readCallback_);
+ readCallback_->getReadBuffer(buf, buflen);
+}
+
void AsyncSocket::handleRead() noexcept {
VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
<< ", state=" << state_;
while (readCallback_ && eventBase_ == originalEventBase) {
// Get the buffer to read into.
void* buf = nullptr;
- size_t buflen = 0;
+ size_t buflen = 0, offset = 0;
try {
- readCallback_->getReadBuffer(&buf, &buflen);
+ prepareReadBuffer(&buf, &buflen);
+ VLOG(5) << "prepareReadBuffer() buf=" << buf << ", buflen=" << buflen;
} catch (const AsyncSocketException& ex) {
return failRead(__func__, ex);
} catch (const std::exception& ex) {
"non-exception type");
return failRead(__func__, ex);
}
- if (buf == nullptr || buflen == 0) {
+ if (!isBufferMovable_ && (buf == nullptr || buflen == 0)) {
AsyncSocketException ex(AsyncSocketException::BAD_ARGS,
"ReadCallback::getReadBuffer() returned "
"empty buffer");
}
// Perform the read
- ssize_t bytesRead = performRead(buf, buflen);
+ ssize_t bytesRead = performRead(&buf, &buflen, &offset);
+ VLOG(4) << "this=" << this << ", AsyncSocket::handleRead() got "
+ << bytesRead << " bytes";
if (bytesRead > 0) {
- readCallback_->readDataAvailable(bytesRead);
+ if (!isBufferMovable_) {
+ readCallback_->readDataAvailable(bytesRead);
+ } else {
+ CHECK(kOpenSslModeMoveBufferOwnership);
+ VLOG(5) << "this=" << this << ", AsyncSocket::handleRead() got "
+ << "buf=" << buf << ", " << bytesRead << "/" << buflen
+ << ", offset=" << offset;
+ auto readBuf = folly::IOBuf::takeOwnership(buf, buflen);
+ readBuf->trimStart(offset);
+ readBuf->trimEnd(buflen - offset - bytesRead);
+ readCallback_->readBufferAvailable(std::move(readBuf));
+ }
+
// Fall through and continue around the loop if the read
// completely filled the available buffer.
// Note that readCallback_ may have been uninstalled or changed inside
// No more data to read right now.
return;
} else if (bytesRead == READ_ERROR) {
+ readErr_ = READ_ERROR;
AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
withAddr("recv() failed"), errno);
return failRead(__func__, ex);
} else {
assert(bytesRead == READ_EOF);
+ readErr_ = READ_EOF;
// EOF
shutdownFlags_ |= SHUT_READ;
if (!updateEventRegistration(0, EventHandler::READ)) {
return;
}
if (maxReadsPerEvent_ && (++numReads >= maxReadsPerEvent_)) {
+ if (readCallback_ != nullptr) {
+ // We might still have data in the socket.
+ // (e.g. see comment in AsyncSSLSocket::checkForImmediateRead)
+ scheduleImmediateRead();
+ }
return;
}
}
// (See the comment in handleRead() explaining how this can happen.)
EventBase* originalEventBase = eventBase_;
while (writeReqHead_ != nullptr && eventBase_ == originalEventBase) {
- uint32_t countWritten;
- uint32_t partialWritten;
- WriteFlags writeFlags = writeReqHead_->flags();
- if (writeReqHead_->getNext() != nullptr) {
- writeFlags = writeFlags | WriteFlags::CORK;
- }
- int bytesWritten = performWrite(writeReqHead_->getOps(),
- writeReqHead_->getOpCount(),
- writeFlags, &countWritten, &partialWritten);
- if (bytesWritten < 0) {
+ if (!writeReqHead_->performWrite()) {
AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
withAddr("writev() failed"), errno);
return failWrite(__func__, ex);
- } else if (countWritten == writeReqHead_->getOpCount()) {
+ } else if (writeReqHead_->isComplete()) {
// We finished this request
WriteRequest* req = writeReqHead_;
writeReqHead_ = req->getNext();
// We'll continue around the loop, trying to write another request
} else {
// Partial write.
- writeReqHead_->consume(countWritten, partialWritten, bytesWritten);
+ if (bufferCallback_) {
+ bufferCallback_->onEgressBuffered();
+ }
+ writeReqHead_->consume();
// Stop after a partial write; it's highly likely that a subsequent write
// attempt will just return EAGAIN.
//
return;
}
}
+ if (!writeReqHead_ && bufferCallback_) {
+ bufferCallback_->onEgressBufferCleared();
+ }
}
void AsyncSocket::checkForImmediateRead() noexcept {
// callbacks (since the callbacks may call detachEventBase()).
EventBase* originalEventBase = eventBase_;
- // Call the connect callback.
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectSuccess();
- }
-
+ invokeConnectSuccess();
// Note that the connect callback may have changed our state.
// (set or unset the read callback, called write(), closed the socket, etc.)
// The following code needs to handle these situations correctly.
AsyncSocketException ex(AsyncSocketException::INTERNAL_ERROR,
withAddr("socket closing after error"));
- if (connectCallback_) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(ex);
- }
-
+ invokeConnectErr(ex);
failAllWrites(ex);
if (readCallback_) {
<< ex.what();
startFail();
- if (connectCallback_ != nullptr) {
- ConnectCallback* callback = connectCallback_;
- connectCallback_ = nullptr;
- callback->connectErr(ex);
- }
-
+ invokeConnectErr(ex);
finishFail();
}
WriteRequest* req = writeReqHead_;
writeReqHead_ = req->getNext();
WriteCallback* callback = req->getCallback();
- uint32_t bytesWritten = req->getBytesWritten();
+ uint32_t bytesWritten = req->getTotalBytesWritten();
req->destroy();
if (callback) {
callback->writeErr(bytesWritten, ex);
writeReqHead_ = req->getNext();
WriteCallback* callback = req->getCallback();
if (callback) {
- callback->writeErr(req->getBytesWritten(), ex);
+ callback->writeErr(req->getTotalBytesWritten(), ex);
}
req->destroy();
}
AsyncSocketException ex(AsyncSocketException::ALREADY_OPEN,
"connect() called with socket in invalid state");
+ connectEndTime_ = std::chrono::steady_clock::now();
if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
if (callback) {
callback->connectErr(ex);
}
}
+void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
+ connectEndTime_ = std::chrono::steady_clock::now();
+ if (connectCallback_) {
+ ConnectCallback* callback = connectCallback_;
+ connectCallback_ = nullptr;
+ callback->connectErr(ex);
+ }
+}
+
+void AsyncSocket::invokeConnectSuccess() {
+ connectEndTime_ = std::chrono::steady_clock::now();
+ if (connectCallback_) {
+ ConnectCallback* callback = connectCallback_;
+ connectCallback_ = nullptr;
+ callback->connectSuccess();
+ }
+}
+
void AsyncSocket::invalidState(ReadCallback* callback) {
VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
<< "): setReadCallback(" << callback
return s + " (peer=" + peer.describe() + ", local=" + local.describe() + ")";
}
+void AsyncSocket::setBufferCallback(BufferCallback* cb) {
+ bufferCallback_ = cb;
+}
+
} // folly