/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+#include <folly/io/async/test/AsyncSocketTest2.h>
+
#include <folly/ExceptionWrapper.h>
-#include <folly/RWSpinLock.h>
#include <folly/Random.h>
#include <folly/SocketAddress.h>
-#include <folly/io/async/AsyncServerSocket.h>
#include <folly/io/async/AsyncSocket.h>
#include <folly/io/async/AsyncTimeout.h>
#include <folly/io/async/EventBase.h>
using boost::scoped_array;
using namespace folly;
+using namespace folly::test;
using namespace testing;
namespace fsp = folly::portability::sockets;
testConnectOptWrite(100, 200);
// Test using a large buffer in the connect callback, that should block
- const size_t largeSize = 8*1024*1024;
+ const size_t largeSize = 32 * 1024 * 1024;
testConnectOptWrite(100, largeSize);
// Test using a large initial write
AsyncSocket::newSocket(&evb, server.getAddress(), 30);
evb.loop(); // loop until the socket is connected
- // write() a large chunk of data, with no-one on the other end reading
- size_t writeLength = 8*1024*1024;
+ // write() a large chunk of data, with no-one on the other end reading.
+ // Tricky: the kernel caches the connection metrics for recently-used
+ // routes (see tcp_no_metrics_save) so a freshly opened connection can
+ // have a send buffer size bigger than wmem_default. This makes the test
+ // flaky on contbuild if writeLength is < wmem_max (20M on our systems).
+ size_t writeLength = 32 * 1024 * 1024;
uint32_t timeout = 200;
socket->setSendTimeout(timeout);
scoped_array<char> buf(new char[writeLength]);
acceptedSocket->close();
// write() a large chunk of data
- size_t writeLength = 8*1024*1024;
+ size_t writeLength = 32 * 1024 * 1024;
scoped_array<char> buf(new char[writeLength]);
memset(buf.get(), 'a', writeLength);
WriteCallback wcb;
ReadCallback rcb;
acceptedSocket->setReadCB(&rcb);
+ // Check if EOR tracking flag can be set and reset.
+ EXPECT_FALSE(socket->isEorTrackingEnabled());
+ socket->setEorTracking(true);
+ EXPECT_TRUE(socket->isEorTrackingEnabled());
+ socket->setEorTracking(false);
+ EXPECT_FALSE(socket->isEorTrackingEnabled());
+
// Write a simple buffer to the socket
constexpr size_t simpleBufLength = 5;
char simpleBuf[simpleBufLength];
///////////////////////////////////////////////////////////////////////////
// AsyncServerSocket tests
///////////////////////////////////////////////////////////////////////////
-namespace {
-/**
- * Helper ConnectionEventCallback class for the test code.
- * It maintains counters protected by a spin lock.
- */
-class TestConnectionEventCallback :
- public AsyncServerSocket::ConnectionEventCallback {
- public:
- virtual void onConnectionAccepted(
- const int /* socket */,
- const SocketAddress& /* addr */) noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- connectionAccepted_++;
- }
-
- virtual void onConnectionAcceptError(const int /* err */) noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- connectionAcceptedError_++;
- }
-
- virtual void onConnectionDropped(
- const int /* socket */,
- const SocketAddress& /* addr */) noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- connectionDropped_++;
- }
-
- virtual void onConnectionEnqueuedForAcceptorCallback(
- const int /* socket */,
- const SocketAddress& /* addr */) noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- connectionEnqueuedForAcceptCallback_++;
- }
-
- virtual void onConnectionDequeuedByAcceptorCallback(
- const int /* socket */,
- const SocketAddress& /* addr */) noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- connectionDequeuedByAcceptCallback_++;
- }
-
- virtual void onBackoffStarted() noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- backoffStarted_++;
- }
-
- virtual void onBackoffEnded() noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- backoffEnded_++;
- }
-
- virtual void onBackoffError() noexcept override {
- folly::RWSpinLock::WriteHolder holder(spinLock_);
- backoffError_++;
- }
-
- unsigned int getConnectionAccepted() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return connectionAccepted_;
- }
-
- unsigned int getConnectionAcceptedError() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return connectionAcceptedError_;
- }
-
- unsigned int getConnectionDropped() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return connectionDropped_;
- }
-
- unsigned int getConnectionEnqueuedForAcceptCallback() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return connectionEnqueuedForAcceptCallback_;
- }
-
- unsigned int getConnectionDequeuedByAcceptCallback() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return connectionDequeuedByAcceptCallback_;
- }
-
- unsigned int getBackoffStarted() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return backoffStarted_;
- }
-
- unsigned int getBackoffEnded() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return backoffEnded_;
- }
-
- unsigned int getBackoffError() const {
- folly::RWSpinLock::ReadHolder holder(spinLock_);
- return backoffError_;
- }
-
- private:
- mutable folly::RWSpinLock spinLock_;
- unsigned int connectionAccepted_{0};
- unsigned int connectionAcceptedError_{0};
- unsigned int connectionDropped_{0};
- unsigned int connectionEnqueuedForAcceptCallback_{0};
- unsigned int connectionDequeuedByAcceptCallback_{0};
- unsigned int backoffStarted_{0};
- unsigned int backoffEnded_{0};
- unsigned int backoffError_{0};
-};
-
-/**
- * Helper AcceptCallback class for the test code
- * It records the callbacks that were invoked, and also supports calling
- * generic std::function objects in each callback.
- */
-class TestAcceptCallback : public AsyncServerSocket::AcceptCallback {
- public:
- enum EventType {
- TYPE_START,
- TYPE_ACCEPT,
- TYPE_ERROR,
- TYPE_STOP
- };
- struct EventInfo {
- EventInfo(int fd, const folly::SocketAddress& addr)
- : type(TYPE_ACCEPT),
- fd(fd),
- address(addr),
- errorMsg() {}
- explicit EventInfo(const std::string& msg)
- : type(TYPE_ERROR),
- fd(-1),
- address(),
- errorMsg(msg) {}
- explicit EventInfo(EventType et)
- : type(et),
- fd(-1),
- address(),
- errorMsg() {}
-
- EventType type;
- int fd; // valid for TYPE_ACCEPT
- folly::SocketAddress address; // valid for TYPE_ACCEPT
- string errorMsg; // valid for TYPE_ERROR
- };
- typedef std::deque<EventInfo> EventList;
-
- TestAcceptCallback()
- : connectionAcceptedFn_(),
- acceptErrorFn_(),
- acceptStoppedFn_(),
- events_() {}
-
- std::deque<EventInfo>* getEvents() {
- return &events_;
- }
-
- void setConnectionAcceptedFn(
- const std::function<void(int, const folly::SocketAddress&)>& fn) {
- connectionAcceptedFn_ = fn;
- }
- void setAcceptErrorFn(const std::function<void(const std::exception&)>& fn) {
- acceptErrorFn_ = fn;
- }
- void setAcceptStartedFn(const std::function<void()>& fn) {
- acceptStartedFn_ = fn;
- }
- void setAcceptStoppedFn(const std::function<void()>& fn) {
- acceptStoppedFn_ = fn;
- }
-
- void connectionAccepted(
- int fd, const folly::SocketAddress& clientAddr) noexcept override {
- events_.emplace_back(fd, clientAddr);
-
- if (connectionAcceptedFn_) {
- connectionAcceptedFn_(fd, clientAddr);
- }
- }
- void acceptError(const std::exception& ex) noexcept override {
- events_.emplace_back(ex.what());
-
- if (acceptErrorFn_) {
- acceptErrorFn_(ex);
- }
- }
- void acceptStarted() noexcept override {
- events_.emplace_back(TYPE_START);
-
- if (acceptStartedFn_) {
- acceptStartedFn_();
- }
- }
- void acceptStopped() noexcept override {
- events_.emplace_back(TYPE_STOP);
-
- if (acceptStoppedFn_) {
- acceptStoppedFn_();
- }
- }
-
- private:
- std::function<void(int, const folly::SocketAddress&)> connectionAcceptedFn_;
- std::function<void(const std::exception&)> acceptErrorFn_;
- std::function<void()> acceptStartedFn_;
- std::function<void()> acceptStoppedFn_;
-
- std::deque<EventInfo> events_;
-};
-}
/**
* Make sure accepted sockets have O_NONBLOCK and TCP_NODELAY set
EXPECT_EQ(socket->getTFOFinished(), socket->getTFOSucceded());
}
-#endif
+#endif // FOLLY_ALLOW_TFO
+
+class MockEvbChangeCallback : public AsyncSocket::EvbChangeCallback {
+ public:
+ MOCK_METHOD1(evbAttached, void(AsyncSocket*));
+ MOCK_METHOD1(evbDetached, void(AsyncSocket*));
+};
+
+TEST(AsyncSocketTest, EvbCallbacks) {
+ auto cb = folly::make_unique<MockEvbChangeCallback>();
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ InSequence seq;
+ EXPECT_CALL(*cb, evbDetached(socket.get())).Times(1);
+ EXPECT_CALL(*cb, evbAttached(socket.get())).Times(1);
+
+ socket->setEvbChangedCallback(std::move(cb));
+ socket->detachEventBase();
+ socket->attachEventBase(&evb);
+}
+
+#ifdef MSG_ERRQUEUE
+/* copied from include/uapi/linux/net_tstamp.h */
+/* SO_TIMESTAMPING gets an integer bit field comprised of these values */
+enum SOF_TIMESTAMPING {
+ SOF_TIMESTAMPING_SOFTWARE = (1 << 4),
+ SOF_TIMESTAMPING_OPT_ID = (1 << 7),
+ SOF_TIMESTAMPING_TX_SCHED = (1 << 8),
+ SOF_TIMESTAMPING_OPT_CMSG = (1 << 10),
+ SOF_TIMESTAMPING_OPT_TSONLY = (1 << 11),
+};
+TEST(AsyncSocketTest, ErrMessageCallback) {
+ TestServer server;
+
+ // connect()
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+ LOG(INFO) << "Client socket fd=" << socket->getFd();
+
+ // Let the socket
+ evb.loop();
+
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+ // Set read callback to keep the socket subscribed for event
+ // notifications. Though we're no planning to read anything from
+ // this side of the connection.
+ ReadCallback rcb(1);
+ socket->setReadCB(&rcb);
+
+ // Set up timestamp callbacks
+ TestErrMessageCallback errMsgCB;
+ socket->setErrMessageCB(&errMsgCB);
+ ASSERT_EQ(socket->getErrMessageCallback(),
+ static_cast<folly::AsyncSocket::ErrMessageCallback*>(&errMsgCB));
+
+ // Enable timestamp notifications
+ ASSERT_GT(socket->getFd(), 0);
+ int flags = SOF_TIMESTAMPING_OPT_ID
+ | SOF_TIMESTAMPING_OPT_TSONLY
+ | SOF_TIMESTAMPING_SOFTWARE
+ | SOF_TIMESTAMPING_OPT_CMSG
+ | SOF_TIMESTAMPING_TX_SCHED;
+ AsyncSocket::OptionKey tstampingOpt = {SOL_SOCKET, SO_TIMESTAMPING};
+ EXPECT_EQ(tstampingOpt.apply(socket->getFd(), flags), 0);
+
+ // write()
+ std::vector<uint8_t> wbuf(128, 'a');
+ WriteCallback wcb;
+ socket->write(&wcb, wbuf.data(), wbuf.size());
+
+ // Accept the connection.
+ std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+ LOG(INFO) << "Server socket fd=" << acceptedSocket->getSocketFD();
+
+ // Loop
+ evb.loopOnce();
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+
+ // Check that we can read the data that was written to the socket
+ std::vector<uint8_t> rbuf(1 + wbuf.size(), 0);
+ uint32_t bytesRead = acceptedSocket->read(rbuf.data(), rbuf.size());
+ ASSERT_TRUE(std::equal(wbuf.begin(), wbuf.end(), rbuf.begin()));
+ ASSERT_EQ(bytesRead, wbuf.size());
+
+ // Close both sockets
+ acceptedSocket->close();
+ socket->close();
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+
+ // Check for the timestamp notifications.
+ ASSERT_EQ(errMsgCB.exception_.type_, folly::AsyncSocketException::UNKNOWN);
+ ASSERT_TRUE(errMsgCB.gotByteSeq_);
+ ASSERT_TRUE(errMsgCB.gotTimestamp_);
+}
+#endif // MSG_ERRQUEUE
+
+TEST(AsyncSocket, PreReceivedData) {
+ TestServer server;
+
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->connect(nullptr, server.getAddress(), 30);
+ evb.loop();
+
+ socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+ auto acceptedSocket = server.acceptAsync(&evb);
+
+ ReadCallback peekCallback(2);
+ ReadCallback readCallback;
+ peekCallback.dataAvailableCallback = [&]() {
+ peekCallback.verifyData("he", 2);
+ acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("h"));
+ acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("e"));
+ acceptedSocket->setReadCB(nullptr);
+ acceptedSocket->setReadCB(&readCallback);
+ };
+ readCallback.dataAvailableCallback = [&]() {
+ if (readCallback.dataRead() == 5) {
+ readCallback.verifyData("hello", 5);
+ acceptedSocket->setReadCB(nullptr);
+ }
+ };
+
+ acceptedSocket->setReadCB(&peekCallback);
+
+ evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataOnly) {
+ TestServer server;
+
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->connect(nullptr, server.getAddress(), 30);
+ evb.loop();
+
+ socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+ auto acceptedSocket = server.acceptAsync(&evb);
+
+ ReadCallback peekCallback;
+ ReadCallback readCallback;
+ peekCallback.dataAvailableCallback = [&]() {
+ peekCallback.verifyData("hello", 5);
+ acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+ acceptedSocket->setReadCB(&readCallback);
+ };
+ readCallback.dataAvailableCallback = [&]() {
+ readCallback.verifyData("hello", 5);
+ acceptedSocket->setReadCB(nullptr);
+ };
+
+ acceptedSocket->setReadCB(&peekCallback);
+
+ evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataPartial) {
+ TestServer server;
+
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->connect(nullptr, server.getAddress(), 30);
+ evb.loop();
+
+ socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+ auto acceptedSocket = server.acceptAsync(&evb);
+
+ ReadCallback peekCallback;
+ ReadCallback smallReadCallback(3);
+ ReadCallback normalReadCallback;
+ peekCallback.dataAvailableCallback = [&]() {
+ peekCallback.verifyData("hello", 5);
+ acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+ acceptedSocket->setReadCB(&smallReadCallback);
+ };
+ smallReadCallback.dataAvailableCallback = [&]() {
+ smallReadCallback.verifyData("hel", 3);
+ acceptedSocket->setReadCB(&normalReadCallback);
+ };
+ normalReadCallback.dataAvailableCallback = [&]() {
+ normalReadCallback.verifyData("lo", 2);
+ acceptedSocket->setReadCB(nullptr);
+ };
+
+ acceptedSocket->setReadCB(&peekCallback);
+
+ evb.loop();
+}
+
+TEST(AsyncSocket, PreReceivedDataTakeover) {
+ TestServer server;
+
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+ socket->connect(nullptr, server.getAddress(), 30);
+ evb.loop();
+
+ socket->writeChain(nullptr, IOBuf::copyBuffer("hello"));
+
+ auto acceptedSocket =
+ AsyncSocket::UniquePtr(new AsyncSocket(&evb, server.acceptFD()));
+ AsyncSocket::UniquePtr takeoverSocket;
+
+ ReadCallback peekCallback(3);
+ ReadCallback readCallback;
+ peekCallback.dataAvailableCallback = [&]() {
+ peekCallback.verifyData("hel", 3);
+ acceptedSocket->setPreReceivedData(IOBuf::copyBuffer("hello"));
+ acceptedSocket->setReadCB(nullptr);
+ takeoverSocket =
+ AsyncSocket::UniquePtr(new AsyncSocket(std::move(acceptedSocket)));
+ takeoverSocket->setReadCB(&readCallback);
+ };
+ readCallback.dataAvailableCallback = [&]() {
+ readCallback.verifyData("hello", 5);
+ takeoverSocket->setReadCB(nullptr);
+ };
+
+ acceptedSocket->setReadCB(&peekCallback);
+
+ evb.loop();
+}
+
+TEST(AsyncSocketTest, SendMessageFlags) {
+ TestServer server;
+ TestSendMsgParamsCallback sendMsgCB(
+ MSG_DONTWAIT|MSG_NOSIGNAL|MSG_MORE, 0, nullptr);
+
+ // connect()
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);
+
+ ConnCallback ccb;
+ socket->connect(&ccb, server.getAddress(), 30);
+ std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
+
+ evb.loop();
+ ASSERT_EQ(ccb.state, STATE_SUCCEEDED);
+
+ // Set SendMsgParamsCallback
+ socket->setSendMsgParamCB(&sendMsgCB);
+ ASSERT_EQ(socket->getSendMsgParamsCB(), &sendMsgCB);
+
+ // Write the first portion of data. This data is expected to be
+ // sent out immediately.
+ std::vector<uint8_t> buf(128, 'a');
+ WriteCallback wcb;
+ sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL);
+ socket->write(&wcb, buf.data(), buf.size());
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_TRUE(sendMsgCB.queriedFlags_);
+ ASSERT_FALSE(sendMsgCB.queriedData_);
+
+ // Using different flags for the second write operation.
+ // MSG_MORE flag is expected to delay sending this
+ // data to the wire.
+ sendMsgCB.reset(MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE);
+ socket->write(&wcb, buf.data(), buf.size());
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+ ASSERT_TRUE(sendMsgCB.queriedFlags_);
+ ASSERT_FALSE(sendMsgCB.queriedData_);
+
+ // Make sure the accepted socket saw only the data from
+ // the first write request.
+ std::vector<uint8_t> readbuf(2 * buf.size());
+ uint32_t bytesRead = acceptedSocket->read(readbuf.data(), readbuf.size());
+ ASSERT_TRUE(std::equal(buf.begin(), buf.end(), readbuf.begin()));
+ ASSERT_EQ(bytesRead, buf.size());
+
+ // Make sure the server got a connection and received the data
+ acceptedSocket->close();
+ socket->close();
+
+ ASSERT_TRUE(socket->isClosedBySelf());
+ ASSERT_FALSE(socket->isClosedByPeer());
+}
+
+TEST(AsyncSocketTest, SendMessageAncillaryData) {
+ struct sockaddr_un addr = {AF_UNIX,
+ "AsyncSocketTest.SendMessageAncillaryData\0"};
+
+ // Clean up the name in the name space we're going to use
+ ASSERT_FALSE(remove(addr.sun_path) == -1 && errno != ENOENT);
+
+ // Set up listening socket
+ int lfd = fsp::socket(AF_UNIX, SOCK_STREAM, 0);
+ ASSERT_NE(lfd, -1);
+ ASSERT_NE(bind(lfd, (struct sockaddr*)&addr, sizeof(addr)), -1)
+ << "Bind failed: " << errno;
+
+ // Create the connecting socket
+ int csd = fsp::socket(AF_UNIX, SOCK_STREAM, 0);
+ ASSERT_NE(csd, -1);
+
+ // Listen for incoming connect
+ ASSERT_NE(listen(lfd, 5), -1);
+
+ // Connect to the listening socket
+ ASSERT_NE(fsp::connect(csd, (struct sockaddr*)&addr, sizeof(addr)), -1)
+ << "Connect request failed: " << errno;
+
+ // Accept the connection
+ int sfd = accept(lfd, nullptr, nullptr);
+ ASSERT_NE(sfd, -1);
+
+ // Instantiate AsyncSocket object for the connected socket
+ EventBase evb;
+ std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb, csd);
+
+ // Open a temporary file and write a magic string to it
+ // We'll transfer the file handle to test the message parameters
+ // callback logic.
+ int tmpfd = open("/var/tmp", O_RDWR | O_TMPFILE);
+ ASSERT_NE(tmpfd, -1) << "Failed to open a temporary file";
+ std::string magicString("Magic string");
+ ASSERT_EQ(write(tmpfd, magicString.c_str(), magicString.length()),
+ magicString.length());
+
+ // Send message
+ union {
+ // Space large enough to hold an 'int'
+ char control[CMSG_SPACE(sizeof(int))];
+ struct cmsghdr cmh;
+ } s_u;
+ s_u.cmh.cmsg_len = CMSG_LEN(sizeof(int));
+ s_u.cmh.cmsg_level = SOL_SOCKET;
+ s_u.cmh.cmsg_type = SCM_RIGHTS;
+ memcpy(CMSG_DATA(&s_u.cmh), &tmpfd, sizeof(int));
+
+ // Set up the callback providing message parameters
+ TestSendMsgParamsCallback sendMsgCB(
+ MSG_DONTWAIT | MSG_NOSIGNAL, sizeof(s_u.control), s_u.control);
+ socket->setSendMsgParamCB(&sendMsgCB);
+
+ // We must transmit at least 1 byte of real data in order
+ // to send ancillary data
+ int s_data = 12345;
+ WriteCallback wcb;
+ socket->write(&wcb, &s_data, sizeof(s_data));
+ ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
+
+ // Receive the message
+ union {
+ // Space large enough to hold an 'int'
+ char control[CMSG_SPACE(sizeof(int))];
+ struct cmsghdr cmh;
+ } r_u;
+ struct msghdr msgh;
+ struct iovec iov;
+ int r_data = 0;
+
+ msgh.msg_control = r_u.control;
+ msgh.msg_controllen = sizeof(r_u.control);
+ msgh.msg_name = nullptr;
+ msgh.msg_namelen = 0;
+ msgh.msg_iov = &iov;
+ msgh.msg_iovlen = 1;
+ iov.iov_base = &r_data;
+ iov.iov_len = sizeof(r_data);
+
+ // Receive data
+ ASSERT_NE(recvmsg(sfd, &msgh, 0), -1) << "recvmsg failed: " << errno;
+
+ // Validate the received message
+ ASSERT_EQ(r_u.cmh.cmsg_len, CMSG_LEN(sizeof(int)));
+ ASSERT_EQ(r_u.cmh.cmsg_level, SOL_SOCKET);
+ ASSERT_EQ(r_u.cmh.cmsg_type, SCM_RIGHTS);
+ ASSERT_EQ(r_data, s_data);
+ int fd = 0;
+ memcpy(&fd, CMSG_DATA(&r_u.cmh), sizeof(int));
+ ASSERT_NE(fd, 0);
+
+ std::vector<uint8_t> transferredMagicString(magicString.length() + 1, 0);
+
+ // Reposition to the beginning of the file
+ ASSERT_EQ(0, lseek(fd, 0, SEEK_SET));
+
+ // Read the magic string back, and compare it with the original
+ ASSERT_EQ(
+ magicString.length(),
+ read(fd, transferredMagicString.data(), transferredMagicString.size()));
+ ASSERT_TRUE(std::equal(
+ magicString.begin(),
+ magicString.end(),
+ transferredMagicString.begin()));
+}