Move address caching logic from AsyncSSLSocket to AsyncSocket.
[folly.git] / folly / io / async / AsyncSocket.cpp
index bbdbc46cec4ff2de7b7918bc46ea0f0cae453f93..f32da3a75ad8e25372cd75854ba222b9c48ff9c4 100644 (file)
 #include <folly/io/async/AsyncSocket.h>
 
 #include <folly/ExceptionWrapper.h>
+#include <folly/Portability.h>
 #include <folly/SocketAddress.h>
+#include <folly/io/Cursor.h>
 #include <folly/io/IOBuf.h>
-#include <folly/Portability.h>
+#include <folly/io/IOBufQueue.h>
 #include <folly/portability/Fcntl.h>
 #include <folly/portability/Sockets.h>
 #include <folly/portability/SysUio.h>
 #include <folly/portability/Unistd.h>
 
+#include <boost/preprocessor/control/if.hpp>
 #include <errno.h>
 #include <limits.h>
-#include <thread>
 #include <sys/types.h>
-#include <boost/preprocessor/control/if.hpp>
+#include <thread>
 
 using std::string;
 using std::unique_ptr;
@@ -38,6 +40,13 @@ namespace fsp = folly::portability::sockets;
 
 namespace folly {
 
+static constexpr bool msgErrQueueSupported =
+#ifdef MSG_ERRQUEUE
+    true;
+#else
+    false;
+#endif // MSG_ERRQUEUE
+
 // static members initializers
 const AsyncSocket::OptionMap AsyncSocket::emptyOptionMap;
 
@@ -176,6 +185,33 @@ class AsyncSocket::BytesWriteRequest : public AsyncSocket::WriteRequest {
   struct iovec writeOps_[];     ///< write operation(s) list
 };
 
+int AsyncSocket::SendMsgParamsCallback::getDefaultFlags(folly::WriteFlags flags)
+                                                                      noexcept {
+  int msg_flags = MSG_DONTWAIT;
+
+#ifdef MSG_NOSIGNAL // Linux-only
+  msg_flags |= MSG_NOSIGNAL;
+#ifdef MSG_MORE
+  if (isSet(flags, WriteFlags::CORK)) {
+    // MSG_MORE tells the kernel we have more data to send, so wait for us to
+    // give it the rest of the data rather than immediately sending a partial
+    // frame, even when TCP_NODELAY is enabled.
+    msg_flags |= MSG_MORE;
+  }
+#endif // MSG_MORE
+#endif // MSG_NOSIGNAL
+  if (isSet(flags, WriteFlags::EOR)) {
+    // marks that this is the last byte of a record (response)
+    msg_flags |= MSG_EOR;
+  }
+
+  return msg_flags;
+}
+
+namespace {
+static AsyncSocket::SendMsgParamsCallback defaultSendMsgParamsCallback;
+}
+
 AsyncSocket::AsyncSocket()
     : eventBase_(nullptr),
       writeTimeout_(this, nullptr),
@@ -222,10 +258,17 @@ AsyncSocket::AsyncSocket(EventBase* evb, int fd)
   state_ = StateEnum::ESTABLISHED;
 }
 
+AsyncSocket::AsyncSocket(AsyncSocket::UniquePtr oldAsyncSocket)
+    : AsyncSocket(oldAsyncSocket->getEventBase(), oldAsyncSocket->detachFd()) {
+  preReceivedData_ = std::move(oldAsyncSocket->preReceivedData_);
+}
+
 // init() method, since constructor forwarding isn't supported in most
 // compilers yet.
 void AsyncSocket::init() {
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
   shutdownFlags_ = 0;
   state_ = StateEnum::UNINIT;
   eventFlags_ = EventHandler::NONE;
@@ -233,12 +276,14 @@ void AsyncSocket::init() {
   sendTimeout_ = 0;
   maxReadsPerEvent_ = 16;
   connectCallback_ = nullptr;
+  errMessageCallback_ = nullptr;
   readCallback_ = nullptr;
   writeReqHead_ = nullptr;
   writeReqTail_ = nullptr;
   shutdownSocketSet_ = nullptr;
   appBytesWritten_ = 0;
   appBytesReceived_ = 0;
+  sendMsgParamCallback_ = &defaultSendMsgParamsCallback;
 }
 
 AsyncSocket::~AsyncSocket() {
@@ -313,7 +358,7 @@ void AsyncSocket::connect(ConnectCallback* callback,
                            const OptionMap &options,
                            const folly::SocketAddress& bindAddr) noexcept {
   DestructorGuard dg(this);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   addr_ = address;
 
@@ -462,6 +507,7 @@ void AsyncSocket::connect(ConnectCallback* callback,
   // The read callback may not have been set yet, and no writes may be pending
   // yet, so we don't have to register for any events at the moment.
   VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
+  assert(errMessageCallback_ == nullptr);
   assert(readCallback_ == nullptr);
   assert(writeReqHead_ == nullptr);
   if (state_ != StateEnum::FAST_OPEN) {
@@ -476,6 +522,11 @@ int AsyncSocket::socketConnect(const struct sockaddr* saddr, socklen_t len) {
     // Ignore return value, errors are ok
     setsockopt(fd_, SOL_SOCKET, SO_NO_TRANSPARENT_TLS, nullptr, 0);
   }
+  if (noTSocks_) {
+    VLOG(4) << "Disabling TSOCKS for fd " << fd_;
+    // Ignore return value, errors are ok
+    setsockopt(fd_, SOL_SOCKET, SO_NO_TSOCKS, nullptr, 0);
+  }
 #endif
   int rv = fsp::connect(fd_, saddr, len);
   if (rv < 0) {
@@ -543,7 +594,9 @@ void AsyncSocket::cancelConnect() {
 
 void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
   sendTimeout_ = milliseconds;
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   // If we are currently pending on write requests, immediately update
   // writeTimeout_ with the new value.
@@ -563,6 +616,68 @@ void AsyncSocket::setSendTimeout(uint32_t milliseconds) {
   }
 }
 
+void AsyncSocket::setErrMessageCB(ErrMessageCallback* callback) {
+  VLOG(6) << "AsyncSocket::setErrMessageCB() this=" << this
+          << ", fd=" << fd_ << ", callback=" << callback
+          << ", state=" << state_;
+
+  // Short circuit if callback is the same as the existing errMessageCallback_.
+  if (callback == errMessageCallback_) {
+    return;
+  }
+
+  if (!msgErrQueueSupported) {
+      // Per-socket error message queue is not supported on this platform.
+      return invalidState(callback);
+  }
+
+  DestructorGuard dg(this);
+  eventBase_->dcheckIsInEventBaseThread();
+
+  if (callback == nullptr) {
+    // We should be able to reset the callback regardless of the
+    // socket state. It's important to have a reliable callback
+    // cancellation mechanism.
+    errMessageCallback_ = callback;
+    return;
+  }
+
+  switch ((StateEnum)state_) {
+    case StateEnum::CONNECTING:
+    case StateEnum::FAST_OPEN:
+    case StateEnum::ESTABLISHED: {
+      errMessageCallback_ = callback;
+      return;
+    }
+    case StateEnum::CLOSED:
+    case StateEnum::ERROR:
+      // We should never reach here.  SHUT_READ should always be set
+      // if we are in STATE_CLOSED or STATE_ERROR.
+      assert(false);
+      return invalidState(callback);
+    case StateEnum::UNINIT:
+      // We do not allow setReadCallback() to be called before we start
+      // connecting.
+      return invalidState(callback);
+  }
+
+  // We don't put a default case in the switch statement, so that the compiler
+  // will warn us to update the switch statement if a new state is added.
+  return invalidState(callback);
+}
+
+AsyncSocket::ErrMessageCallback* AsyncSocket::getErrMessageCallback() const {
+  return errMessageCallback_;
+}
+
+void AsyncSocket::setSendMsgParamCB(SendMsgParamsCallback* callback) {
+  sendMsgParamCallback_ = callback;
+}
+
+AsyncSocket::SendMsgParamsCallback* AsyncSocket::getSendMsgParamsCB() const {
+  return sendMsgParamCallback_;
+}
+
 void AsyncSocket::setReadCB(ReadCallback *callback) {
   VLOG(6) << "AsyncSocket::setReadCallback() this=" << this << ", fd=" << fd_
           << ", callback=" << callback << ", state=" << state_;
@@ -603,7 +718,7 @@ void AsyncSocket::setReadCB(ReadCallback *callback) {
   }
 
   DestructorGuard dg(this);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   switch ((StateEnum)state_) {
     case StateEnum::CONNECTING:
@@ -678,10 +793,10 @@ void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
   size_t count = buf->countChainElements();
   if (count <= kSmallSizeMax) {
     // suppress "warning: variable length array 'vec' is used [-Wvla]"
-    FOLLY_PUSH_WARNING;
-    FOLLY_GCC_DISABLE_WARNING(vla);
+    FOLLY_PUSH_WARNING
+    FOLLY_GCC_DISABLE_WARNING("-Wvla")
     iovec vec[BOOST_PP_IF(FOLLY_HAVE_VLA, count, kSmallSizeMax)];
-    FOLLY_POP_WARNING;
+    FOLLY_POP_WARNING
 
     writeChainImpl(callback, vec, count, std::move(buf), flags);
   } else {
@@ -705,7 +820,7 @@ void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
           << ", state=" << state_;
   DestructorGuard dg(this);
   unique_ptr<IOBuf>ioBuf(std::move(buf));
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   if (shutdownFlags_ & (SHUT_WRITE | SHUT_WRITE_PENDING)) {
     // No new writes may be performed after the write side of the socket has
@@ -855,7 +970,7 @@ void AsyncSocket::close() {
   // Declare a DestructorGuard to ensure that the AsyncSocket cannot be
   // destroyed until close() returns.
   DestructorGuard dg(this);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   // Since there are write requests pending, we have to set the
   // SHUT_WRITE_PENDING flag, and wait to perform the real close until the
@@ -887,7 +1002,9 @@ void AsyncSocket::closeNow() {
           << ", state=" << state_ << ", shutdownFlags="
           << std::hex << (int) shutdownFlags_;
   DestructorGuard dg(this);
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   switch (state_) {
     case StateEnum::ESTABLISHED:
@@ -979,7 +1096,7 @@ void AsyncSocket::shutdownWrite() {
     return;
   }
 
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   // There are pending writes.  Set SHUT_WRITE_PENDING so that the actual
   // shutdown will be performed once all writes complete.
@@ -1006,7 +1123,9 @@ void AsyncSocket::shutdownWriteNow() {
   }
 
   DestructorGuard dg(this);
-  assert(eventBase_ == nullptr || eventBase_->isInEventBaseThread());
+  if (eventBase_) {
+    eventBase_->dcheckIsInEventBaseThread();
+  }
 
   switch (static_cast<StateEnum>(state_)) {
     case StateEnum::ESTABLISHED:
@@ -1083,6 +1202,18 @@ bool AsyncSocket::readable() const {
   return rc == 1;
 }
 
+bool AsyncSocket::writable() const {
+  if (fd_ == -1) {
+    return false;
+  }
+  struct pollfd fds[1];
+  fds[0].fd = fd_;
+  fds[0].events = POLLOUT;
+  fds[0].revents = 0;
+  int rc = poll(fds, 1, 0);
+  return rc == 1;
+}
+
 bool AsyncSocket::isPending() const {
   return ioHandler_.isPending();
 }
@@ -1122,7 +1253,7 @@ void AsyncSocket::attachEventBase(EventBase* eventBase) {
           << ", state=" << state_ << ", events="
           << std::hex << eventFlags_ << ")";
   assert(eventBase_ == nullptr);
-  assert(eventBase->isInEventBaseThread());
+  eventBase->dcheckIsInEventBaseThread();
 
   eventBase_ = eventBase;
   ioHandler_.attachEventBase(eventBase);
@@ -1137,7 +1268,7 @@ void AsyncSocket::detachEventBase() {
           << ", old evb=" << eventBase_ << ", state=" << state_
           << ", events=" << std::hex << eventFlags_ << ")";
   assert(eventBase_ != nullptr);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   eventBase_ = nullptr;
   ioHandler_.detachEventBase();
@@ -1149,22 +1280,44 @@ void AsyncSocket::detachEventBase() {
 
 bool AsyncSocket::isDetachable() const {
   DCHECK(eventBase_ != nullptr);
-  DCHECK(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   return !ioHandler_.isHandlerRegistered() && !writeTimeout_.isScheduled();
 }
 
-void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
+void AsyncSocket::cacheAddresses() {
+  if (fd_ >= 0) {
+    try {
+      cacheLocalAddress();
+      cachePeerAddress();
+    } catch (const std::system_error& e) {
+      if (e.code() != std::error_code(ENOTCONN, std::system_category())) {
+        VLOG(1) << "Error caching addresses: " << e.code().value() << ", "
+                << e.code().message();
+      }
+    }
+  }
+}
+
+void AsyncSocket::cacheLocalAddress() const {
   if (!localAddr_.isInitialized()) {
     localAddr_.setFromLocalAddress(fd_);
   }
-  *address = localAddr_;
 }
 
-void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
+void AsyncSocket::cachePeerAddress() const {
   if (!addr_.isInitialized()) {
     addr_.setFromPeerAddress(fd_);
   }
+}
+
+void AsyncSocket::getLocalAddress(folly::SocketAddress* address) const {
+  cacheLocalAddress();
+  *address = localAddr_;
+}
+
+void AsyncSocket::getPeerAddress(folly::SocketAddress* address) const {
+  cachePeerAddress();
   *address = addr_;
 }
 
@@ -1222,6 +1375,7 @@ int AsyncSocket::setCongestionFlavor(const std::string &cname) {
 }
 
 int AsyncSocket::setQuickAck(bool quickack) {
+  (void)quickack;
   if (fd_ < 0) {
     VLOG(4) << "AsyncSocket::setQuickAck() called on non-open socket "
                << this << "(state=" << state_ << ")";
@@ -1300,19 +1454,30 @@ int AsyncSocket::setTCPProfile(int profd) {
 }
 
 void AsyncSocket::ioReady(uint16_t events) noexcept {
-  VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd" << fd_
+  VLOG(7) << "AsyncSocket::ioRead() this=" << this << ", fd=" << fd_
           << ", events=" << std::hex << events << ", state=" << state_;
   DestructorGuard dg(this);
   assert(events & EventHandler::READ_WRITE);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   uint16_t relevantEvents = uint16_t(events & EventHandler::READ_WRITE);
+  EventBase* originalEventBase = eventBase_;
+  // If we got there it means that either EventHandler::READ or
+  // EventHandler::WRITE is set. Any of these flags can
+  // indicate that there are messages available in the socket
+  // error message queue.
+  handleErrMessages();
+
+  // Return now if handleErrMessages() detached us from our EventBase
+  if (eventBase_ != originalEventBase) {
+    return;
+  }
+
   if (relevantEvents == EventHandler::READ) {
     handleRead();
   } else if (relevantEvents == EventHandler::WRITE) {
     handleWrite();
   } else if (relevantEvents == EventHandler::READ_WRITE) {
-    EventBase* originalEventBase = eventBase_;
     // If both read and write events are ready, process writes first.
     handleWrite();
 
@@ -1339,12 +1504,23 @@ AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
   VLOG(5) << "AsyncSocket::performRead() this=" << this << ", buf=" << *buf
           << ", buflen=" << *buflen;
 
-  int recvFlags = 0;
-  if (peek_) {
-    recvFlags |= MSG_PEEK;
+  if (preReceivedData_ && !preReceivedData_->empty()) {
+    VLOG(5) << "AsyncSocket::performRead() this=" << this
+            << ", reading pre-received data";
+
+    io::Cursor cursor(preReceivedData_.get());
+    auto len = cursor.pullAtMost(*buf, *buflen);
+
+    IOBufQueue queue;
+    queue.append(std::move(preReceivedData_));
+    queue.trimStart(len);
+    preReceivedData_ = queue.move();
+
+    appBytesReceived_ += len;
+    return ReadResult(len);
   }
 
-  ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT | recvFlags);
+  ssize_t bytes = recv(fd_, *buf, *buflen, MSG_DONTWAIT);
   if (bytes < 0) {
     if (errno == EAGAIN || errno == EWOULDBLOCK) {
       // No more data to read right now.
@@ -1364,6 +1540,63 @@ void AsyncSocket::prepareReadBuffer(void** buf, size_t* buflen) {
   readCallback_->getReadBuffer(buf, buflen);
 }
 
+void AsyncSocket::handleErrMessages() noexcept {
+  // This method has non-empty implementation only for platforms
+  // supporting per-socket error queues.
+  VLOG(5) << "AsyncSocket::handleErrMessages() this=" << this << ", fd=" << fd_
+          << ", state=" << state_;
+  if (errMessageCallback_ == nullptr) {
+    VLOG(7) << "AsyncSocket::handleErrMessages(): "
+            << "no callback installed - exiting.";
+    return;
+  }
+
+#ifdef MSG_ERRQUEUE
+  uint8_t ctrl[1024];
+  unsigned char data;
+  struct msghdr msg;
+  iovec entry;
+
+  entry.iov_base = &data;
+  entry.iov_len = sizeof(data);
+  msg.msg_iov = &entry;
+  msg.msg_iovlen = 1;
+  msg.msg_name = nullptr;
+  msg.msg_namelen = 0;
+  msg.msg_control = ctrl;
+  msg.msg_controllen = sizeof(ctrl);
+  msg.msg_flags = 0;
+
+  int ret;
+  while (true) {
+    ret = recvmsg(fd_, &msg, MSG_ERRQUEUE);
+    VLOG(5) << "AsyncSocket::handleErrMessages(): recvmsg returned " << ret;
+
+    if (ret < 0) {
+      if (errno != EAGAIN) {
+        auto errnoCopy = errno;
+        LOG(ERROR) << "::recvmsg exited with code " << ret
+                   << ", errno: " << errnoCopy;
+        AsyncSocketException ex(
+          AsyncSocketException::INTERNAL_ERROR,
+          withAddr("recvmsg() failed"),
+          errnoCopy);
+        failErrMessageRead(__func__, ex);
+      }
+      return;
+    }
+
+    for (struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
+         cmsg != nullptr &&
+           cmsg->cmsg_len != 0 &&
+           errMessageCallback_ != nullptr;
+         cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+      errMessageCallback_->errMessage(*cmsg);
+    }
+  }
+#endif //MSG_ERRQUEUE
+}
+
 void AsyncSocket::handleRead() noexcept {
   VLOG(5) << "AsyncSocket::handleRead() this=" << this << ", fd=" << fd_
           << ", state=" << state_;
@@ -1640,6 +1873,12 @@ void AsyncSocket::checkForImmediateRead() noexcept {
   // be a pessimism.  In most cases it probably wouldn't be readable, and we
   // would just waste an extra system call.  Even if it is readable, waiting to
   // find out from libevent on the next event loop doesn't seem that bad.
+  //
+  // The exception to this is if we have pre-received data. In that case there
+  // is definitely data available immediately.
+  if (preReceivedData_ && !preReceivedData_->empty()) {
+    handleRead();
+  }
 }
 
 void AsyncSocket::handleInitialReadWrite() noexcept {
@@ -1764,14 +2003,16 @@ void AsyncSocket::timeoutExpired() noexcept {
   VLOG(7) << "AsyncSocket " << this << ", fd " << fd_ << ": timeout expired: "
           << "state=" << state_ << ", events=" << std::hex << eventFlags_;
   DestructorGuard dg(this);
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
 
   if (state_ == StateEnum::CONNECTING) {
     // connect() timed out
     // Unregister for I/O events.
     if (connectCallback_) {
       AsyncSocketException ex(
-          AsyncSocketException::TIMED_OUT, "connect timed out");
+          AsyncSocketException::TIMED_OUT,
+          folly::sformat(
+              "connect timed out after {}ms", connectTimeout_.count()));
       failConnect(__func__, ex);
     } else {
       // we faced a connect error without a connect callback, which could
@@ -1782,7 +2023,9 @@ void AsyncSocket::timeoutExpired() noexcept {
     }
   } else {
     // a normal write operation timed out
-    AsyncSocketException ex(AsyncSocketException::TIMED_OUT, "write timed out");
+    AsyncSocketException ex(
+        AsyncSocketException::TIMED_OUT,
+        folly::sformat("write timed out after {}ms", sendTimeout_));
     failWrite(__func__, ex);
   }
 }
@@ -1817,7 +2060,7 @@ AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
         registerForConnectEvents();
       } catch (const AsyncSocketException& ex) {
         return WriteResult(
-            WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
+            WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
       }
       // Let's fake it that no bytes were written and return an errno.
       errno = EAGAIN;
@@ -1840,7 +2083,7 @@ AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
         totalWritten = -1;
       } catch (const AsyncSocketException& ex) {
         return WriteResult(
-            WRITE_ERROR, folly::make_unique<AsyncSocketException>(ex));
+            WRITE_ERROR, std::make_unique<AsyncSocketException>(ex));
       }
     } else if (errno == EAGAIN) {
       // Normally sendmsg would indicate that the write would block.
@@ -1849,7 +2092,7 @@ AsyncSocket::sendSocketMessage(int fd, struct msghdr* msg, int msg_flags) {
       // instead, and is an error condition indicating no fds available.
       return WriteResult(
           WRITE_ERROR,
-          folly::make_unique<AsyncSocketException>(
+          std::make_unique<AsyncSocketException>(
               AsyncSocketException::UNKNOWN, "No more free local ports"));
     }
   } else {
@@ -1873,25 +2116,19 @@ AsyncSocket::WriteResult AsyncSocket::performWrite(
   msg.msg_namelen = 0;
   msg.msg_iov = const_cast<iovec *>(vec);
   msg.msg_iovlen = std::min<size_t>(count, kIovMax);
-  msg.msg_control = nullptr;
-  msg.msg_controllen = 0;
   msg.msg_flags = 0;
+  msg.msg_controllen = sendMsgParamCallback_->getAncillaryDataSize(flags);
+  CHECK_GE(AsyncSocket::SendMsgParamsCallback::maxAncillaryDataSize,
+           msg.msg_controllen);
 
-  int msg_flags = MSG_DONTWAIT;
-
-#ifdef MSG_NOSIGNAL // Linux-only
-  msg_flags |= MSG_NOSIGNAL;
-  if (isSet(flags, WriteFlags::CORK)) {
-    // MSG_MORE tells the kernel we have more data to send, so wait for us to
-    // give it the rest of the data rather than immediately sending a partial
-    // frame, even when TCP_NODELAY is enabled.
-    msg_flags |= MSG_MORE;
-  }
-#endif
-  if (isSet(flags, WriteFlags::EOR)) {
-    // marks that this is the last byte of a record (response)
-    msg_flags |= MSG_EOR;
+  if (msg.msg_controllen != 0) {
+    msg.msg_control = reinterpret_cast<char*>(alloca(msg.msg_controllen));
+    sendMsgParamCallback_->getAncillaryData(flags, msg.msg_control);
+  } else {
+    msg.msg_control = nullptr;
   }
+  int msg_flags = sendMsgParamCallback_->getFlags(flags);
+
   auto writeResult = sendSocketMessage(fd_, &msg, msg_flags);
   auto totalWritten = writeResult.writeReturn;
   if (totalWritten < 0) {
@@ -1944,13 +2181,13 @@ AsyncSocket::WriteResult AsyncSocket::performWrite(
  * and call all currently installed callbacks.  After an error, the
  * AsyncSocket is completely unregistered.
  *
- * @return Returns true on succcess, or false on error.
+ * @return Returns true on success, or false on error.
  */
 bool AsyncSocket::updateEventRegistration() {
   VLOG(5) << "AsyncSocket::updateEventRegistration(this=" << this
           << ", fd=" << fd_ << ", evb=" << eventBase_ << ", state=" << state_
           << ", events=" << std::hex << eventFlags_;
-  assert(eventBase_->isInEventBaseThread());
+  eventBase_->dcheckIsInEventBaseThread();
   if (eventFlags_ == EventHandler::NONE) {
     ioHandler_.unregisterHandler();
     return true;
@@ -2066,6 +2303,23 @@ void AsyncSocket::failRead(const char* fn, const AsyncSocketException& ex) {
   finishFail();
 }
 
+void AsyncSocket::failErrMessageRead(const char* fn,
+                                     const AsyncSocketException& ex) {
+  VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
+               << state_ << " host=" << addr_.describe()
+               << "): failed while reading message in " << fn << "(): "
+               << ex.what();
+  startFail();
+
+  if (errMessageCallback_ != nullptr) {
+    ErrMessageCallback* callback = errMessageCallback_;
+    errMessageCallback_ = nullptr;
+    callback->errMessageError(ex);
+  }
+
+  finishFail();
+}
+
 void AsyncSocket::failWrite(const char* fn, const AsyncSocketException& ex) {
   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_ << ", state="
                << state_ << " host=" << addr_.describe()
@@ -2125,7 +2379,7 @@ void AsyncSocket::failAllWrites(const AsyncSocketException& ex) {
 
 void AsyncSocket::invalidState(ConnectCallback* callback) {
   VLOG(5) << "AsyncSocket(this=" << this << ", fd=" << fd_
-             << "): connect() called in invalid state " << state_;
+          << "): connect() called in invalid state " << state_;
 
   /*
    * The invalidState() methods don't use the normal failure mechanisms,
@@ -2153,6 +2407,29 @@ void AsyncSocket::invalidState(ConnectCallback* callback) {
   }
 }
 
+void AsyncSocket::invalidState(ErrMessageCallback* callback) {
+  VLOG(4) << "AsyncSocket(this=" << this << ", fd=" << fd_
+          << "): setErrMessageCB(" << callback
+          << ") called in invalid state " << state_;
+
+  AsyncSocketException ex(
+      AsyncSocketException::NOT_OPEN,
+      msgErrQueueSupported
+      ? "setErrMessageCB() called with socket in invalid state"
+      : "This platform does not support socket error message notifications");
+  if (state_ == StateEnum::CLOSED || state_ == StateEnum::ERROR) {
+    if (callback) {
+      callback->errMessageError(ex);
+    }
+  } else {
+    startFail();
+    if (callback) {
+      callback->errMessageError(ex);
+    }
+    finishFail();
+  }
+}
+
 void AsyncSocket::invokeConnectErr(const AsyncSocketException& ex) {
   connectEndTime_ = std::chrono::steady_clock::now();
   if (connectCallback_) {