Support read and write from blocking and non-blocking pipes and sockets
authorChristopher Dykes <cdykes@fb.com>
Tue, 9 Aug 2016 23:58:56 +0000 (16:58 -0700)
committerFacebook Github Bot 3 <facebook-github-bot-3-bot@fb.com>
Wed, 10 Aug 2016 00:08:32 +0000 (17:08 -0700)
Summary: Also switch `pipe` to return a socket pair instead, as libevent can't poll against a pipe on Windows. And lastly, fix the file descriptor for sockets leaking when close is called.

Reviewed By: yfeldblum

Differential Revision: D3691255

fbshipit-source-id: c684242d255ac5158a4c805d432d345dac1b3bd8

folly/portability/Unistd.cpp
folly/portability/Unistd.h

index 33a8f7a4ec9db165427e960c106a3e72bbcaf461..27fc7c7723b4b58b65d2cac09668b62ba5dc5b79 100755 (executable)
@@ -54,10 +54,24 @@ int access(char const* fn, int am) { return _access(fn, am); }
 
 int chdir(const char* path) { return _chdir(path); }
 
+// We aren't hooking into the internals of the CRT, nope, not at all.
+extern "C" int __cdecl _free_osfhnd(int const fh);
 int close(int fh) {
   if (folly::portability::sockets::is_fh_socket(fh)) {
     SOCKET h = (SOCKET)_get_osfhandle(fh);
-    return closesocket(h);
+
+    // If we were to just call _close on the descriptor, it would
+    // close the HANDLE, but it wouldn't free any of the resources
+    // associated to the SOCKET, and we can't call _close after
+    // calling closesocket, because closesocket has already closed
+    // the HANDLE, and _close would attempt to close the HANDLE
+    // again, resulting in a double free.
+    // Luckily though, there is a function in the internals of the
+    // CRT that is used to free only the file descriptor, so we
+    // can call that to avoid leaking the file descriptor itself.
+    auto c = closesocket(h);
+    _free_osfhnd(fh);
+    return c;
   }
   return _close(fh);
 }
@@ -114,7 +128,11 @@ long lseek(int fh, long off, int orig) { return _lseek(fh, off, orig); }
 
 int rmdir(const char* path) { return _rmdir(path); }
 
-int pipe(int* pth) { return _pipe(pth, 0, _O_BINARY); }
+int pipe(int pth[2]) {
+  // We need to be able to listen to pipes with
+  // libevent, so they need to be actual sockets.
+  return socketpair(PF_UNIX, SOCK_STREAM, 0, pth);
+}
 
 int pread(int fd, void* buf, size_t count, off_t offset) {
   return wrapPositional(_read, fd, offset, buf, (unsigned int)count);
@@ -124,7 +142,26 @@ int pwrite(int fd, const void* buf, size_t count, off_t offset) {
   return wrapPositional(_write, fd, offset, buf, (unsigned int)count);
 }
 
-int read(int fh, void* buf, unsigned int mcc) { return _read(fh, buf, mcc); }
+int read(int fh, void* buf, unsigned int mcc) {
+  if (folly::portability::sockets::is_fh_socket(fh)) {
+    SOCKET s = (SOCKET)_get_osfhandle(fh);
+    if (s != INVALID_SOCKET) {
+      auto r = folly::portability::sockets::recv(fh, buf, (size_t)mcc, 0);
+      if (r == -1 && WSAGetLastError() == WSAEWOULDBLOCK) {
+        errno = EAGAIN;
+      }
+      return r;
+    }
+  }
+  auto r = _read(fh, buf, mcc);
+  if (r == -1 && GetLastError() == ERROR_NO_DATA) {
+    // This only happens if the file was non-blocking and
+    // no data was present. We have to translate the error
+    // to a form that the rest of the world is expecting.
+    errno = EAGAIN;
+  }
+  return r;
+}
 
 ssize_t readlink(const char* path, char* buf, size_t buflen) {
   if (!buflen) {
@@ -198,8 +235,36 @@ int usleep(unsigned int ms) {
   return 0;
 }
 
-int write(int fh, void const* buf, unsigned int mcc) {
-  return _write(fh, buf, mcc);
+int write(int fh, void const* buf, unsigned int count) {
+  if (folly::portability::sockets::is_fh_socket(fh)) {
+    SOCKET s = (SOCKET)_get_osfhandle(fh);
+    if (s != INVALID_SOCKET) {
+      auto r = folly::portability::sockets::send(fh, buf, (size_t)count, 0);
+      if (r == -1 && WSAGetLastError() == WSAEWOULDBLOCK) {
+        errno = EAGAIN;
+      }
+      return r;
+    }
+  }
+  auto r = _write(fh, buf, count);
+  if ((r > 0 && r != count) || (r == -1 && errno == ENOSPC)) {
+    // Writing to a pipe with a full buffer doesn't generate
+    // any error type, unless it caused us to write exactly 0
+    // bytes, so we have to see if we have a pipe first. We
+    // don't touch the errno for anything else.
+    HANDLE h = (HANDLE)_get_osfhandle(fh);
+    if (GetFileType(h) == FILE_TYPE_PIPE) {
+      DWORD state = 0;
+      if (GetNamedPipeHandleState(
+              h, &state, nullptr, nullptr, nullptr, nullptr, 0)) {
+        if ((state & PIPE_NOWAIT) == PIPE_NOWAIT) {
+          errno = EAGAIN;
+          return -1;
+        }
+      }
+    }
+  }
+  return r;
 }
 }
 }
index 717fe03d30bca714385fea50f316a2f96d2bd258..4b0097e3d72b502c711e29ef69e717bc192905a6 100755 (executable)
@@ -71,7 +71,7 @@ int lockf(int fd, int cmd, off_t len);
 long lseek(int fh, long off, int orig);
 int read(int fh, void* buf, unsigned int mcc);
 int rmdir(const char* path);
-int pipe(int* pth);
+int pipe(int pth[2]);
 int pread(int fd, void* buf, size_t count, off_t offset);
 int pwrite(int fd, const void* buf, size_t count, off_t offset);
 ssize_t readlink(const char* path, char* buf, size_t buflen);