3f25f7ffb68d78cc2525b291be95c227142e93fa
[folly.git] / folly / Subprocess.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/Subprocess.h"
18
19 #if __linux__
20 #include <sys/prctl.h>
21 #endif
22 #include <fcntl.h>
23 #include <poll.h>
24 #include <unistd.h>
25
26 #include <array>
27 #include <algorithm>
28 #include <system_error>
29
30 #include <boost/container/flat_set.hpp>
31 #include <boost/range/adaptors.hpp>
32
33 #include <glog/logging.h>
34
35 #include "folly/Conv.h"
36 #include "folly/Exception.h"
37 #include "folly/FileUtil.h"
38 #include "folly/ScopeGuard.h"
39 #include "folly/String.h"
40 #include "folly/io/Cursor.h"
41
42 extern char** environ;
43
44 constexpr int kExecFailure = 127;
45 constexpr int kChildFailure = 126;
46
47 namespace folly {
48
49 ProcessReturnCode::State ProcessReturnCode::state() const {
50   if (rawStatus_ == RV_NOT_STARTED) return NOT_STARTED;
51   if (rawStatus_ == RV_RUNNING) return RUNNING;
52   if (WIFEXITED(rawStatus_)) return EXITED;
53   if (WIFSIGNALED(rawStatus_)) return KILLED;
54   throw std::runtime_error(to<std::string>(
55       "Invalid ProcessReturnCode: ", rawStatus_));
56 }
57
58 void ProcessReturnCode::enforce(State expected) const {
59   State s = state();
60   if (s != expected) {
61     throw std::logic_error(to<std::string>(
62       "Bad use of ProcessReturnCode; state is ", s, " expected ", expected
63     ));
64   }
65 }
66
67 int ProcessReturnCode::exitStatus() const {
68   enforce(EXITED);
69   return WEXITSTATUS(rawStatus_);
70 }
71
72 int ProcessReturnCode::killSignal() const {
73   enforce(KILLED);
74   return WTERMSIG(rawStatus_);
75 }
76
77 bool ProcessReturnCode::coreDumped() const {
78   enforce(KILLED);
79   return WCOREDUMP(rawStatus_);
80 }
81
82 std::string ProcessReturnCode::str() const {
83   switch (state()) {
84   case NOT_STARTED:
85     return "not started";
86   case RUNNING:
87     return "running";
88   case EXITED:
89     return to<std::string>("exited with status ", exitStatus());
90   case KILLED:
91     return to<std::string>("killed by signal ", killSignal(),
92                            (coreDumped() ? " (core dumped)" : ""));
93   }
94   CHECK(false);  // unreached
95 }
96
97 CalledProcessError::CalledProcessError(ProcessReturnCode rc)
98   : returnCode_(rc),
99     what_(returnCode_.str()) {
100 }
101
102 SubprocessSpawnError::SubprocessSpawnError(const char* executable,
103                                            int errCode,
104                                            int errnoValue)
105   : errnoValue_(errnoValue),
106     what_(to<std::string>(errCode == kExecFailure ?
107                             "failed to execute " :
108                             "error preparing to execute ",
109                           executable, ": ", errnoStr(errnoValue))) {
110 }
111
112 namespace {
113
114 // Copy pointers to the given strings in a format suitable for posix_spawn
115 std::unique_ptr<const char*[]> cloneStrings(const std::vector<std::string>& s) {
116   std::unique_ptr<const char*[]> d(new const char*[s.size() + 1]);
117   for (int i = 0; i < s.size(); i++) {
118     d[i] = s[i].c_str();
119   }
120   d[s.size()] = nullptr;
121   return d;
122 }
123
124 // Check a wait() status, throw on non-successful
125 void checkStatus(ProcessReturnCode returnCode) {
126   if (returnCode.state() != ProcessReturnCode::EXITED ||
127       returnCode.exitStatus() != 0) {
128     throw CalledProcessError(returnCode);
129   }
130 }
131
132 }  // namespace
133
134 Subprocess::Options& Subprocess::Options::fd(int fd, int action) {
135   if (action == Subprocess::PIPE) {
136     if (fd == 0) {
137       action = Subprocess::PIPE_IN;
138     } else if (fd == 1 || fd == 2) {
139       action = Subprocess::PIPE_OUT;
140     } else {
141       throw std::invalid_argument(
142           to<std::string>("Only fds 0, 1, 2 are valid for action=PIPE: ", fd));
143     }
144   }
145   fdActions_[fd] = action;
146   return *this;
147 }
148
149 Subprocess::Subprocess(
150     const std::vector<std::string>& argv,
151     const Options& options,
152     const char* executable,
153     const std::vector<std::string>* env)
154   : pid_(-1),
155     returnCode_(RV_NOT_STARTED) {
156   if (argv.empty()) {
157     throw std::invalid_argument("argv must not be empty");
158   }
159   if (!executable) executable = argv[0].c_str();
160   spawn(cloneStrings(argv), executable, options, env);
161 }
162
163 Subprocess::Subprocess(
164     const std::string& cmd,
165     const Options& options,
166     const std::vector<std::string>* env)
167   : pid_(-1),
168     returnCode_(RV_NOT_STARTED) {
169   if (options.usePath_) {
170     throw std::invalid_argument("usePath() not allowed when running in shell");
171   }
172   const char* shell = getenv("SHELL");
173   if (!shell) {
174     shell = "/bin/sh";
175   }
176
177   std::unique_ptr<const char*[]> argv(new const char*[4]);
178   argv[0] = shell;
179   argv[1] = "-c";
180   argv[2] = cmd.c_str();
181   argv[3] = nullptr;
182   spawn(std::move(argv), shell, options, env);
183 }
184
185 Subprocess::~Subprocess() {
186   CHECK_NE(returnCode_.state(), ProcessReturnCode::RUNNING)
187     << "Subprocess destroyed without reaping child";
188   closeAll();
189 }
190
191 namespace {
192 void closeChecked(int fd) {
193   checkUnixError(::close(fd), "close");
194 }
195
196 struct ChildErrorInfo {
197   int errCode;
198   int errnoValue;
199 };
200
201 FOLLY_NORETURN void childError(int errFd, int errCode, int errnoValue);
202 void childError(int errFd, int errCode, int errnoValue) {
203   ChildErrorInfo info = {errCode, errnoValue};
204   // Write the error information over the pipe to our parent process.
205   // We can't really do anything else if this write call fails.
206   writeNoInt(errFd, &info, sizeof(info));
207   // exit
208   _exit(errCode);
209 }
210
211 }  // namespace
212
213 void Subprocess::closeAll() {
214   for (auto& p : pipes_) {
215     closeChecked(p.parentFd);
216   }
217   pipes_.clear();
218 }
219
220 void Subprocess::setAllNonBlocking() {
221   for (auto& p : pipes_) {
222     int fd = p.parentFd;
223     int flags = ::fcntl(fd, F_GETFL);
224     checkUnixError(flags, "fcntl");
225     int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
226     checkUnixError(r, "fcntl");
227   }
228 }
229
230 void Subprocess::spawn(
231     std::unique_ptr<const char*[]> argv,
232     const char* executable,
233     const Options& optionsIn,
234     const std::vector<std::string>* env) {
235   if (optionsIn.usePath_ && env) {
236     throw std::invalid_argument(
237         "usePath() not allowed when overriding environment");
238   }
239
240   // Make a copy, we'll mutate options
241   Options options(optionsIn);
242
243   // On error, close all of the pipes_
244   auto pipesGuard = makeGuard([&] {
245     for (auto& p : this->pipes_) {
246       CHECK_ERR(::close(p.parentFd));
247     }
248   });
249
250   // Create a pipe to use to receive error information from the child,
251   // in case it fails before calling exec(), setting the close-on-exec flag
252   // on both sides of the pipe.
253   // This way the pipe will be closed automatically in the child if execve()
254   // succeeds.  If the exec fails the child can write error information to the
255   // pipe.
256   // Note that O_CLOEXEC must be set in a single call while we are creating
257   // the pipe instead of doing pipe()/fcntl separately, which might race if a
258   // another thread calls fork()/exec() concurrently and both sides of the pipe
259   // may be inherited by the corresponding child process without being closed.
260   int errFds[2];
261   int r = ::pipe2(errFds, O_CLOEXEC);
262   checkUnixError(r, "pipe2");
263   SCOPE_EXIT {
264     CHECK_ERR(::close(errFds[0]));
265     if (errFds[1] >= 0) {
266       CHECK_ERR(::close(errFds[1]));
267     }
268   };
269
270   // Perform the actual work of setting up pipes then forking and
271   // executing the child.
272   spawnInternal(std::move(argv), executable, options, env, errFds[1]);
273
274   // After spawnInternal() returns the child is alive.  We have to be very
275   // careful about throwing after this point.  We are inside the constructor,
276   // so if we throw the Subprocess object will have never existed, and the
277   // destructor will never be called.
278   //
279   // We should only throw if we got an error via the errFd, and we know the
280   // child has exited and can be immediately waited for.  In all other cases,
281   // we have no way of cleaning up the child.
282
283   // Close writable side of the errFd pipe in the parent process
284   CHECK_ERR(::close(errFds[1]));
285   errFds[1] = -1;
286
287   // Read from the errFd pipe, to tell if the child ran into any errors before
288   // calling exec()
289   readChildErrorPipe(errFds[0], executable);
290
291   // We have fully succeeded now, so release the guard on pipes_
292   pipesGuard.dismiss();
293 }
294
295 void Subprocess::spawnInternal(
296     std::unique_ptr<const char*[]> argv,
297     const char* executable,
298     Options& options,
299     const std::vector<std::string>* env,
300     int errFd) {
301   // Parent work, pre-fork: create pipes
302   std::vector<int> childFds;
303   // Close all of the childFds as we leave this scope
304   SCOPE_EXIT {
305     // These are only pipes, closing them shouldn't fail
306     for (int cfd : childFds) {
307       CHECK_ERR(::close(cfd));
308     }
309   };
310
311   int r;
312   for (auto& p : options.fdActions_) {
313     if (p.second == PIPE_IN || p.second == PIPE_OUT) {
314       int fds[2];
315       // Set O_CLOEXEC on both ends of the pipe atomically while creating
316       // the pipe. The child will clear O_CLOEXEC on its side of the pipe
317       // before calling exec() so that stays open afterwards.
318       // This way even if a concurrently constructed Subprocess inherits
319       // both ends of this pipe, they will be automatically closed
320       // after the corresponding exec().
321       r = ::pipe2(fds, O_CLOEXEC);
322       checkUnixError(r, "pipe2");
323       PipeInfo pinfo;
324       pinfo.direction = p.second;
325       int cfd;
326       if (p.second == PIPE_IN) {
327         // Child gets reading end
328         pinfo.parentFd = fds[1];
329         cfd = fds[0];
330       } else {
331         pinfo.parentFd = fds[0];
332         cfd = fds[1];
333       }
334       p.second = cfd;  // ensure it gets dup2()ed
335       pinfo.childFd = p.first;
336       childFds.push_back(cfd);
337       pipes_.push_back(pinfo);
338     }
339   }
340
341   // This should already be sorted, as options.fdActions_ is
342   DCHECK(std::is_sorted(pipes_.begin(), pipes_.end()));
343
344   // Note that the const casts below are legit, per
345   // http://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html
346
347   char** argVec = const_cast<char**>(argv.get());
348
349   // Set up environment
350   std::unique_ptr<const char*[]> envHolder;
351   char** envVec;
352   if (env) {
353     envHolder = cloneStrings(*env);
354     envVec = const_cast<char**>(envHolder.get());
355   } else {
356     envVec = environ;
357   }
358
359   // Block all signals around vfork; see http://ewontfix.com/7/.
360   //
361   // As the child may run in the same address space as the parent until
362   // the actual execve() system call, any (custom) signal handlers that
363   // the parent has might alter parent's memory if invoked in the child,
364   // with undefined results.  So we block all signals in the parent before
365   // vfork(), which will cause them to be blocked in the child as well (we
366   // rely on the fact that Linux, just like all sane implementations, only
367   // clones the calling thread).  Then, in the child, we reset all signals
368   // to their default dispositions (while still blocked), and unblock them
369   // (so the exec()ed process inherits the parent's signal mask)
370   //
371   // The parent also unblocks all signals as soon as vfork() returns.
372   sigset_t allBlocked;
373   r = sigfillset(&allBlocked);
374   checkUnixError(r, "sigfillset");
375   sigset_t oldSignals;
376
377   r = pthread_sigmask(SIG_SETMASK, &allBlocked, &oldSignals);
378   checkPosixError(r, "pthread_sigmask");
379   SCOPE_EXIT {
380     // Restore signal mask
381     r = pthread_sigmask(SIG_SETMASK, &oldSignals, nullptr);
382     CHECK_EQ(r, 0) << "pthread_sigmask: " << errnoStr(r);  // shouldn't fail
383   };
384
385   pid_t pid = vfork();
386   if (pid == 0) {
387     int errnoValue = prepareChild(options, &oldSignals);
388     if (errnoValue != 0) {
389       childError(errFd, kChildFailure, errnoValue);
390     }
391
392     errnoValue = runChild(executable, argVec, envVec, options);
393     // If we get here, exec() failed.
394     childError(errFd, kExecFailure, errnoValue);
395   }
396   // In parent.  Make sure vfork() succeeded.
397   checkUnixError(pid, errno, "vfork");
398
399   // Child is alive.  We have to be very careful about throwing after this
400   // point.  We are inside the constructor, so if we throw the Subprocess
401   // object will have never existed, and the destructor will never be called.
402   //
403   // We should only throw if we got an error via the errFd, and we know the
404   // child has exited and can be immediately waited for.  In all other cases,
405   // we have no way of cleaning up the child.
406   pid_ = pid;
407   returnCode_ = ProcessReturnCode(RV_RUNNING);
408 }
409
410 int Subprocess::prepareChild(const Options& options,
411                              const sigset_t* sigmask) const {
412   // While all signals are blocked, we must reset their
413   // dispositions to default.
414   for (int sig = 1; sig < NSIG; ++sig) {
415     ::signal(sig, SIG_DFL);
416   }
417   // Unblock signals; restore signal mask.
418   int r = pthread_sigmask(SIG_SETMASK, sigmask, nullptr);
419   if (r != 0) {
420     return r;  // pthread_sigmask() returns an errno value
421   }
422
423   // Change the working directory, if one is given
424   if (!options.childDir_.empty()) {
425     r = ::chdir(options.childDir_.c_str());
426     if (r == -1) {
427       return errno;
428     }
429   }
430
431   for (auto& p : pipes_) {
432     // Clear FD_CLOEXEC on the child side of the pipe so
433     // it stays open after exec() (so that the child could
434     // access it).
435     // See spawnInternal() for why FD_CLOEXEC must be set
436     // by default on pipes.
437     r = fcntl(p.childFd, F_SETFD, 0);
438     if (r == -1) {
439       return errno;
440     }
441   }
442
443   // Close all fds that we're supposed to close.
444   // Note that we're ignoring errors here, in case some of these
445   // fds were set to close on exec.
446   for (auto& p : options.fdActions_) {
447     if (p.second == CLOSE) {
448       ::close(p.first);
449     } else {
450       r = ::dup2(p.second, p.first);
451       if (r == -1) {
452         return errno;
453       }
454     }
455   }
456
457   // If requested, close all other file descriptors.  Don't close
458   // any fds in options.fdActions_, and don't touch stdin, stdout, stderr.
459   // Ignore errors.
460   if (options.closeOtherFds_) {
461     for (int fd = getdtablesize() - 1; fd >= 3; --fd) {
462       if (options.fdActions_.count(fd) == 0) {
463         ::close(fd);
464       }
465     }
466   }
467
468 #if __linux__
469   // Opt to receive signal on parent death, if requested
470   if (options.parentDeathSignal_ != 0) {
471     r = prctl(PR_SET_PDEATHSIG, options.parentDeathSignal_, 0, 0, 0);
472     if (r == -1) {
473       return errno;
474     }
475   }
476 #endif
477
478   return 0;
479 }
480
481 int Subprocess::runChild(const char* executable,
482                          char** argv, char** env,
483                          const Options& options) const {
484   // Now, finally, exec.
485   int r;
486   if (options.usePath_) {
487     ::execvp(executable, argv);
488   } else {
489     ::execve(executable, argv, env);
490   }
491   return errno;
492 }
493
494 void Subprocess::readChildErrorPipe(int pfd, const char* executable) {
495   ChildErrorInfo info;
496   auto rc = readNoInt(pfd, &info, sizeof(info));
497   if (rc == 0) {
498     // No data means the child executed successfully, and the pipe
499     // was closed due to the close-on-exec flag being set.
500     return;
501   } else if (rc != sizeof(ChildErrorInfo)) {
502     // An error occurred trying to read from the pipe, or we got a partial read.
503     // Neither of these cases should really occur in practice.
504     //
505     // We can't get any error data from the child in this case, and we don't
506     // know if it is successfully running or not.  All we can do is to return
507     // normally, as if the child executed successfully.  If something bad
508     // happened the caller should at least get a non-normal exit status from
509     // the child.
510     LOG(ERROR) << "unexpected error trying to read from child error pipe " <<
511       "rc=" << rc << ", errno=" << errno;
512     return;
513   }
514
515   // We got error data from the child.  The child should exit immediately in
516   // this case, so wait on it to clean up.
517   wait();
518
519   // Throw to signal the error
520   throw SubprocessSpawnError(executable, info.errCode, info.errnoValue);
521 }
522
523 ProcessReturnCode Subprocess::poll() {
524   returnCode_.enforce(ProcessReturnCode::RUNNING);
525   DCHECK_GT(pid_, 0);
526   int status;
527   pid_t found = ::waitpid(pid_, &status, WNOHANG);
528   checkUnixError(found, "waitpid");
529   if (found != 0) {
530     returnCode_ = ProcessReturnCode(status);
531     pid_ = -1;
532   }
533   return returnCode_;
534 }
535
536 bool Subprocess::pollChecked() {
537   if (poll().state() == ProcessReturnCode::RUNNING) {
538     return false;
539   }
540   checkStatus(returnCode_);
541   return true;
542 }
543
544 ProcessReturnCode Subprocess::wait() {
545   returnCode_.enforce(ProcessReturnCode::RUNNING);
546   DCHECK_GT(pid_, 0);
547   int status;
548   pid_t found;
549   do {
550     found = ::waitpid(pid_, &status, 0);
551   } while (found == -1 && errno == EINTR);
552   checkUnixError(found, "waitpid");
553   DCHECK_EQ(found, pid_);
554   returnCode_ = ProcessReturnCode(status);
555   pid_ = -1;
556   return returnCode_;
557 }
558
559 void Subprocess::waitChecked() {
560   wait();
561   checkStatus(returnCode_);
562 }
563
564 void Subprocess::sendSignal(int signal) {
565   returnCode_.enforce(ProcessReturnCode::RUNNING);
566   int r = ::kill(pid_, signal);
567   checkUnixError(r, "kill");
568 }
569
570 pid_t Subprocess::pid() const {
571   return pid_;
572 }
573
574 namespace {
575
576 std::pair<const uint8_t*, size_t> queueFront(const IOBufQueue& queue) {
577   auto* p = queue.front();
578   if (!p) return std::make_pair(nullptr, 0);
579   return io::Cursor(p).peek();
580 }
581
582 // fd write
583 bool handleWrite(int fd, IOBufQueue& queue) {
584   for (;;) {
585     auto p = queueFront(queue);
586     if (p.second == 0) {
587       return true;  // EOF
588     }
589
590     ssize_t n = writeNoInt(fd, p.first, p.second);
591     if (n == -1 && errno == EAGAIN) {
592       return false;
593     }
594     checkUnixError(n, "write");
595     queue.trimStart(n);
596   }
597 }
598
599 // fd read
600 bool handleRead(int fd, IOBufQueue& queue) {
601   for (;;) {
602     auto p = queue.preallocate(100, 65000);
603     ssize_t n = readNoInt(fd, p.first, p.second);
604     if (n == -1 && errno == EAGAIN) {
605       return false;
606     }
607     checkUnixError(n, "read");
608     if (n == 0) {
609       return true;
610     }
611     queue.postallocate(n);
612   }
613 }
614
615 bool discardRead(int fd) {
616   static const size_t bufSize = 65000;
617   // Thread unsafe, but it doesn't matter.
618   static std::unique_ptr<char[]> buf(new char[bufSize]);
619
620   for (;;) {
621     ssize_t n = readNoInt(fd, buf.get(), bufSize);
622     if (n == -1 && errno == EAGAIN) {
623       return false;
624     }
625     checkUnixError(n, "read");
626     if (n == 0) {
627       return true;
628     }
629   }
630 }
631
632 }  // namespace
633
634 std::pair<std::string, std::string> Subprocess::communicate(
635     StringPiece input) {
636   IOBufQueue inputQueue;
637   inputQueue.wrapBuffer(input.data(), input.size());
638
639   auto outQueues = communicateIOBuf(std::move(inputQueue));
640   auto outBufs = std::make_pair(outQueues.first.move(),
641                                 outQueues.second.move());
642   std::pair<std::string, std::string> out;
643   if (outBufs.first) {
644     outBufs.first->coalesce();
645     out.first.assign(reinterpret_cast<const char*>(outBufs.first->data()),
646                      outBufs.first->length());
647   }
648   if (outBufs.second) {
649     outBufs.second->coalesce();
650     out.second.assign(reinterpret_cast<const char*>(outBufs.second->data()),
651                      outBufs.second->length());
652   }
653   return out;
654 }
655
656 std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
657     IOBufQueue input) {
658   // If the user supplied a non-empty input buffer, make sure
659   // that stdin is a pipe so we can write the data.
660   if (!input.empty()) {
661     // findByChildFd() will throw std::invalid_argument if no pipe for
662     // STDIN_FILENO exists
663     findByChildFd(STDIN_FILENO);
664   }
665
666   std::pair<IOBufQueue, IOBufQueue> out;
667
668   auto readCallback = [&] (int pfd, int cfd) -> bool {
669     if (cfd == STDOUT_FILENO) {
670       return handleRead(pfd, out.first);
671     } else if (cfd == STDERR_FILENO) {
672       return handleRead(pfd, out.second);
673     } else {
674       // Don't close the file descriptor, the child might not like SIGPIPE,
675       // just read and throw the data away.
676       return discardRead(pfd);
677     }
678   };
679
680   auto writeCallback = [&] (int pfd, int cfd) -> bool {
681     if (cfd == STDIN_FILENO) {
682       return handleWrite(pfd, input);
683     } else {
684       // If we don't want to write to this fd, just close it.
685       return true;
686     }
687   };
688
689   communicate(std::move(readCallback), std::move(writeCallback));
690
691   return out;
692 }
693
694 void Subprocess::communicate(FdCallback readCallback,
695                              FdCallback writeCallback) {
696   returnCode_.enforce(ProcessReturnCode::RUNNING);
697   setAllNonBlocking();
698
699   std::vector<pollfd> fds;
700   fds.reserve(pipes_.size());
701   std::vector<int> toClose;
702   toClose.reserve(pipes_.size());
703
704   while (!pipes_.empty()) {
705     fds.clear();
706     toClose.clear();
707
708     for (auto& p : pipes_) {
709       pollfd pfd;
710       pfd.fd = p.parentFd;
711       // Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
712       // child's point of view.
713       if (!p.enabled) {
714         // Still keeping fd in watched set so we get notified of POLLHUP /
715         // POLLERR
716         pfd.events = 0;
717       } else if (p.direction == PIPE_IN) {
718         pfd.events = POLLOUT;
719       } else {
720         pfd.events = POLLIN;
721       }
722       fds.push_back(pfd);
723     }
724
725     int r;
726     do {
727       r = ::poll(fds.data(), fds.size(), -1);
728     } while (r == -1 && errno == EINTR);
729     checkUnixError(r, "poll");
730
731     for (int i = 0; i < pipes_.size(); ++i) {
732       auto& p = pipes_[i];
733       DCHECK_EQ(fds[i].fd, p.parentFd);
734       short events = fds[i].revents;
735
736       bool closed = false;
737       if (events & POLLOUT) {
738         DCHECK(!(events & POLLIN));
739         if (writeCallback(p.parentFd, p.childFd)) {
740           toClose.push_back(i);
741           closed = true;
742         }
743       }
744
745       // Call read callback on POLLHUP, to give it a chance to read (and act
746       // on) end of file
747       if (events & (POLLIN | POLLHUP)) {
748         DCHECK(!(events & POLLOUT));
749         if (readCallback(p.parentFd, p.childFd)) {
750           toClose.push_back(i);
751           closed = true;
752         }
753       }
754
755       if ((events & (POLLHUP | POLLERR)) && !closed) {
756         toClose.push_back(i);
757         closed = true;
758       }
759     }
760
761     // Close the fds in reverse order so the indexes hold after erase()
762     for (int idx : boost::adaptors::reverse(toClose)) {
763       auto pos = pipes_.begin() + idx;
764       closeChecked(pos->parentFd);
765       pipes_.erase(pos);
766     }
767   }
768 }
769
770 void Subprocess::enableNotifications(int childFd, bool enabled) {
771   pipes_[findByChildFd(childFd)].enabled = enabled;
772 }
773
774 bool Subprocess::notificationsEnabled(int childFd) const {
775   return pipes_[findByChildFd(childFd)].enabled;
776 }
777
778 int Subprocess::findByChildFd(int childFd) const {
779   auto pos = std::lower_bound(
780       pipes_.begin(), pipes_.end(), childFd,
781       [] (const PipeInfo& info, int fd) { return info.childFd < fd; });
782   if (pos == pipes_.end() || pos->childFd != childFd) {
783     throw std::invalid_argument(folly::to<std::string>(
784         "child fd not found ", childFd));
785   }
786   return pos - pipes_.begin();
787 }
788
789 void Subprocess::closeParentFd(int childFd) {
790   int idx = findByChildFd(childFd);
791   closeChecked(pipes_[idx].parentFd);
792   pipes_.erase(pipes_.begin() + idx);
793 }
794
795 namespace {
796
797 class Initializer {
798  public:
799   Initializer() {
800     // We like EPIPE, thanks.
801     ::signal(SIGPIPE, SIG_IGN);
802   }
803 };
804
805 Initializer initializer;
806
807 }  // namespace
808
809 }  // namespace folly
810