Add element construction/destruction hooks to IndexedMemPool
[folly.git] / folly / Subprocess.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef _GNU_SOURCE
18 #define _GNU_SOURCE
19 #endif
20
21 #include <folly/Subprocess.h>
22
23 #if __linux__
24 #include <sys/prctl.h>
25 #endif
26 #include <fcntl.h>
27
28 #include <array>
29 #include <algorithm>
30 #include <system_error>
31
32 #include <boost/container/flat_set.hpp>
33 #include <boost/range/adaptors.hpp>
34
35 #include <glog/logging.h>
36
37 #include <folly/Assume.h>
38 #include <folly/Conv.h>
39 #include <folly/Exception.h>
40 #include <folly/ScopeGuard.h>
41 #include <folly/Shell.h>
42 #include <folly/String.h>
43 #include <folly/io/Cursor.h>
44 #include <folly/portability/Sockets.h>
45 #include <folly/portability/Stdlib.h>
46 #include <folly/portability/SysSyscall.h>
47 #include <folly/portability/Unistd.h>
48
49 constexpr int kExecFailure = 127;
50 constexpr int kChildFailure = 126;
51
52 namespace folly {
53
54 ProcessReturnCode::ProcessReturnCode(ProcessReturnCode&& p) noexcept
55   : rawStatus_(p.rawStatus_) {
56   p.rawStatus_ = ProcessReturnCode::RV_NOT_STARTED;
57 }
58
59 ProcessReturnCode& ProcessReturnCode::operator=(ProcessReturnCode&& p)
60     noexcept {
61   rawStatus_ = p.rawStatus_;
62   p.rawStatus_ = ProcessReturnCode::RV_NOT_STARTED;
63   return *this;
64 }
65
66 ProcessReturnCode::State ProcessReturnCode::state() const {
67   if (rawStatus_ == RV_NOT_STARTED) return NOT_STARTED;
68   if (rawStatus_ == RV_RUNNING) return RUNNING;
69   if (WIFEXITED(rawStatus_)) return EXITED;
70   if (WIFSIGNALED(rawStatus_)) return KILLED;
71   throw std::runtime_error(to<std::string>(
72       "Invalid ProcessReturnCode: ", rawStatus_));
73 }
74
75 void ProcessReturnCode::enforce(State expected) const {
76   State s = state();
77   if (s != expected) {
78     throw std::logic_error(to<std::string>(
79       "Bad use of ProcessReturnCode; state is ", s, " expected ", expected
80     ));
81   }
82 }
83
84 int ProcessReturnCode::exitStatus() const {
85   enforce(EXITED);
86   return WEXITSTATUS(rawStatus_);
87 }
88
89 int ProcessReturnCode::killSignal() const {
90   enforce(KILLED);
91   return WTERMSIG(rawStatus_);
92 }
93
94 bool ProcessReturnCode::coreDumped() const {
95   enforce(KILLED);
96   return WCOREDUMP(rawStatus_);
97 }
98
99 std::string ProcessReturnCode::str() const {
100   switch (state()) {
101   case NOT_STARTED:
102     return "not started";
103   case RUNNING:
104     return "running";
105   case EXITED:
106     return to<std::string>("exited with status ", exitStatus());
107   case KILLED:
108     return to<std::string>("killed by signal ", killSignal(),
109                            (coreDumped() ? " (core dumped)" : ""));
110   }
111   assume_unreachable();
112 }
113
114 CalledProcessError::CalledProcessError(ProcessReturnCode rc)
115   : returnCode_(rc),
116     what_(returnCode_.str()) {
117 }
118
119 SubprocessSpawnError::SubprocessSpawnError(const char* executable,
120                                            int errCode,
121                                            int errnoValue)
122   : errnoValue_(errnoValue),
123     what_(to<std::string>(errCode == kExecFailure ?
124                             "failed to execute " :
125                             "error preparing to execute ",
126                           executable, ": ", errnoStr(errnoValue))) {
127 }
128
129 namespace {
130
131 // Copy pointers to the given strings in a format suitable for posix_spawn
132 std::unique_ptr<const char*[]> cloneStrings(const std::vector<std::string>& s) {
133   std::unique_ptr<const char*[]> d(new const char*[s.size() + 1]);
134   for (size_t i = 0; i < s.size(); i++) {
135     d[i] = s[i].c_str();
136   }
137   d[s.size()] = nullptr;
138   return d;
139 }
140
141 // Check a wait() status, throw on non-successful
142 void checkStatus(ProcessReturnCode returnCode) {
143   if (returnCode.state() != ProcessReturnCode::EXITED ||
144       returnCode.exitStatus() != 0) {
145     throw CalledProcessError(returnCode);
146   }
147 }
148
149 }  // namespace
150
151 Subprocess::Options& Subprocess::Options::fd(int fd, int action) {
152   if (action == Subprocess::PIPE) {
153     if (fd == 0) {
154       action = Subprocess::PIPE_IN;
155     } else if (fd == 1 || fd == 2) {
156       action = Subprocess::PIPE_OUT;
157     } else {
158       throw std::invalid_argument(
159           to<std::string>("Only fds 0, 1, 2 are valid for action=PIPE: ", fd));
160     }
161   }
162   fdActions_[fd] = action;
163   return *this;
164 }
165
166 Subprocess::Subprocess() {}
167
168 Subprocess::Subprocess(
169     const std::vector<std::string>& argv,
170     const Options& options,
171     const char* executable,
172     const std::vector<std::string>* env) {
173   if (argv.empty()) {
174     throw std::invalid_argument("argv must not be empty");
175   }
176   if (!executable) executable = argv[0].c_str();
177   spawn(cloneStrings(argv), executable, options, env);
178 }
179
180 Subprocess::Subprocess(
181     const std::string& cmd,
182     const Options& options,
183     const std::vector<std::string>* env) {
184   if (options.usePath_) {
185     throw std::invalid_argument("usePath() not allowed when running in shell");
186   }
187
188   std::vector<std::string> argv = {"/bin/sh", "-c", cmd};
189   spawn(cloneStrings(argv), argv[0].c_str(), options, env);
190 }
191
192 Subprocess::~Subprocess() {
193   CHECK_NE(returnCode_.state(), ProcessReturnCode::RUNNING)
194     << "Subprocess destroyed without reaping child";
195 }
196
197 namespace {
198
199 struct ChildErrorInfo {
200   int errCode;
201   int errnoValue;
202 };
203
204 [[noreturn]] void childError(int errFd, int errCode, int errnoValue) {
205   ChildErrorInfo info = {errCode, errnoValue};
206   // Write the error information over the pipe to our parent process.
207   // We can't really do anything else if this write call fails.
208   writeNoInt(errFd, &info, sizeof(info));
209   // exit
210   _exit(errCode);
211 }
212
213 }  // namespace
214
215 void Subprocess::setAllNonBlocking() {
216   for (auto& p : pipes_) {
217     int fd = p.pipe.fd();
218     int flags = ::fcntl(fd, F_GETFL);
219     checkUnixError(flags, "fcntl");
220     int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
221     checkUnixError(r, "fcntl");
222   }
223 }
224
225 void Subprocess::spawn(
226     std::unique_ptr<const char*[]> argv,
227     const char* executable,
228     const Options& optionsIn,
229     const std::vector<std::string>* env) {
230   if (optionsIn.usePath_ && env) {
231     throw std::invalid_argument(
232         "usePath() not allowed when overriding environment");
233   }
234
235   // Make a copy, we'll mutate options
236   Options options(optionsIn);
237
238   // On error, close all pipes_ (ignoring errors, but that seems fine here).
239   auto pipesGuard = makeGuard([this] { pipes_.clear(); });
240
241   // Create a pipe to use to receive error information from the child,
242   // in case it fails before calling exec()
243   int errFds[2];
244 #if FOLLY_HAVE_PIPE2
245   checkUnixError(::pipe2(errFds, O_CLOEXEC), "pipe2");
246 #else
247   checkUnixError(::pipe(errFds), "pipe");
248 #endif
249   SCOPE_EXIT {
250     CHECK_ERR(::close(errFds[0]));
251     if (errFds[1] >= 0) {
252       CHECK_ERR(::close(errFds[1]));
253     }
254   };
255
256 #if !FOLLY_HAVE_PIPE2
257   // Ask the child to close the read end of the error pipe.
258   checkUnixError(fcntl(errFds[0], F_SETFD, FD_CLOEXEC), "set FD_CLOEXEC");
259   // Set the close-on-exec flag on the write side of the pipe.
260   // This way the pipe will be closed automatically in the child if execve()
261   // succeeds.  If the exec fails the child can write error information to the
262   // pipe.
263   checkUnixError(fcntl(errFds[1], F_SETFD, FD_CLOEXEC), "set FD_CLOEXEC");
264 #endif
265
266   // Perform the actual work of setting up pipes then forking and
267   // executing the child.
268   spawnInternal(std::move(argv), executable, options, env, errFds[1]);
269
270   // After spawnInternal() returns the child is alive.  We have to be very
271   // careful about throwing after this point.  We are inside the constructor,
272   // so if we throw the Subprocess object will have never existed, and the
273   // destructor will never be called.
274   //
275   // We should only throw if we got an error via the errFd, and we know the
276   // child has exited and can be immediately waited for.  In all other cases,
277   // we have no way of cleaning up the child.
278
279   // Close writable side of the errFd pipe in the parent process
280   CHECK_ERR(::close(errFds[1]));
281   errFds[1] = -1;
282
283   // Read from the errFd pipe, to tell if the child ran into any errors before
284   // calling exec()
285   readChildErrorPipe(errFds[0], executable);
286
287   // We have fully succeeded now, so release the guard on pipes_
288   pipesGuard.dismiss();
289 }
290
291 void Subprocess::spawnInternal(
292     std::unique_ptr<const char*[]> argv,
293     const char* executable,
294     Options& options,
295     const std::vector<std::string>* env,
296     int errFd) {
297   // Parent work, pre-fork: create pipes
298   std::vector<int> childFds;
299   // Close all of the childFds as we leave this scope
300   SCOPE_EXIT {
301     // These are only pipes, closing them shouldn't fail
302     for (int cfd : childFds) {
303       CHECK_ERR(::close(cfd));
304     }
305   };
306
307   int r;
308   for (auto& p : options.fdActions_) {
309     if (p.second == PIPE_IN || p.second == PIPE_OUT) {
310       int fds[2];
311       // We're setting both ends of the pipe as close-on-exec. The child
312       // doesn't need to reset the flag on its end, as we always dup2() the fd,
313       // and dup2() fds don't share the close-on-exec flag.
314 #if FOLLY_HAVE_PIPE2
315       // If possible, set close-on-exec atomically. Otherwise, a concurrent
316       // Subprocess invocation can fork() between "pipe" and "fnctl",
317       // causing FDs to leak.
318       r = ::pipe2(fds, O_CLOEXEC);
319       checkUnixError(r, "pipe2");
320 #else
321       r = ::pipe(fds);
322       checkUnixError(r, "pipe");
323       r = fcntl(fds[0], F_SETFD, FD_CLOEXEC);
324       checkUnixError(r, "set FD_CLOEXEC");
325       r = fcntl(fds[1], F_SETFD, FD_CLOEXEC);
326       checkUnixError(r, "set FD_CLOEXEC");
327 #endif
328       pipes_.emplace_back();
329       Pipe& pipe = pipes_.back();
330       pipe.direction = p.second;
331       int cfd;
332       if (p.second == PIPE_IN) {
333         // Child gets reading end
334         pipe.pipe = folly::File(fds[1], /*owns_fd=*/ true);
335         cfd = fds[0];
336       } else {
337         pipe.pipe = folly::File(fds[0], /*owns_fd=*/ true);
338         cfd = fds[1];
339       }
340       p.second = cfd;  // ensure it gets dup2()ed
341       pipe.childFd = p.first;
342       childFds.push_back(cfd);
343     }
344   }
345
346   // This should already be sorted, as options.fdActions_ is
347   DCHECK(std::is_sorted(pipes_.begin(), pipes_.end()));
348
349   // Note that the const casts below are legit, per
350   // http://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html
351
352   char** argVec = const_cast<char**>(argv.get());
353
354   // Set up environment
355   std::unique_ptr<const char*[]> envHolder;
356   char** envVec;
357   if (env) {
358     envHolder = cloneStrings(*env);
359     envVec = const_cast<char**>(envHolder.get());
360   } else {
361     envVec = environ;
362   }
363
364   // Block all signals around vfork; see http://ewontfix.com/7/.
365   //
366   // As the child may run in the same address space as the parent until
367   // the actual execve() system call, any (custom) signal handlers that
368   // the parent has might alter parent's memory if invoked in the child,
369   // with undefined results.  So we block all signals in the parent before
370   // vfork(), which will cause them to be blocked in the child as well (we
371   // rely on the fact that Linux, just like all sane implementations, only
372   // clones the calling thread).  Then, in the child, we reset all signals
373   // to their default dispositions (while still blocked), and unblock them
374   // (so the exec()ed process inherits the parent's signal mask)
375   //
376   // The parent also unblocks all signals as soon as vfork() returns.
377   sigset_t allBlocked;
378   r = sigfillset(&allBlocked);
379   checkUnixError(r, "sigfillset");
380   sigset_t oldSignals;
381
382   r = pthread_sigmask(SIG_SETMASK, &allBlocked, &oldSignals);
383   checkPosixError(r, "pthread_sigmask");
384   SCOPE_EXIT {
385     // Restore signal mask
386     r = pthread_sigmask(SIG_SETMASK, &oldSignals, nullptr);
387     CHECK_EQ(r, 0) << "pthread_sigmask: " << errnoStr(r);  // shouldn't fail
388   };
389
390   // Call c_str() here, as it's not necessarily safe after fork.
391   const char* childDir =
392     options.childDir_.empty() ? nullptr : options.childDir_.c_str();
393
394   pid_t pid;
395 #ifdef __linux__
396   if (options.cloneFlags_) {
397     pid = syscall(SYS_clone, *options.cloneFlags_, 0, nullptr, nullptr);
398     checkUnixError(pid, errno, "clone");
399   } else {
400 #endif
401     pid = vfork();
402     checkUnixError(pid, errno, "vfork");
403 #ifdef __linux__
404   }
405 #endif
406   if (pid == 0) {
407     int errnoValue = prepareChild(options, &oldSignals, childDir);
408     if (errnoValue != 0) {
409       childError(errFd, kChildFailure, errnoValue);
410     }
411
412     errnoValue = runChild(executable, argVec, envVec, options);
413     // If we get here, exec() failed.
414     childError(errFd, kExecFailure, errnoValue);
415   }
416
417   // Child is alive.  We have to be very careful about throwing after this
418   // point.  We are inside the constructor, so if we throw the Subprocess
419   // object will have never existed, and the destructor will never be called.
420   //
421   // We should only throw if we got an error via the errFd, and we know the
422   // child has exited and can be immediately waited for.  In all other cases,
423   // we have no way of cleaning up the child.
424   pid_ = pid;
425   returnCode_ = ProcessReturnCode(RV_RUNNING);
426 }
427
428 int Subprocess::prepareChild(const Options& options,
429                              const sigset_t* sigmask,
430                              const char* childDir) const {
431   // While all signals are blocked, we must reset their
432   // dispositions to default.
433   for (int sig = 1; sig < NSIG; ++sig) {
434     ::signal(sig, SIG_DFL);
435   }
436
437   {
438     // Unblock signals; restore signal mask.
439     int r = pthread_sigmask(SIG_SETMASK, sigmask, nullptr);
440     if (r != 0) {
441       return r;  // pthread_sigmask() returns an errno value
442     }
443   }
444
445   // Change the working directory, if one is given
446   if (childDir) {
447     if (::chdir(childDir) == -1) {
448       return errno;
449     }
450   }
451
452   // We don't have to explicitly close the parent's end of all pipes,
453   // as they all have the FD_CLOEXEC flag set and will be closed at
454   // exec time.
455
456   // Close all fds that we're supposed to close.
457   for (auto& p : options.fdActions_) {
458     if (p.second == CLOSE) {
459       if (::close(p.first) == -1) {
460         return errno;
461       }
462     } else if (p.second != p.first) {
463       if (::dup2(p.second, p.first) == -1) {
464         return errno;
465       }
466     }
467   }
468
469   // If requested, close all other file descriptors.  Don't close
470   // any fds in options.fdActions_, and don't touch stdin, stdout, stderr.
471   // Ignore errors.
472   if (options.closeOtherFds_) {
473     for (int fd = getdtablesize() - 1; fd >= 3; --fd) {
474       if (options.fdActions_.count(fd) == 0) {
475         ::close(fd);
476       }
477     }
478   }
479
480 #if __linux__
481   // Opt to receive signal on parent death, if requested
482   if (options.parentDeathSignal_ != 0) {
483     const auto parentDeathSignal =
484         static_cast<unsigned long>(options.parentDeathSignal_);
485     if (prctl(PR_SET_PDEATHSIG, parentDeathSignal, 0, 0, 0) == -1) {
486       return errno;
487     }
488   }
489 #endif
490
491   if (options.processGroupLeader_) {
492     if (setpgrp() == -1) {
493       return errno;
494     }
495   }
496
497   // The user callback comes last, so that the child is otherwise all set up.
498   if (options.dangerousPostForkPreExecCallback_) {
499     if (int error = (*options.dangerousPostForkPreExecCallback_)()) {
500       return error;
501     }
502   }
503
504   return 0;
505 }
506
507 int Subprocess::runChild(const char* executable,
508                          char** argv, char** env,
509                          const Options& options) const {
510   // Now, finally, exec.
511   if (options.usePath_) {
512     ::execvp(executable, argv);
513   } else {
514     ::execve(executable, argv, env);
515   }
516   return errno;
517 }
518
519 void Subprocess::readChildErrorPipe(int pfd, const char* executable) {
520   ChildErrorInfo info;
521   auto rc = readNoInt(pfd, &info, sizeof(info));
522   if (rc == 0) {
523     // No data means the child executed successfully, and the pipe
524     // was closed due to the close-on-exec flag being set.
525     return;
526   } else if (rc != sizeof(ChildErrorInfo)) {
527     // An error occurred trying to read from the pipe, or we got a partial read.
528     // Neither of these cases should really occur in practice.
529     //
530     // We can't get any error data from the child in this case, and we don't
531     // know if it is successfully running or not.  All we can do is to return
532     // normally, as if the child executed successfully.  If something bad
533     // happened the caller should at least get a non-normal exit status from
534     // the child.
535     LOG(ERROR) << "unexpected error trying to read from child error pipe " <<
536       "rc=" << rc << ", errno=" << errno;
537     return;
538   }
539
540   // We got error data from the child.  The child should exit immediately in
541   // this case, so wait on it to clean up.
542   wait();
543
544   // Throw to signal the error
545   throw SubprocessSpawnError(executable, info.errCode, info.errnoValue);
546 }
547
548 ProcessReturnCode Subprocess::poll(struct rusage* ru) {
549   returnCode_.enforce(ProcessReturnCode::RUNNING);
550   DCHECK_GT(pid_, 0);
551   int status;
552   pid_t found = ::wait4(pid_, &status, WNOHANG, ru);
553   // The spec guarantees that EINTR does not occur with WNOHANG, so the only
554   // two remaining errors are ECHILD (other code reaped the child?), or
555   // EINVAL (cosmic rays?), both of which merit an abort:
556   PCHECK(found != -1) << "waitpid(" << pid_ << ", &status, WNOHANG)";
557   if (found != 0) {
558     // Though the child process had quit, this call does not close the pipes
559     // since its descendants may still be using them.
560     returnCode_ = ProcessReturnCode(status);
561     pid_ = -1;
562   }
563   return returnCode_;
564 }
565
566 bool Subprocess::pollChecked() {
567   if (poll().state() == ProcessReturnCode::RUNNING) {
568     return false;
569   }
570   checkStatus(returnCode_);
571   return true;
572 }
573
574 ProcessReturnCode Subprocess::wait() {
575   returnCode_.enforce(ProcessReturnCode::RUNNING);
576   DCHECK_GT(pid_, 0);
577   int status;
578   pid_t found;
579   do {
580     found = ::waitpid(pid_, &status, 0);
581   } while (found == -1 && errno == EINTR);
582   // The only two remaining errors are ECHILD (other code reaped the
583   // child?), or EINVAL (cosmic rays?), and both merit an abort:
584   PCHECK(found != -1) << "waitpid(" << pid_ << ", &status, WNOHANG)";
585   // Though the child process had quit, this call does not close the pipes
586   // since its descendants may still be using them.
587   DCHECK_EQ(found, pid_);
588   returnCode_ = ProcessReturnCode(status);
589   pid_ = -1;
590   return returnCode_;
591 }
592
593 void Subprocess::waitChecked() {
594   wait();
595   checkStatus(returnCode_);
596 }
597
598 void Subprocess::sendSignal(int signal) {
599   returnCode_.enforce(ProcessReturnCode::RUNNING);
600   int r = ::kill(pid_, signal);
601   checkUnixError(r, "kill");
602 }
603
604 pid_t Subprocess::pid() const {
605   return pid_;
606 }
607
608 namespace {
609
610 ByteRange queueFront(const IOBufQueue& queue) {
611   auto* p = queue.front();
612   if (!p) {
613     return ByteRange{};
614   }
615   return io::Cursor(p).peekBytes();
616 }
617
618 // fd write
619 bool handleWrite(int fd, IOBufQueue& queue) {
620   for (;;) {
621     auto b = queueFront(queue);
622     if (b.empty()) {
623       return true;  // EOF
624     }
625
626     ssize_t n = writeNoInt(fd, b.data(), b.size());
627     if (n == -1 && errno == EAGAIN) {
628       return false;
629     }
630     checkUnixError(n, "write");
631     queue.trimStart(n);
632   }
633 }
634
635 // fd read
636 bool handleRead(int fd, IOBufQueue& queue) {
637   for (;;) {
638     auto p = queue.preallocate(100, 65000);
639     ssize_t n = readNoInt(fd, p.first, p.second);
640     if (n == -1 && errno == EAGAIN) {
641       return false;
642     }
643     checkUnixError(n, "read");
644     if (n == 0) {
645       return true;
646     }
647     queue.postallocate(n);
648   }
649 }
650
651 bool discardRead(int fd) {
652   static const size_t bufSize = 65000;
653   // Thread unsafe, but it doesn't matter.
654   static std::unique_ptr<char[]> buf(new char[bufSize]);
655
656   for (;;) {
657     ssize_t n = readNoInt(fd, buf.get(), bufSize);
658     if (n == -1 && errno == EAGAIN) {
659       return false;
660     }
661     checkUnixError(n, "read");
662     if (n == 0) {
663       return true;
664     }
665   }
666 }
667
668 }  // namespace
669
670 std::pair<std::string, std::string> Subprocess::communicate(
671     StringPiece input) {
672   IOBufQueue inputQueue;
673   inputQueue.wrapBuffer(input.data(), input.size());
674
675   auto outQueues = communicateIOBuf(std::move(inputQueue));
676   auto outBufs = std::make_pair(outQueues.first.move(),
677                                 outQueues.second.move());
678   std::pair<std::string, std::string> out;
679   if (outBufs.first) {
680     outBufs.first->coalesce();
681     out.first.assign(reinterpret_cast<const char*>(outBufs.first->data()),
682                      outBufs.first->length());
683   }
684   if (outBufs.second) {
685     outBufs.second->coalesce();
686     out.second.assign(reinterpret_cast<const char*>(outBufs.second->data()),
687                      outBufs.second->length());
688   }
689   return out;
690 }
691
692 std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
693     IOBufQueue input) {
694   // If the user supplied a non-empty input buffer, make sure
695   // that stdin is a pipe so we can write the data.
696   if (!input.empty()) {
697     // findByChildFd() will throw std::invalid_argument if no pipe for
698     // STDIN_FILENO exists
699     findByChildFd(STDIN_FILENO);
700   }
701
702   std::pair<IOBufQueue, IOBufQueue> out;
703
704   auto readCallback = [&] (int pfd, int cfd) -> bool {
705     if (cfd == STDOUT_FILENO) {
706       return handleRead(pfd, out.first);
707     } else if (cfd == STDERR_FILENO) {
708       return handleRead(pfd, out.second);
709     } else {
710       // Don't close the file descriptor, the child might not like SIGPIPE,
711       // just read and throw the data away.
712       return discardRead(pfd);
713     }
714   };
715
716   auto writeCallback = [&] (int pfd, int cfd) -> bool {
717     if (cfd == STDIN_FILENO) {
718       return handleWrite(pfd, input);
719     } else {
720       // If we don't want to write to this fd, just close it.
721       return true;
722     }
723   };
724
725   communicate(std::move(readCallback), std::move(writeCallback));
726
727   return out;
728 }
729
730 void Subprocess::communicate(FdCallback readCallback,
731                              FdCallback writeCallback) {
732   // This serves to prevent wait() followed by communicate(), but if you
733   // legitimately need that, send a patch to delete this line.
734   returnCode_.enforce(ProcessReturnCode::RUNNING);
735   setAllNonBlocking();
736
737   std::vector<pollfd> fds;
738   fds.reserve(pipes_.size());
739   std::vector<size_t> toClose;  // indexes into pipes_
740   toClose.reserve(pipes_.size());
741
742   while (!pipes_.empty()) {
743     fds.clear();
744     toClose.clear();
745
746     for (auto& p : pipes_) {
747       pollfd pfd;
748       pfd.fd = p.pipe.fd();
749       // Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
750       // child's point of view.
751       if (!p.enabled) {
752         // Still keeping fd in watched set so we get notified of POLLHUP /
753         // POLLERR
754         pfd.events = 0;
755       } else if (p.direction == PIPE_IN) {
756         pfd.events = POLLOUT;
757       } else {
758         pfd.events = POLLIN;
759       }
760       fds.push_back(pfd);
761     }
762
763     int r;
764     do {
765       r = ::poll(fds.data(), fds.size(), -1);
766     } while (r == -1 && errno == EINTR);
767     checkUnixError(r, "poll");
768
769     for (size_t i = 0; i < pipes_.size(); ++i) {
770       auto& p = pipes_[i];
771       auto parentFd = p.pipe.fd();
772       DCHECK_EQ(fds[i].fd, parentFd);
773       short events = fds[i].revents;
774
775       bool closed = false;
776       if (events & POLLOUT) {
777         DCHECK(!(events & POLLIN));
778         if (writeCallback(parentFd, p.childFd)) {
779           toClose.push_back(i);
780           closed = true;
781         }
782       }
783
784       // Call read callback on POLLHUP, to give it a chance to read (and act
785       // on) end of file
786       if (events & (POLLIN | POLLHUP)) {
787         DCHECK(!(events & POLLOUT));
788         if (readCallback(parentFd, p.childFd)) {
789           toClose.push_back(i);
790           closed = true;
791         }
792       }
793
794       if ((events & (POLLHUP | POLLERR)) && !closed) {
795         toClose.push_back(i);
796         closed = true;
797       }
798     }
799
800     // Close the fds in reverse order so the indexes hold after erase()
801     for (int idx : boost::adaptors::reverse(toClose)) {
802       auto pos = pipes_.begin() + idx;
803       pos->pipe.close();  // Throws on error
804       pipes_.erase(pos);
805     }
806   }
807 }
808
809 void Subprocess::enableNotifications(int childFd, bool enabled) {
810   pipes_[findByChildFd(childFd)].enabled = enabled;
811 }
812
813 bool Subprocess::notificationsEnabled(int childFd) const {
814   return pipes_[findByChildFd(childFd)].enabled;
815 }
816
817 size_t Subprocess::findByChildFd(int childFd) const {
818   auto pos = std::lower_bound(
819       pipes_.begin(), pipes_.end(), childFd,
820       [] (const Pipe& pipe, int fd) { return pipe.childFd < fd; });
821   if (pos == pipes_.end() || pos->childFd != childFd) {
822     throw std::invalid_argument(folly::to<std::string>(
823         "child fd not found ", childFd));
824   }
825   return pos - pipes_.begin();
826 }
827
828 void Subprocess::closeParentFd(int childFd) {
829   int idx = findByChildFd(childFd);
830   pipes_[idx].pipe.close();  // May throw
831   pipes_.erase(pipes_.begin() + idx);
832 }
833
834 std::vector<Subprocess::ChildPipe> Subprocess::takeOwnershipOfPipes() {
835   std::vector<Subprocess::ChildPipe> pipes;
836   for (auto& p : pipes_) {
837     pipes.emplace_back(p.childFd, std::move(p.pipe));
838   }
839   // release memory
840   std::vector<Pipe>().swap(pipes_);
841   return pipes;
842 }
843
844 namespace {
845
846 class Initializer {
847  public:
848   Initializer() {
849     // We like EPIPE, thanks.
850     ::signal(SIGPIPE, SIG_IGN);
851   }
852 };
853
854 Initializer initializer;
855
856 }  // namespace
857
858 }  // namespace folly