Add support for iovec for UDP send in folly::AsyncUDPSocket
authorNaizhi Li <naizhi@fb.com>
Fri, 13 Mar 2015 17:32:19 +0000 (10:32 -0700)
committerAndre Azevedo <aap@fb.com>
Wed, 18 Mar 2015 03:18:45 +0000 (20:18 -0700)
Summary:
I will be using multi-buf UDP send soon, so adding the
support in folly::AsyncUDPSocket

Test Plan: Unit tests and turn server

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, folly-diffs@, yfeldblum

FB internal diff: D1907189

Signature: t1:1907189:1426266951:046198e0a009fef085ac7eb44f054c67dfb16ba3

folly/io/IOBuf.cpp
folly/io/IOBuf.h
folly/io/async/AsyncSocket.cpp
folly/io/async/AsyncUDPSocket.cpp

index c62d60d0a382f79021c1ad522342c17679cb6d88..ff60ac83419b4e9ceb266fef714c74f543bccbe8 100644 (file)
@@ -885,6 +885,24 @@ void IOBuf::appendToIov(folly::fbvector<struct iovec>* iov) const {
   } while (p != this);
 }
 
+size_t IOBuf::fillIov(struct iovec* iov, size_t len) const {
+  IOBuf const* p = this;
+  size_t i = 0;
+  while (i < len) {
+    // some code can get confused by empty iovs, so skip them
+    if (p->length() > 0) {
+      iov[i].iov_base = const_cast<uint8_t*>(p->data());
+      iov[i].iov_len = p->length();
+      i++;
+    }
+    p = p->next();
+    if (p == this) {
+      return i;
+    }
+  }
+  return 0;
+}
+
 size_t IOBufHash::operator()(const IOBuf& buf) const {
   folly::hash::SpookyHashV2 hasher;
   hasher.Init(0, 0);
index 8b0673dc096db48f0b953805c7c5338eefd70cb9..e5fd07b38146736198922f6000a42e3e6e8ecab3 100644 (file)
@@ -1052,6 +1052,18 @@ class IOBuf {
    */
   void appendToIov(folly::fbvector<struct iovec>* iov) const;
 
+  /**
+   * Fill an iovec array with the IOBuf data.
+   *
+   * Returns the number of iovec filled. If there are more buffer than
+   * iovec, returns 0. This version is suitable to use with stack iovec
+   * arrays.
+   *
+   * Naturally, the filled iovec data will be invalid if you modify the
+   * buffer chain.
+   */
+  size_t fillIov(struct iovec* iov, size_t len) const;
+
   /*
    * Overridden operator new and delete.
    * These perform specialized memory management to help support
index 3906fffa403583b39473b8a7c405ec56e7fd4808..f4f0838740ea91a1fab60f312f4614af06d634e0 100644 (file)
@@ -622,21 +622,8 @@ void AsyncSocket::writeChain(WriteCallback* callback, unique_ptr<IOBuf>&& buf,
 
 void AsyncSocket::writeChainImpl(WriteCallback* callback, iovec* vec,
     size_t count, unique_ptr<IOBuf>&& buf, WriteFlags flags) {
-  const IOBuf* head = buf.get();
-  const IOBuf* next = head;
-  unsigned i = 0;
-  do {
-    vec[i].iov_base = const_cast<uint8_t *>(next->data());
-    vec[i].iov_len = next->length();
-    // IOBuf can get confused by empty iovec buffers, so increment the
-    // output pointer only if the iovec buffer is non-empty.  We could
-    // end the loop with i < count, but that's ok.
-    if (vec[i].iov_len != 0) {
-      i++;
-    }
-    next = next->next();
-  } while (next != head);
-  writeImpl(callback, vec, i, std::move(buf), flags);
+  size_t veclen = buf->fillIov(vec, count);
+  writeImpl(callback, vec, veclen, std::move(buf), flags);
 }
 
 void AsyncSocket::writeImpl(WriteCallback* callback, const iovec* vec,
index ed2ad3764797b9b63545cb13119d07c0f881c821..8bea5a5a809622c384f00f0ee946a2c4fcdde573 100644 (file)
@@ -17,6 +17,7 @@
 #include <folly/io/async/AsyncUDPSocket.h>
 
 #include <folly/io/async/EventBase.h>
+#include <folly/Likely.h>
 
 #include <errno.h>
 #include <unistd.h>
@@ -130,19 +131,32 @@ ssize_t AsyncUDPSocket::write(const folly::SocketAddress& address,
                                const std::unique_ptr<folly::IOBuf>& buf) {
   CHECK_NE(-1, fd_) << "Socket not yet bound";
 
-  // XXX: Use `sendmsg` instead of coalescing here
-  buf->coalesce();
+  // UDP's typical MTU size is 1500, so high number of buffers
+  //   really do not make sense. Optimze for buffer chains with
+  //   buffers less than 16, which is the highest I can think of
+  //   for a real use case.
+  iovec vec[16];
+  size_t iovec_len = buf->fillIov(vec, sizeof(vec)/sizeof(vec[0]));
+  if (UNLIKELY(iovec_len == 0)) {
+    buf->coalesce();
+    vec[0].iov_base = const_cast<uint8_t*>(buf->data());
+    vec[0].iov_len = buf->length();
+    iovec_len = 1;
+  }
 
   sockaddr_storage addrStorage;
   address.getAddress(&addrStorage);
-  sockaddr* saddr = reinterpret_cast<sockaddr*>(&addrStorage);
 
-  return ::sendto(fd_,
-                  buf->data(),
-                  buf->length(),
-                  MSG_DONTWAIT,
-                  saddr,
-                  address.getActualSize());
+  struct msghdr msg;
+  msg.msg_name = reinterpret_cast<void*>(&addrStorage);
+  msg.msg_namelen = address.getActualSize();
+  msg.msg_iov = vec;
+  msg.msg_iovlen = iovec_len;
+  msg.msg_control = nullptr;
+  msg.msg_controllen = 0;
+  msg.msg_flags = 0;
+
+  return ::sendmsg(fd_, &msg, 0);
 }
 
 void AsyncUDPSocket::resumeRead(ReadCallback* cob) {