Implementing a callback interface for folly::AsyncSocket allowing to supply an ancill...
[folly.git] / folly / io / async / test / AsyncSocketTest2.cpp
index c42406a8a9e6d3a29dc7cdfb7ababf33ca4a089e..a1988722ef2e41d66231c9483684abcc72fe9008 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+#include <folly/io/async/test/AsyncSocketTest2.h>
+
 #include <folly/ExceptionWrapper.h>
-#include <folly/RWSpinLock.h>
 #include <folly/Random.h>
 #include <folly/SocketAddress.h>
-#include <folly/io/async/AsyncServerSocket.h>
 #include <folly/io/async/AsyncSocket.h>
 #include <folly/io/async/AsyncTimeout.h>
 #include <folly/io/async/EventBase.h>
@@ -50,6 +51,7 @@ using std::chrono::milliseconds;
 using boost::scoped_array;
 
 using namespace folly;
+using namespace folly::test;
 using namespace testing;
 
 namespace fsp = folly::portability::sockets;
@@ -944,7 +946,7 @@ TEST(AsyncSocketTest, ConnectCallbackWrite) {
   testConnectOptWrite(100, 200);
 
   // Test using a large buffer in the connect callback, that should block
-  const size_t largeSize = 8*1024*1024;
+  const size_t largeSize = 32 * 1024 * 1024;
   testConnectOptWrite(100, largeSize);
 
   // Test using a large initial write
@@ -1013,8 +1015,12 @@ TEST(AsyncSocketTest, WriteTimeout) {
     AsyncSocket::newSocket(&evb, server.getAddress(), 30);
   evb.loop(); // loop until the socket is connected
 
-  // write() a large chunk of data, with no-one on the other end reading
-  size_t writeLength = 8*1024*1024;
+  // write() a large chunk of data, with no-one on the other end reading.
+  // Tricky: the kernel caches the connection metrics for recently-used
+  // routes (see tcp_no_metrics_save) so a freshly opened connection can
+  // have a send buffer size bigger than wmem_default.  This makes the test
+  // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
+  size_t writeLength = 32 * 1024 * 1024;
   uint32_t timeout = 200;
   socket->setSendTimeout(timeout);
   scoped_array<char> buf(new char[writeLength]);
@@ -1071,7 +1077,7 @@ TEST(AsyncSocketTest, WritePipeError) {
   acceptedSocket->close();
 
   // write() a large chunk of data
-  size_t writeLength = 8*1024*1024;
+  size_t writeLength = 32 * 1024 * 1024;
   scoped_array<char> buf(new char[writeLength]);
   memset(buf.get(), 'a', writeLength);
   WriteCallback wcb;
@@ -1173,6 +1179,13 @@ TEST(AsyncSocketTest, WriteIOBuf) {
   ReadCallback rcb;
   acceptedSocket->setReadCB(&rcb);
 
+  // Check if EOR tracking flag can be set and reset.
+  EXPECT_FALSE(socket->isEorTrackingEnabled());
+  socket->setEorTracking(true);
+  EXPECT_TRUE(socket->isEorTrackingEnabled());
+  socket->setEorTracking(false);
+  EXPECT_FALSE(socket->isEorTrackingEnabled());
+
   // Write a simple buffer to the socket
   constexpr size_t simpleBufLength = 5;
   char simpleBuf[simpleBufLength];
@@ -1573,214 +1586,6 @@ TEST(AsyncSocket, ConnectReadUninstallRead) {
 ///////////////////////////////////////////////////////////////////////////
 // AsyncServerSocket tests
 ///////////////////////////////////////////////////////////////////////////
-namespace {
-/**
- * Helper ConnectionEventCallback class for the test code.
- * It maintains counters protected by a spin lock.
- */
-class TestConnectionEventCallback :
-  public AsyncServerSocket::ConnectionEventCallback {
- public:
-  virtual void onConnectionAccepted(
-      const int /* socket */,
-      const SocketAddress& /* addr */) noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    connectionAccepted_++;
-  }
-
-  virtual void onConnectionAcceptError(const int /* err */) noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    connectionAcceptedError_++;
-  }
-
-  virtual void onConnectionDropped(
-      const int /* socket */,
-      const SocketAddress& /* addr */) noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    connectionDropped_++;
-  }
-
-  virtual void onConnectionEnqueuedForAcceptorCallback(
-      const int /* socket */,
-      const SocketAddress& /* addr */) noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    connectionEnqueuedForAcceptCallback_++;
-  }
-
-  virtual void onConnectionDequeuedByAcceptorCallback(
-      const int /* socket */,
-      const SocketAddress& /* addr */) noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    connectionDequeuedByAcceptCallback_++;
-  }
-
-  virtual void onBackoffStarted() noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    backoffStarted_++;
-  }
-
-  virtual void onBackoffEnded() noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    backoffEnded_++;
-  }
-
-  virtual void onBackoffError() noexcept override {
-    folly::RWSpinLock::WriteHolder holder(spinLock_);
-    backoffError_++;
-  }
-
-  unsigned int getConnectionAccepted() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return connectionAccepted_;
-  }
-
-  unsigned int getConnectionAcceptedError() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return connectionAcceptedError_;
-  }
-
-  unsigned int getConnectionDropped() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return connectionDropped_;
-  }
-
-  unsigned int getConnectionEnqueuedForAcceptCallback() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return connectionEnqueuedForAcceptCallback_;
-  }
-
-  unsigned int getConnectionDequeuedByAcceptCallback() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return connectionDequeuedByAcceptCallback_;
-  }
-
-  unsigned int getBackoffStarted() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return backoffStarted_;
-  }
-
-  unsigned int getBackoffEnded() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return backoffEnded_;
-  }
-
-  unsigned int getBackoffError() const {
-    folly::RWSpinLock::ReadHolder holder(spinLock_);
-    return backoffError_;
-  }
-
- private:
-  mutable folly::RWSpinLock spinLock_;
-  unsigned int connectionAccepted_{0};
-  unsigned int connectionAcceptedError_{0};
-  unsigned int connectionDropped_{0};
-  unsigned int connectionEnqueuedForAcceptCallback_{0};
-  unsigned int connectionDequeuedByAcceptCallback_{0};
-  unsigned int backoffStarted_{0};
-  unsigned int backoffEnded_{0};
-  unsigned int backoffError_{0};
-};
-
-/**
- * Helper AcceptCallback class for the test code
- * It records the callbacks that were invoked, and also supports calling
- * generic std::function objects in each callback.
- */
-class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
- public:
-  enum EventType {
-    TYPE_START,
-    TYPE_ACCEPT,
-    TYPE_ERROR,
-    TYPE_STOP
-  };
-  struct EventInfo {
-    EventInfo(int fd, const folly::SocketAddress& addr)
-      : type(TYPE_ACCEPT),
-        fd(fd),
-        address(addr),
-        errorMsg() {}
-    explicit EventInfo(const std::string& msg)
-      : type(TYPE_ERROR),
-        fd(-1),
-        address(),
-        errorMsg(msg) {}
-    explicit EventInfo(EventType et)
-      : type(et),
-        fd(-1),
-        address(),
-        errorMsg() {}
-
-    EventType type;
-    int fd;  // valid for TYPE_ACCEPT
-    folly::SocketAddress address;  // valid for TYPE_ACCEPT
-    string errorMsg;  // valid for TYPE_ERROR
-  };
-  typedef std::deque<EventInfo> EventList;
-
-  TestAcceptCallback()
-    : connectionAcceptedFn_(),
-      acceptErrorFn_(),
-      acceptStoppedFn_(),
-      events_() {}
-
-  std::deque<EventInfo>* getEvents() {
-    return &events_;
-  }
-
-  void setConnectionAcceptedFn(
-      const std::function<void(int, const folly::SocketAddress&)>& fn) {
-    connectionAcceptedFn_ = fn;
-  }
-  void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
-    acceptErrorFn_ = fn;
-  }
-  void setAcceptStartedFn(const std::function<void()>& fn) {
-    acceptStartedFn_ = fn;
-  }
-  void setAcceptStoppedFn(const std::function<void()>& fn) {
-    acceptStoppedFn_ = fn;
-  }
-
-  void connectionAccepted(
-      int fd, const folly::SocketAddress& clientAddr) noexcept override {
-    events_.emplace_back(fd, clientAddr);
-
-    if (connectionAcceptedFn_) {
-      connectionAcceptedFn_(fd, clientAddr);
-    }
-  }
-  void acceptError(const std::exception& ex) noexcept override {
-    events_.emplace_back(ex.what());
-
-    if (acceptErrorFn_) {
-      acceptErrorFn_(ex);
-    }
-  }
-  void acceptStarted() noexcept override {
-    events_.emplace_back(TYPE_START);
-
-    if (acceptStartedFn_) {
-      acceptStartedFn_();
-    }
-  }
-  void acceptStopped() noexcept override {
-    events_.emplace_back(TYPE_STOP);
-
-    if (acceptStoppedFn_) {
-      acceptStoppedFn_();
-    }
-  }
-
- private:
-  std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
-  std::function<void(const std::exception&)> acceptErrorFn_;
-  std::function<void()> acceptStartedFn_;
-  std::function<void()> acceptStoppedFn_;
-
-  std::deque<EventInfo> events_;
-};
-}
 
 /**
  * Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
@@ -2992,4 +2797,399 @@ TEST(AsyncSocketTest, ConnectTFOWithBigData) {
   EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
 }
 
-#endif
+#endif // FOLLY_ALLOW_TFO
+
+class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
+ public:
+  MOCK_METHOD1(evbAttached, void(AsyncSocket*));
+  MOCK_METHOD1(evbDetached, void(AsyncSocket*));
+};
+
+TEST(AsyncSocketTest, EvbCallbacks) {
+  auto cb = folly::make_unique<MockEvbChangeCallback>();
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+  InSequence seq;
+  EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
+  EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
+
+  socket->setEvbChangedCallback(std::move(cb));
+  socket->detachEventBase();
+  socket->attachEventBase(&evb);
+}
+
+#ifdef MSG_ERRQUEUE
+/* copied from include/uapi/linux/net_tstamp.h */
+/* SO_TIMESTAMPING gets an integer bit field comprised of these values */
+enum SOF_TIMESTAMPING {
+  SOF_TIMESTAMPING_SOFTWARE = (1 << 4),
+  SOF_TIMESTAMPING_OPT_ID = (1 << 7),
+  SOF_TIMESTAMPING_TX_SCHED = (1 << 8),
+  SOF_TIMESTAMPING_OPT_CMSG = (1 << 10),
+  SOF_TIMESTAMPING_OPT_TSONLY = (1 << 11),
+};
+TEST(AsyncSocketTest, ErrMessageCallback) {
+  TestServer server;
+
+  // connect()
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+  LOG(INFO) << "Client socket fd=" << socket->getFd();
+
+  // Let the socket
+  evb.loop();
+
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+  // Set read callback to keep the socket subscribed for event
+  // notifications. Though we're no planning to read anything from
+  // this side of the connection.
+  ReadCallback rcb(1);
+  socket->setReadCB(&rcb);
+
+  // Set up timestamp callbacks
+  TestErrMessageCallback errMsgCB;
+  socket->setErrMessageCB(&errMsgCB);
+  ASSERT_EQ(socket->getErrMessageCallback(),
+            static_cast<folly::AsyncSocket::ErrMessageCallback*>(&errMsgCB));
+
+  // Enable timestamp notifications
+  ASSERT_GT(socket->getFd(), 0);
+  int flags = SOF_TIMESTAMPING_OPT_ID
+              | SOF_TIMESTAMPING_OPT_TSONLY
+              | SOF_TIMESTAMPING_SOFTWARE
+              | SOF_TIMESTAMPING_OPT_CMSG
+              | SOF_TIMESTAMPING_TX_SCHED;
+  AsyncSocket::OptionKey tstampingOpt = {SOL_SOCKET, SO_TIMESTAMPING};
+  EXPECT_EQ(tstampingOpt.apply(socket->getFd(), flags), 0);
+
+  // write()
+  std::vector<uint8_t> wbuf(128, 'a');
+  WriteCallback wcb;
+  socket->write(&wcb, wbuf.data(), wbuf.size());
+
+  // Accept the connection.
+  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+  LOG(INFO) << "Server socket fd=" << acceptedSocket->getSocketFD();
+
+  // Loop
+  evb.loopOnce();
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+
+  // Check that we can read the data that was written to the socket
+  std::vector<uint8_t> rbuf(1 + wbuf.size(), 0);
+  uint32_t bytesRead = acceptedSocket->read(rbuf.data(), rbuf.size());
+  ASSERT_TRUE(std::equal(wbuf.begin(), wbuf.end(), rbuf.begin()));
+  ASSERT_EQ(bytesRead, wbuf.size());
+
+  // Close both sockets
+  acceptedSocket->close();
+  socket->close();
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
+
+  // Check for the timestamp notifications.
+  ASSERT_EQ(errMsgCB.exception_.type_, folly::AsyncSocketException::UNKNOWN);
+  ASSERT_TRUE(errMsgCB.gotByteSeq_);
+  ASSERT_TRUE(errMsgCB.gotTimestamp_);
+}
+#endif // MSG_ERRQUEUE
+
+TEST(AsyncSocket, PreReceivedData) {
+  TestServer server;
+
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->connect(nullptr, server.getAddress(), 30);
+  evb.loop();
+
+  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+  auto acceptedSocket = server.acceptAsync(&evb);
+
+  ReadCallback peekCallback(2);
+  ReadCallback readCallback;
+  peekCallback.dataAvailableCallback = [&]() {
+    peekCallback.verifyData("he", 2);
+    acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("h"));
+    acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("e"));
+    acceptedSocket->setReadCB(nullptr);
+    acceptedSocket->setReadCB(&readCallback);
+  };
+  readCallback.dataAvailableCallback = [&]() {
+    if (readCallback.dataRead() == 5) {
+      readCallback.verifyData("hello", 5);
+      acceptedSocket->setReadCB(nullptr);
+    }
+  };
+
+  acceptedSocket->setReadCB(&peekCallback);
+
+  evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataOnly) {
+  TestServer server;
+
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->connect(nullptr, server.getAddress(), 30);
+  evb.loop();
+
+  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+  auto acceptedSocket = server.acceptAsync(&evb);
+
+  ReadCallback peekCallback;
+  ReadCallback readCallback;
+  peekCallback.dataAvailableCallback = [&]() {
+    peekCallback.verifyData("hello", 5);
+    acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+    acceptedSocket->setReadCB(&readCallback);
+  };
+  readCallback.dataAvailableCallback = [&]() {
+    readCallback.verifyData("hello", 5);
+    acceptedSocket->setReadCB(nullptr);
+  };
+
+  acceptedSocket->setReadCB(&peekCallback);
+
+  evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataPartial) {
+  TestServer server;
+
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->connect(nullptr, server.getAddress(), 30);
+  evb.loop();
+
+  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+  auto acceptedSocket = server.acceptAsync(&evb);
+
+  ReadCallback peekCallback;
+  ReadCallback smallReadCallback(3);
+  ReadCallback normalReadCallback;
+  peekCallback.dataAvailableCallback = [&]() {
+    peekCallback.verifyData("hello", 5);
+    acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+    acceptedSocket->setReadCB(&smallReadCallback);
+  };
+  smallReadCallback.dataAvailableCallback = [&]() {
+    smallReadCallback.verifyData("hel", 3);
+    acceptedSocket->setReadCB(&normalReadCallback);
+  };
+  normalReadCallback.dataAvailableCallback = [&]() {
+    normalReadCallback.verifyData("lo", 2);
+    acceptedSocket->setReadCB(nullptr);
+  };
+
+  acceptedSocket->setReadCB(&peekCallback);
+
+  evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataTakeover) {
+  TestServer server;
+
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+  socket->connect(nullptr, server.getAddress(), 30);
+  evb.loop();
+
+  socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+  auto acceptedSocket =
+      AsyncSocket::UniquePtr(new AsyncSocket(&evb, server.acceptFD()));
+  AsyncSocket::UniquePtr takeoverSocket;
+
+  ReadCallback peekCallback(3);
+  ReadCallback readCallback;
+  peekCallback.dataAvailableCallback = [&]() {
+    peekCallback.verifyData("hel", 3);
+    acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+    acceptedSocket->setReadCB(nullptr);
+    takeoverSocket =
+        AsyncSocket::UniquePtr(new AsyncSocket(std::move(acceptedSocket)));
+    takeoverSocket->setReadCB(&readCallback);
+  };
+  readCallback.dataAvailableCallback = [&]() {
+    readCallback.verifyData("hello", 5);
+    takeoverSocket->setReadCB(nullptr);
+  };
+
+  acceptedSocket->setReadCB(&peekCallback);
+
+  evb.loop();
+}
+
+TEST(AsyncSocketTest, SendMessageFlags) {
+  TestServer server;
+  TestSendMsgParamsCallback sendMsgCB(
+      MSG_DONTWAIT|MSG_NOSIGNAL|MSG_MORE, 0, nullptr);
+
+  // connect()
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+  ConnCallback ccb;
+  socket->connect(&ccb, server.getAddress(), 30);
+  std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+
+  evb.loop();
+  ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+  // Set SendMsgParamsCallback
+  socket->setSendMsgParamCB(&sendMsgCB);
+  ASSERT_EQ(socket->getSendMsgParamsCB(), &sendMsgCB);
+
+  // Write the first portion of data. This data is expected to be
+  // sent out immediately.
+  std::vector<uint8_t> buf(128, 'a');
+  WriteCallback wcb;
+  sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL);
+  socket->write(&wcb, buf.data(), buf.size());
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_TRUE(sendMsgCB.queriedFlags_);
+  ASSERT_FALSE(sendMsgCB.queriedData_);
+
+  // Using different flags for the second write operation.
+  // MSG_MORE flag is expected to delay sending this
+  // data to the wire.
+  sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE);
+  socket->write(&wcb, buf.data(), buf.size());
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+  ASSERT_TRUE(sendMsgCB.queriedFlags_);
+  ASSERT_FALSE(sendMsgCB.queriedData_);
+
+  // Make sure the accepted socket saw only the data from
+  // the first write request.
+  std::vector<uint8_t> readbuf(2 * buf.size());
+  uint32_t bytesRead = acceptedSocket->read(readbuf.data(), readbuf.size());
+  ASSERT_TRUE(std::equal(buf.begin(), buf.end(), readbuf.begin()));
+  ASSERT_EQ(bytesRead, buf.size());
+
+  // Make sure the server got a connection and received the data
+  acceptedSocket->close();
+  socket->close();
+
+  ASSERT_TRUE(socket->isClosedBySelf());
+  ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+TEST(AsyncSocketTest, SendMessageAncillaryData) {
+  struct sockaddr_un addr = {AF_UNIX,
+                             "AsyncSocketTest.SendMessageAncillaryData\0"};
+
+  // Clean up the name in the name space we're going to use
+  ASSERT_FALSE(remove(addr.sun_path) == -1 && errno != ENOENT);
+
+  // Set up listening socket
+  int lfd = fsp::socket(AF_UNIX, SOCK_STREAM, 0);
+  ASSERT_NE(lfd, -1);
+  ASSERT_NE(bind(lfd, (struct sockaddr*)&addr, sizeof(addr)), -1)
+      << "Bind failed: " << errno;
+
+  // Create the connecting socket
+  int csd = fsp::socket(AF_UNIX, SOCK_STREAM, 0);
+  ASSERT_NE(csd, -1);
+
+  // Listen for incoming connect
+  ASSERT_NE(listen(lfd, 5), -1);
+
+  // Connect to the listening socket
+  ASSERT_NE(fsp::connect(csd, (struct sockaddr*)&addr, sizeof(addr)), -1)
+      << "Connect request failed: " << errno;
+
+  // Accept the connection
+  int sfd = accept(lfd, nullptr, nullptr);
+  ASSERT_NE(sfd, -1);
+
+  // Instantiate AsyncSocket object for the connected socket
+  EventBase evb;
+  std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, csd);
+
+  // Open a temporary file and write a magic string to it
+  // We'll transfer the file handle to test the message parameters
+  // callback logic.
+  int tmpfd = open("/var/tmp", O_RDWR | O_TMPFILE);
+  ASSERT_NE(tmpfd, -1) << "Failed to open a temporary file";
+  std::string magicString("Magic string");
+  ASSERT_EQ(write(tmpfd, magicString.c_str(), magicString.length()),
+            magicString.length());
+
+  // Send message
+  union {
+    // Space large enough to hold an 'int'
+    char control[CMSG_SPACE(sizeof(int))];
+    struct cmsghdr cmh;
+  } s_u;
+  s_u.cmh.cmsg_len = CMSG_LEN(sizeof(int));
+  s_u.cmh.cmsg_level = SOL_SOCKET;
+  s_u.cmh.cmsg_type = SCM_RIGHTS;
+  memcpy(CMSG_DATA(&s_u.cmh), &tmpfd, sizeof(int));
+
+  // Set up the callback providing message parameters
+  TestSendMsgParamsCallback sendMsgCB(
+      MSG_DONTWAIT | MSG_NOSIGNAL, sizeof(s_u.control), s_u.control);
+  socket->setSendMsgParamCB(&sendMsgCB);
+
+  // We must transmit at least 1 byte of real data in order
+  // to send ancillary data
+  int s_data = 12345;
+  WriteCallback wcb;
+  socket->write(&wcb, &s_data, sizeof(s_data));
+  ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+
+  // Receive the message
+  union {
+    // Space large enough to hold an 'int'
+    char control[CMSG_SPACE(sizeof(int))];
+    struct cmsghdr cmh;
+  } r_u;
+  struct msghdr msgh;
+  struct iovec iov;
+  int r_data = 0;
+
+  msgh.msg_control = r_u.control;
+  msgh.msg_controllen = sizeof(r_u.control);
+  msgh.msg_name = nullptr;
+  msgh.msg_namelen = 0;
+  msgh.msg_iov = &iov;
+  msgh.msg_iovlen = 1;
+  iov.iov_base = &r_data;
+  iov.iov_len = sizeof(r_data);
+
+  // Receive data
+  ASSERT_NE(recvmsg(sfd, &msgh, 0), -1) << "recvmsg failed: " << errno;
+
+  // Validate the received message
+  ASSERT_EQ(r_u.cmh.cmsg_len, CMSG_LEN(sizeof(int)));
+  ASSERT_EQ(r_u.cmh.cmsg_level, SOL_SOCKET);
+  ASSERT_EQ(r_u.cmh.cmsg_type, SCM_RIGHTS);
+  ASSERT_EQ(r_data, s_data);
+  int fd = 0;
+  memcpy(&fd, CMSG_DATA(&r_u.cmh), sizeof(int));
+  ASSERT_NE(fd, 0);
+
+  std::vector<uint8_t> transferredMagicString(magicString.length() + 1, 0);
+
+  // Reposition to the beginning of the file
+  ASSERT_EQ(0, lseek(fd, 0, SEEK_SET));
+
+  // Read the magic string back, and compare it with the original
+  ASSERT_EQ(
+      magicString.length(),
+      read(fd, transferredMagicString.data(), transferredMagicString.size()));
+  ASSERT_TRUE(std::equal(
+      magicString.begin(),
+      magicString.end(),
+      transferredMagicString.begin()));
+}