Fix Chatty subprocess test, call callback on hangup
[folly.git] / folly / Subprocess.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/Subprocess.h"
18
19 #if __linux__
20 #include <sys/prctl.h>
21 #endif
22 #include <fcntl.h>
23 #include <poll.h>
24 #include <unistd.h>
25
26 #include <array>
27 #include <algorithm>
28 #include <system_error>
29
30 #include <boost/container/flat_set.hpp>
31 #include <boost/range/adaptors.hpp>
32
33 #include <glog/logging.h>
34
35 #include "folly/Conv.h"
36 #include "folly/Exception.h"
37 #include "folly/FileUtil.h"
38 #include "folly/ScopeGuard.h"
39 #include "folly/String.h"
40 #include "folly/io/Cursor.h"
41
42 extern char** environ;
43
44 constexpr int kExecFailure = 127;
45 constexpr int kChildFailure = 126;
46
47 namespace folly {
48
49 ProcessReturnCode::State ProcessReturnCode::state() const {
50   if (rawStatus_ == RV_NOT_STARTED) return NOT_STARTED;
51   if (rawStatus_ == RV_RUNNING) return RUNNING;
52   if (WIFEXITED(rawStatus_)) return EXITED;
53   if (WIFSIGNALED(rawStatus_)) return KILLED;
54   throw std::runtime_error(to<std::string>(
55       "Invalid ProcessReturnCode: ", rawStatus_));
56 }
57
58 void ProcessReturnCode::enforce(State expected) const {
59   State s = state();
60   if (s != expected) {
61     throw std::logic_error(to<std::string>("Invalid state ", s,
62                                            " expected ", expected));
63   }
64 }
65
66 int ProcessReturnCode::exitStatus() const {
67   enforce(EXITED);
68   return WEXITSTATUS(rawStatus_);
69 }
70
71 int ProcessReturnCode::killSignal() const {
72   enforce(KILLED);
73   return WTERMSIG(rawStatus_);
74 }
75
76 bool ProcessReturnCode::coreDumped() const {
77   enforce(KILLED);
78   return WCOREDUMP(rawStatus_);
79 }
80
81 std::string ProcessReturnCode::str() const {
82   switch (state()) {
83   case NOT_STARTED:
84     return "not started";
85   case RUNNING:
86     return "running";
87   case EXITED:
88     return to<std::string>("exited with status ", exitStatus());
89   case KILLED:
90     return to<std::string>("killed by signal ", killSignal(),
91                            (coreDumped() ? " (core dumped)" : ""));
92   }
93   CHECK(false);  // unreached
94 }
95
96 CalledProcessError::CalledProcessError(ProcessReturnCode rc)
97   : returnCode_(rc),
98     what_(returnCode_.str()) {
99 }
100
101 SubprocessSpawnError::SubprocessSpawnError(const char* executable,
102                                            int errCode,
103                                            int errnoValue)
104   : errnoValue_(errnoValue),
105     what_(to<std::string>(errCode == kExecFailure ?
106                             "failed to execute " :
107                             "error preparing to execute ",
108                           executable, ": ", errnoStr(errnoValue))) {
109 }
110
111 namespace {
112
113 // Copy pointers to the given strings in a format suitable for posix_spawn
114 std::unique_ptr<const char*[]> cloneStrings(const std::vector<std::string>& s) {
115   std::unique_ptr<const char*[]> d(new const char*[s.size() + 1]);
116   for (int i = 0; i < s.size(); i++) {
117     d[i] = s[i].c_str();
118   }
119   d[s.size()] = nullptr;
120   return d;
121 }
122
123 // Check a wait() status, throw on non-successful
124 void checkStatus(ProcessReturnCode returnCode) {
125   if (returnCode.state() != ProcessReturnCode::EXITED ||
126       returnCode.exitStatus() != 0) {
127     throw CalledProcessError(returnCode);
128   }
129 }
130
131 }  // namespace
132
133 Subprocess::Options& Subprocess::Options::fd(int fd, int action) {
134   if (action == Subprocess::PIPE) {
135     if (fd == 0) {
136       action = Subprocess::PIPE_IN;
137     } else if (fd == 1 || fd == 2) {
138       action = Subprocess::PIPE_OUT;
139     } else {
140       throw std::invalid_argument(
141           to<std::string>("Only fds 0, 1, 2 are valid for action=PIPE: ", fd));
142     }
143   }
144   fdActions_[fd] = action;
145   return *this;
146 }
147
148 Subprocess::Subprocess(
149     const std::vector<std::string>& argv,
150     const Options& options,
151     const char* executable,
152     const std::vector<std::string>* env)
153   : pid_(-1),
154     returnCode_(RV_NOT_STARTED) {
155   if (argv.empty()) {
156     throw std::invalid_argument("argv must not be empty");
157   }
158   if (!executable) executable = argv[0].c_str();
159   spawn(cloneStrings(argv), executable, options, env);
160 }
161
162 Subprocess::Subprocess(
163     const std::string& cmd,
164     const Options& options,
165     const std::vector<std::string>* env)
166   : pid_(-1),
167     returnCode_(RV_NOT_STARTED) {
168   if (options.usePath_) {
169     throw std::invalid_argument("usePath() not allowed when running in shell");
170   }
171   const char* shell = getenv("SHELL");
172   if (!shell) {
173     shell = "/bin/sh";
174   }
175
176   std::unique_ptr<const char*[]> argv(new const char*[4]);
177   argv[0] = shell;
178   argv[1] = "-c";
179   argv[2] = cmd.c_str();
180   argv[3] = nullptr;
181   spawn(std::move(argv), shell, options, env);
182 }
183
184 Subprocess::~Subprocess() {
185   CHECK_NE(returnCode_.state(), ProcessReturnCode::RUNNING)
186     << "Subprocess destroyed without reaping child";
187   closeAll();
188 }
189
190 namespace {
191 void closeChecked(int fd) {
192   checkUnixError(::close(fd), "close");
193 }
194
195 struct ChildErrorInfo {
196   int errCode;
197   int errnoValue;
198 };
199
200 FOLLY_NORETURN void childError(int errFd, int errCode, int errnoValue);
201 void childError(int errFd, int errCode, int errnoValue) {
202   ChildErrorInfo info = {errCode, errnoValue};
203   // Write the error information over the pipe to our parent process.
204   // We can't really do anything else if this write call fails.
205   writeNoInt(errFd, &info, sizeof(info));
206   // exit
207   _exit(errCode);
208 }
209
210 }  // namespace
211
212 void Subprocess::closeAll() {
213   for (auto& p : pipes_) {
214     closeChecked(p.parentFd);
215   }
216   pipes_.clear();
217 }
218
219 void Subprocess::setAllNonBlocking() {
220   for (auto& p : pipes_) {
221     int fd = p.parentFd;
222     int flags = ::fcntl(fd, F_GETFL);
223     checkUnixError(flags, "fcntl");
224     int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
225     checkUnixError(r, "fcntl");
226   }
227 }
228
229 void Subprocess::spawn(
230     std::unique_ptr<const char*[]> argv,
231     const char* executable,
232     const Options& optionsIn,
233     const std::vector<std::string>* env) {
234   if (optionsIn.usePath_ && env) {
235     throw std::invalid_argument(
236         "usePath() not allowed when overriding environment");
237   }
238
239   // Make a copy, we'll mutate options
240   Options options(optionsIn);
241
242   // On error, close all of the pipes_
243   auto pipesGuard = makeGuard([&] {
244     for (auto& p : this->pipes_) {
245       CHECK_ERR(::close(p.parentFd));
246     }
247   });
248
249   // Create a pipe to use to receive error information from the child,
250   // in case it fails before calling exec()
251   int errFds[2];
252   int r = ::pipe(errFds);
253   checkUnixError(r, "pipe");
254   SCOPE_EXIT {
255     CHECK_ERR(::close(errFds[0]));
256     if (errFds[1] >= 0) {
257       CHECK_ERR(::close(errFds[1]));
258     }
259   };
260   // Ask the child to close the read end of the error pipe.
261   options.fdActions_[errFds[0]] = CLOSE;
262   // Set the close-on-exec flag on the write side of the pipe.
263   // This way the pipe will be closed automatically in the child if execve()
264   // succeeds.  If the exec fails the child can write error information to the
265   // pipe.
266   r = fcntl(errFds[1], F_SETFD, FD_CLOEXEC);
267   checkUnixError(r, "set FD_CLOEXEC");
268
269   // Perform the actual work of setting up pipes then forking and
270   // executing the child.
271   spawnInternal(std::move(argv), executable, options, env, errFds[1]);
272
273   // After spawnInternal() returns the child is alive.  We have to be very
274   // careful about throwing after this point.  We are inside the constructor,
275   // so if we throw the Subprocess object will have never existed, and the
276   // destructor will never be called.
277   //
278   // We should only throw if we got an error via the errFd, and we know the
279   // child has exited and can be immediately waited for.  In all other cases,
280   // we have no way of cleaning up the child.
281
282   // Close writable side of the errFd pipe in the parent process
283   CHECK_ERR(::close(errFds[1]));
284   errFds[1] = -1;
285
286   // Read from the errFd pipe, to tell if the child ran into any errors before
287   // calling exec()
288   readChildErrorPipe(errFds[0], executable);
289
290   // We have fully succeeded now, so release the guard on pipes_
291   pipesGuard.dismiss();
292 }
293
294 void Subprocess::spawnInternal(
295     std::unique_ptr<const char*[]> argv,
296     const char* executable,
297     Options& options,
298     const std::vector<std::string>* env,
299     int errFd) {
300   // Parent work, pre-fork: create pipes
301   std::vector<int> childFds;
302   // Close all of the childFds as we leave this scope
303   SCOPE_EXIT {
304     // These are only pipes, closing them shouldn't fail
305     for (int cfd : childFds) {
306       CHECK_ERR(::close(cfd));
307     }
308   };
309
310   int r;
311   for (auto& p : options.fdActions_) {
312     if (p.second == PIPE_IN || p.second == PIPE_OUT) {
313       int fds[2];
314       r = ::pipe(fds);
315       checkUnixError(r, "pipe");
316       PipeInfo pinfo;
317       pinfo.direction = p.second;
318       int cfd;
319       if (p.second == PIPE_IN) {
320         // Child gets reading end
321         pinfo.parentFd = fds[1];
322         cfd = fds[0];
323       } else {
324         pinfo.parentFd = fds[0];
325         cfd = fds[1];
326       }
327       p.second = cfd;  // ensure it gets dup2()ed
328       pinfo.childFd = p.first;
329       childFds.push_back(cfd);
330       pipes_.push_back(pinfo);
331     }
332   }
333
334   // This should already be sorted, as options.fdActions_ is
335   DCHECK(std::is_sorted(pipes_.begin(), pipes_.end()));
336
337   // Note that the const casts below are legit, per
338   // http://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html
339
340   char** argVec = const_cast<char**>(argv.get());
341
342   // Set up environment
343   std::unique_ptr<const char*[]> envHolder;
344   char** envVec;
345   if (env) {
346     envHolder = cloneStrings(*env);
347     envVec = const_cast<char**>(envHolder.get());
348   } else {
349     envVec = environ;
350   }
351
352   // Block all signals around vfork; see http://ewontfix.com/7/.
353   //
354   // As the child may run in the same address space as the parent until
355   // the actual execve() system call, any (custom) signal handlers that
356   // the parent has might alter parent's memory if invoked in the child,
357   // with undefined results.  So we block all signals in the parent before
358   // vfork(), which will cause them to be blocked in the child as well (we
359   // rely on the fact that Linux, just like all sane implementations, only
360   // clones the calling thread).  Then, in the child, we reset all signals
361   // to their default dispositions (while still blocked), and unblock them
362   // (so the exec()ed process inherits the parent's signal mask)
363   //
364   // The parent also unblocks all signals as soon as vfork() returns.
365   sigset_t allBlocked;
366   r = sigfillset(&allBlocked);
367   checkUnixError(r, "sigfillset");
368   sigset_t oldSignals;
369
370   r = pthread_sigmask(SIG_SETMASK, &allBlocked, &oldSignals);
371   checkPosixError(r, "pthread_sigmask");
372   SCOPE_EXIT {
373     // Restore signal mask
374     r = pthread_sigmask(SIG_SETMASK, &oldSignals, nullptr);
375     CHECK_EQ(r, 0) << "pthread_sigmask: " << errnoStr(r);  // shouldn't fail
376   };
377
378   pid_t pid = vfork();
379   if (pid == 0) {
380     int errnoValue = prepareChild(options, &oldSignals);
381     if (errnoValue != 0) {
382       childError(errFd, kChildFailure, errnoValue);
383     }
384
385     errnoValue = runChild(executable, argVec, envVec, options);
386     // If we get here, exec() failed.
387     childError(errFd, kExecFailure, errnoValue);
388   }
389   // In parent.  Make sure vfork() succeeded.
390   checkUnixError(pid, errno, "vfork");
391
392   // Child is alive.  We have to be very careful about throwing after this
393   // point.  We are inside the constructor, so if we throw the Subprocess
394   // object will have never existed, and the destructor will never be called.
395   //
396   // We should only throw if we got an error via the errFd, and we know the
397   // child has exited and can be immediately waited for.  In all other cases,
398   // we have no way of cleaning up the child.
399   pid_ = pid;
400   returnCode_ = ProcessReturnCode(RV_RUNNING);
401 }
402
403 int Subprocess::prepareChild(const Options& options,
404                              const sigset_t* sigmask) const {
405   // While all signals are blocked, we must reset their
406   // dispositions to default.
407   for (int sig = 1; sig < NSIG; ++sig) {
408     ::signal(sig, SIG_DFL);
409   }
410   // Unblock signals; restore signal mask.
411   int r = pthread_sigmask(SIG_SETMASK, sigmask, nullptr);
412   if (r != 0) {
413     return r;  // pthread_sigmask() returns an errno value
414   }
415
416   // Change the working directory, if one is given
417   if (!options.childDir_.empty()) {
418     r = ::chdir(options.childDir_.c_str());
419     if (r == -1) {
420       return errno;
421     }
422   }
423
424   // Close parent's ends of all pipes
425   for (auto& p : pipes_) {
426     r = ::close(p.parentFd);
427     if (r == -1) {
428       return errno;
429     }
430   }
431
432   // Close all fds that we're supposed to close.
433   // Note that we're ignoring errors here, in case some of these
434   // fds were set to close on exec.
435   for (auto& p : options.fdActions_) {
436     if (p.second == CLOSE) {
437       ::close(p.first);
438     } else {
439       r = ::dup2(p.second, p.first);
440       if (r == -1) {
441         return errno;
442       }
443     }
444   }
445
446   // If requested, close all other file descriptors.  Don't close
447   // any fds in options.fdActions_, and don't touch stdin, stdout, stderr.
448   // Ignore errors.
449   if (options.closeOtherFds_) {
450     for (int fd = getdtablesize() - 1; fd >= 3; --fd) {
451       if (options.fdActions_.count(fd) == 0) {
452         ::close(fd);
453       }
454     }
455   }
456
457 #if __linux__
458   // Opt to receive signal on parent death, if requested
459   if (options.parentDeathSignal_ != 0) {
460     r = prctl(PR_SET_PDEATHSIG, options.parentDeathSignal_, 0, 0, 0);
461     if (r == -1) {
462       return errno;
463     }
464   }
465 #endif
466
467   return 0;
468 }
469
470 int Subprocess::runChild(const char* executable,
471                          char** argv, char** env,
472                          const Options& options) const {
473   // Now, finally, exec.
474   int r;
475   if (options.usePath_) {
476     ::execvp(executable, argv);
477   } else {
478     ::execve(executable, argv, env);
479   }
480   return errno;
481 }
482
483 void Subprocess::readChildErrorPipe(int pfd, const char* executable) {
484   ChildErrorInfo info;
485   auto rc = readNoInt(pfd, &info, sizeof(info));
486   if (rc == 0) {
487     // No data means the child executed successfully, and the pipe
488     // was closed due to the close-on-exec flag being set.
489     return;
490   } else if (rc != sizeof(ChildErrorInfo)) {
491     // An error occurred trying to read from the pipe, or we got a partial read.
492     // Neither of these cases should really occur in practice.
493     //
494     // We can't get any error data from the child in this case, and we don't
495     // know if it is successfully running or not.  All we can do is to return
496     // normally, as if the child executed successfully.  If something bad
497     // happened the caller should at least get a non-normal exit status from
498     // the child.
499     LOG(ERROR) << "unexpected error trying to read from child error pipe " <<
500       "rc=" << rc << ", errno=" << errno;
501     return;
502   }
503
504   // We got error data from the child.  The child should exit immediately in
505   // this case, so wait on it to clean up.
506   wait();
507
508   // Throw to signal the error
509   throw SubprocessSpawnError(executable, info.errCode, info.errnoValue);
510 }
511
512 ProcessReturnCode Subprocess::poll() {
513   returnCode_.enforce(ProcessReturnCode::RUNNING);
514   DCHECK_GT(pid_, 0);
515   int status;
516   pid_t found = ::waitpid(pid_, &status, WNOHANG);
517   checkUnixError(found, "waitpid");
518   if (found != 0) {
519     returnCode_ = ProcessReturnCode(status);
520     pid_ = -1;
521   }
522   return returnCode_;
523 }
524
525 bool Subprocess::pollChecked() {
526   if (poll().state() == ProcessReturnCode::RUNNING) {
527     return false;
528   }
529   checkStatus(returnCode_);
530   return true;
531 }
532
533 ProcessReturnCode Subprocess::wait() {
534   returnCode_.enforce(ProcessReturnCode::RUNNING);
535   DCHECK_GT(pid_, 0);
536   int status;
537   pid_t found;
538   do {
539     found = ::waitpid(pid_, &status, 0);
540   } while (found == -1 && errno == EINTR);
541   checkUnixError(found, "waitpid");
542   DCHECK_EQ(found, pid_);
543   returnCode_ = ProcessReturnCode(status);
544   pid_ = -1;
545   return returnCode_;
546 }
547
548 void Subprocess::waitChecked() {
549   wait();
550   checkStatus(returnCode_);
551 }
552
553 void Subprocess::sendSignal(int signal) {
554   returnCode_.enforce(ProcessReturnCode::RUNNING);
555   int r = ::kill(pid_, signal);
556   checkUnixError(r, "kill");
557 }
558
559 pid_t Subprocess::pid() const {
560   return pid_;
561 }
562
563 namespace {
564
565 std::pair<const uint8_t*, size_t> queueFront(const IOBufQueue& queue) {
566   auto* p = queue.front();
567   if (!p) return std::make_pair(nullptr, 0);
568   return io::Cursor(p).peek();
569 }
570
571 // fd write
572 bool handleWrite(int fd, IOBufQueue& queue) {
573   for (;;) {
574     auto p = queueFront(queue);
575     if (p.second == 0) {
576       return true;  // EOF
577     }
578
579     ssize_t n = writeNoInt(fd, p.first, p.second);
580     if (n == -1 && errno == EAGAIN) {
581       return false;
582     }
583     checkUnixError(n, "write");
584     queue.trimStart(n);
585   }
586 }
587
588 // fd read
589 bool handleRead(int fd, IOBufQueue& queue) {
590   for (;;) {
591     auto p = queue.preallocate(100, 65000);
592     ssize_t n = readNoInt(fd, p.first, p.second);
593     if (n == -1 && errno == EAGAIN) {
594       return false;
595     }
596     checkUnixError(n, "read");
597     if (n == 0) {
598       return true;
599     }
600     queue.postallocate(n);
601   }
602 }
603
604 bool discardRead(int fd) {
605   static const size_t bufSize = 65000;
606   // Thread unsafe, but it doesn't matter.
607   static std::unique_ptr<char[]> buf(new char[bufSize]);
608
609   for (;;) {
610     ssize_t n = readNoInt(fd, buf.get(), bufSize);
611     if (n == -1 && errno == EAGAIN) {
612       return false;
613     }
614     checkUnixError(n, "read");
615     if (n == 0) {
616       return true;
617     }
618   }
619 }
620
621 }  // namespace
622
623 std::pair<std::string, std::string> Subprocess::communicate(
624     StringPiece input) {
625   IOBufQueue inputQueue;
626   inputQueue.wrapBuffer(input.data(), input.size());
627
628   auto outQueues = communicateIOBuf(std::move(inputQueue));
629   auto outBufs = std::make_pair(outQueues.first.move(),
630                                 outQueues.second.move());
631   std::pair<std::string, std::string> out;
632   if (outBufs.first) {
633     outBufs.first->coalesce();
634     out.first.assign(reinterpret_cast<const char*>(outBufs.first->data()),
635                      outBufs.first->length());
636   }
637   if (outBufs.second) {
638     outBufs.second->coalesce();
639     out.second.assign(reinterpret_cast<const char*>(outBufs.second->data()),
640                      outBufs.second->length());
641   }
642   return out;
643 }
644
645 std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
646     IOBufQueue input) {
647   // If the user supplied a non-empty input buffer, make sure
648   // that stdin is a pipe so we can write the data.
649   if (!input.empty()) {
650     // findByChildFd() will throw std::invalid_argument if no pipe for
651     // STDIN_FILENO exists
652     findByChildFd(STDIN_FILENO);
653   }
654
655   std::pair<IOBufQueue, IOBufQueue> out;
656
657   auto readCallback = [&] (int pfd, int cfd) -> bool {
658     if (cfd == STDOUT_FILENO) {
659       return handleRead(pfd, out.first);
660     } else if (cfd == STDERR_FILENO) {
661       return handleRead(pfd, out.second);
662     } else {
663       // Don't close the file descriptor, the child might not like SIGPIPE,
664       // just read and throw the data away.
665       return discardRead(pfd);
666     }
667   };
668
669   auto writeCallback = [&] (int pfd, int cfd) -> bool {
670     if (cfd == STDIN_FILENO) {
671       return handleWrite(pfd, input);
672     } else {
673       // If we don't want to write to this fd, just close it.
674       return true;
675     }
676   };
677
678   communicate(std::move(readCallback), std::move(writeCallback));
679
680   return out;
681 }
682
683 void Subprocess::communicate(FdCallback readCallback,
684                              FdCallback writeCallback) {
685   returnCode_.enforce(ProcessReturnCode::RUNNING);
686   setAllNonBlocking();
687
688   std::vector<pollfd> fds;
689   fds.reserve(pipes_.size());
690   std::vector<int> toClose;
691   toClose.reserve(pipes_.size());
692
693   while (!pipes_.empty()) {
694     fds.clear();
695     toClose.clear();
696
697     for (auto& p : pipes_) {
698       pollfd pfd;
699       pfd.fd = p.parentFd;
700       // Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
701       // child's point of view.
702       if (!p.enabled) {
703         // Still keeping fd in watched set so we get notified of POLLHUP /
704         // POLLERR
705         pfd.events = 0;
706       } else if (p.direction == PIPE_IN) {
707         pfd.events = POLLOUT;
708       } else {
709         pfd.events = POLLIN;
710       }
711       fds.push_back(pfd);
712     }
713
714     int r;
715     do {
716       r = ::poll(fds.data(), fds.size(), -1);
717     } while (r == -1 && errno == EINTR);
718     checkUnixError(r, "poll");
719
720     for (int i = 0; i < pipes_.size(); ++i) {
721       auto& p = pipes_[i];
722       DCHECK_EQ(fds[i].fd, p.parentFd);
723       short events = fds[i].revents;
724
725       bool closed = false;
726       if (events & POLLOUT) {
727         DCHECK(!(events & POLLIN));
728         if (writeCallback(p.parentFd, p.childFd)) {
729           toClose.push_back(i);
730           closed = true;
731         }
732       }
733
734       // Call read callback on POLLHUP, to give it a chance to read (and act
735       // on) end of file
736       if (events & (POLLIN | POLLHUP)) {
737         DCHECK(!(events & POLLOUT));
738         if (readCallback(p.parentFd, p.childFd)) {
739           toClose.push_back(i);
740           closed = true;
741         }
742       }
743
744       if ((events & (POLLHUP | POLLERR)) && !closed) {
745         toClose.push_back(i);
746         closed = true;
747       }
748     }
749
750     // Close the fds in reverse order so the indexes hold after erase()
751     for (int idx : boost::adaptors::reverse(toClose)) {
752       auto pos = pipes_.begin() + idx;
753       closeChecked(pos->parentFd);
754       pipes_.erase(pos);
755     }
756   }
757 }
758
759 void Subprocess::enableNotifications(int childFd, bool enabled) {
760   pipes_[findByChildFd(childFd)].enabled = enabled;
761 }
762
763 bool Subprocess::notificationsEnabled(int childFd) const {
764   return pipes_[findByChildFd(childFd)].enabled;
765 }
766
767 int Subprocess::findByChildFd(int childFd) const {
768   auto pos = std::lower_bound(
769       pipes_.begin(), pipes_.end(), childFd,
770       [] (const PipeInfo& info, int fd) { return info.childFd < fd; });
771   if (pos == pipes_.end() || pos->childFd != childFd) {
772     throw std::invalid_argument(folly::to<std::string>(
773         "child fd not found ", childFd));
774   }
775   return pos - pipes_.begin();
776 }
777
778 void Subprocess::closeParentFd(int childFd) {
779   int idx = findByChildFd(childFd);
780   closeChecked(pipes_[idx].parentFd);
781   pipes_.erase(pipes_.begin() + idx);
782 }
783
784 namespace {
785
786 class Initializer {
787  public:
788   Initializer() {
789     // We like EPIPE, thanks.
790     ::signal(SIGPIPE, SIG_IGN);
791   }
792 };
793
794 Initializer initializer;
795
796 }  // namespace
797
798 }  // namespace folly
799