Small readability improvements
[folly.git] / folly / Subprocess.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/Subprocess.h"
18
19 #if __linux__
20 #include <sys/prctl.h>
21 #endif
22 #include <fcntl.h>
23 #include <poll.h>
24 #include <unistd.h>
25
26 #include <array>
27 #include <algorithm>
28 #include <system_error>
29
30 #include <boost/container/flat_set.hpp>
31 #include <boost/range/adaptors.hpp>
32
33 #include <glog/logging.h>
34
35 #include "folly/Conv.h"
36 #include "folly/Exception.h"
37 #include "folly/FileUtil.h"
38 #include "folly/ScopeGuard.h"
39 #include "folly/String.h"
40 #include "folly/io/Cursor.h"
41
42 extern char** environ;
43
44 constexpr int kExecFailure = 127;
45 constexpr int kChildFailure = 126;
46
47 namespace folly {
48
49 ProcessReturnCode::State ProcessReturnCode::state() const {
50   if (rawStatus_ == RV_NOT_STARTED) return NOT_STARTED;
51   if (rawStatus_ == RV_RUNNING) return RUNNING;
52   if (WIFEXITED(rawStatus_)) return EXITED;
53   if (WIFSIGNALED(rawStatus_)) return KILLED;
54   throw std::runtime_error(to<std::string>(
55       "Invalid ProcessReturnCode: ", rawStatus_));
56 }
57
58 void ProcessReturnCode::enforce(State expected) const {
59   State s = state();
60   if (s != expected) {
61     throw std::logic_error(to<std::string>(
62       "Bad use of ProcessReturnCode; state is ", s, " expected ", expected
63     ));
64   }
65 }
66
67 int ProcessReturnCode::exitStatus() const {
68   enforce(EXITED);
69   return WEXITSTATUS(rawStatus_);
70 }
71
72 int ProcessReturnCode::killSignal() const {
73   enforce(KILLED);
74   return WTERMSIG(rawStatus_);
75 }
76
77 bool ProcessReturnCode::coreDumped() const {
78   enforce(KILLED);
79   return WCOREDUMP(rawStatus_);
80 }
81
82 std::string ProcessReturnCode::str() const {
83   switch (state()) {
84   case NOT_STARTED:
85     return "not started";
86   case RUNNING:
87     return "running";
88   case EXITED:
89     return to<std::string>("exited with status ", exitStatus());
90   case KILLED:
91     return to<std::string>("killed by signal ", killSignal(),
92                            (coreDumped() ? " (core dumped)" : ""));
93   }
94   CHECK(false);  // unreached
95 }
96
97 CalledProcessError::CalledProcessError(ProcessReturnCode rc)
98   : returnCode_(rc),
99     what_(returnCode_.str()) {
100 }
101
102 SubprocessSpawnError::SubprocessSpawnError(const char* executable,
103                                            int errCode,
104                                            int errnoValue)
105   : errnoValue_(errnoValue),
106     what_(to<std::string>(errCode == kExecFailure ?
107                             "failed to execute " :
108                             "error preparing to execute ",
109                           executable, ": ", errnoStr(errnoValue))) {
110 }
111
112 namespace {
113
114 // Copy pointers to the given strings in a format suitable for posix_spawn
115 std::unique_ptr<const char*[]> cloneStrings(const std::vector<std::string>& s) {
116   std::unique_ptr<const char*[]> d(new const char*[s.size() + 1]);
117   for (int i = 0; i < s.size(); i++) {
118     d[i] = s[i].c_str();
119   }
120   d[s.size()] = nullptr;
121   return d;
122 }
123
124 // Check a wait() status, throw on non-successful
125 void checkStatus(ProcessReturnCode returnCode) {
126   if (returnCode.state() != ProcessReturnCode::EXITED ||
127       returnCode.exitStatus() != 0) {
128     throw CalledProcessError(returnCode);
129   }
130 }
131
132 }  // namespace
133
134 Subprocess::Options& Subprocess::Options::fd(int fd, int action) {
135   if (action == Subprocess::PIPE) {
136     if (fd == 0) {
137       action = Subprocess::PIPE_IN;
138     } else if (fd == 1 || fd == 2) {
139       action = Subprocess::PIPE_OUT;
140     } else {
141       throw std::invalid_argument(
142           to<std::string>("Only fds 0, 1, 2 are valid for action=PIPE: ", fd));
143     }
144   }
145   fdActions_[fd] = action;
146   return *this;
147 }
148
149 Subprocess::Subprocess(
150     const std::vector<std::string>& argv,
151     const Options& options,
152     const char* executable,
153     const std::vector<std::string>* env)
154   : pid_(-1),
155     returnCode_(RV_NOT_STARTED) {
156   if (argv.empty()) {
157     throw std::invalid_argument("argv must not be empty");
158   }
159   if (!executable) executable = argv[0].c_str();
160   spawn(cloneStrings(argv), executable, options, env);
161 }
162
163 Subprocess::Subprocess(
164     const std::string& cmd,
165     const Options& options,
166     const std::vector<std::string>* env)
167   : pid_(-1),
168     returnCode_(RV_NOT_STARTED) {
169   if (options.usePath_) {
170     throw std::invalid_argument("usePath() not allowed when running in shell");
171   }
172   const char* shell = getenv("SHELL");
173   if (!shell) {
174     shell = "/bin/sh";
175   }
176
177   std::unique_ptr<const char*[]> argv(new const char*[4]);
178   argv[0] = shell;
179   argv[1] = "-c";
180   argv[2] = cmd.c_str();
181   argv[3] = nullptr;
182   spawn(std::move(argv), shell, options, env);
183 }
184
185 Subprocess::~Subprocess() {
186   CHECK_NE(returnCode_.state(), ProcessReturnCode::RUNNING)
187     << "Subprocess destroyed without reaping child";
188   closeAll();
189 }
190
191 namespace {
192 void closeChecked(int fd) {
193   checkUnixError(::close(fd), "close");
194 }
195
196 struct ChildErrorInfo {
197   int errCode;
198   int errnoValue;
199 };
200
201 FOLLY_NORETURN void childError(int errFd, int errCode, int errnoValue);
202 void childError(int errFd, int errCode, int errnoValue) {
203   ChildErrorInfo info = {errCode, errnoValue};
204   // Write the error information over the pipe to our parent process.
205   // We can't really do anything else if this write call fails.
206   writeNoInt(errFd, &info, sizeof(info));
207   // exit
208   _exit(errCode);
209 }
210
211 }  // namespace
212
213 void Subprocess::closeAll() {
214   for (auto& p : pipes_) {
215     closeChecked(p.parentFd);
216   }
217   pipes_.clear();
218 }
219
220 void Subprocess::setAllNonBlocking() {
221   for (auto& p : pipes_) {
222     int fd = p.parentFd;
223     int flags = ::fcntl(fd, F_GETFL);
224     checkUnixError(flags, "fcntl");
225     int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
226     checkUnixError(r, "fcntl");
227   }
228 }
229
230 void Subprocess::spawn(
231     std::unique_ptr<const char*[]> argv,
232     const char* executable,
233     const Options& optionsIn,
234     const std::vector<std::string>* env) {
235   if (optionsIn.usePath_ && env) {
236     throw std::invalid_argument(
237         "usePath() not allowed when overriding environment");
238   }
239
240   // Make a copy, we'll mutate options
241   Options options(optionsIn);
242
243   // On error, close all of the pipes_
244   auto pipesGuard = makeGuard([&] {
245     for (auto& p : this->pipes_) {
246       CHECK_ERR(::close(p.parentFd));
247     }
248   });
249
250   // Create a pipe to use to receive error information from the child,
251   // in case it fails before calling exec()
252   int errFds[2];
253   int r = ::pipe(errFds);
254   checkUnixError(r, "pipe");
255   SCOPE_EXIT {
256     CHECK_ERR(::close(errFds[0]));
257     if (errFds[1] >= 0) {
258       CHECK_ERR(::close(errFds[1]));
259     }
260   };
261   // Ask the child to close the read end of the error pipe.
262   options.fdActions_[errFds[0]] = CLOSE;
263   // Set the close-on-exec flag on the write side of the pipe.
264   // This way the pipe will be closed automatically in the child if execve()
265   // succeeds.  If the exec fails the child can write error information to the
266   // pipe.
267   r = fcntl(errFds[1], F_SETFD, FD_CLOEXEC);
268   checkUnixError(r, "set FD_CLOEXEC");
269
270   // Perform the actual work of setting up pipes then forking and
271   // executing the child.
272   spawnInternal(std::move(argv), executable, options, env, errFds[1]);
273
274   // After spawnInternal() returns the child is alive.  We have to be very
275   // careful about throwing after this point.  We are inside the constructor,
276   // so if we throw the Subprocess object will have never existed, and the
277   // destructor will never be called.
278   //
279   // We should only throw if we got an error via the errFd, and we know the
280   // child has exited and can be immediately waited for.  In all other cases,
281   // we have no way of cleaning up the child.
282
283   // Close writable side of the errFd pipe in the parent process
284   CHECK_ERR(::close(errFds[1]));
285   errFds[1] = -1;
286
287   // Read from the errFd pipe, to tell if the child ran into any errors before
288   // calling exec()
289   readChildErrorPipe(errFds[0], executable);
290
291   // We have fully succeeded now, so release the guard on pipes_
292   pipesGuard.dismiss();
293 }
294
295 void Subprocess::spawnInternal(
296     std::unique_ptr<const char*[]> argv,
297     const char* executable,
298     Options& options,
299     const std::vector<std::string>* env,
300     int errFd) {
301   // Parent work, pre-fork: create pipes
302   std::vector<int> childFds;
303   // Close all of the childFds as we leave this scope
304   SCOPE_EXIT {
305     // These are only pipes, closing them shouldn't fail
306     for (int cfd : childFds) {
307       CHECK_ERR(::close(cfd));
308     }
309   };
310
311   int r;
312   for (auto& p : options.fdActions_) {
313     if (p.second == PIPE_IN || p.second == PIPE_OUT) {
314       int fds[2];
315       r = ::pipe(fds);
316       checkUnixError(r, "pipe");
317       PipeInfo pinfo;
318       pinfo.direction = p.second;
319       int cfd;
320       if (p.second == PIPE_IN) {
321         // Child gets reading end
322         pinfo.parentFd = fds[1];
323         cfd = fds[0];
324       } else {
325         pinfo.parentFd = fds[0];
326         cfd = fds[1];
327       }
328       p.second = cfd;  // ensure it gets dup2()ed
329       pinfo.childFd = p.first;
330       childFds.push_back(cfd);
331       pipes_.push_back(pinfo);
332     }
333   }
334
335   // This should already be sorted, as options.fdActions_ is
336   DCHECK(std::is_sorted(pipes_.begin(), pipes_.end()));
337
338   // Note that the const casts below are legit, per
339   // http://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html
340
341   char** argVec = const_cast<char**>(argv.get());
342
343   // Set up environment
344   std::unique_ptr<const char*[]> envHolder;
345   char** envVec;
346   if (env) {
347     envHolder = cloneStrings(*env);
348     envVec = const_cast<char**>(envHolder.get());
349   } else {
350     envVec = environ;
351   }
352
353   // Block all signals around vfork; see http://ewontfix.com/7/.
354   //
355   // As the child may run in the same address space as the parent until
356   // the actual execve() system call, any (custom) signal handlers that
357   // the parent has might alter parent's memory if invoked in the child,
358   // with undefined results.  So we block all signals in the parent before
359   // vfork(), which will cause them to be blocked in the child as well (we
360   // rely on the fact that Linux, just like all sane implementations, only
361   // clones the calling thread).  Then, in the child, we reset all signals
362   // to their default dispositions (while still blocked), and unblock them
363   // (so the exec()ed process inherits the parent's signal mask)
364   //
365   // The parent also unblocks all signals as soon as vfork() returns.
366   sigset_t allBlocked;
367   r = sigfillset(&allBlocked);
368   checkUnixError(r, "sigfillset");
369   sigset_t oldSignals;
370
371   r = pthread_sigmask(SIG_SETMASK, &allBlocked, &oldSignals);
372   checkPosixError(r, "pthread_sigmask");
373   SCOPE_EXIT {
374     // Restore signal mask
375     r = pthread_sigmask(SIG_SETMASK, &oldSignals, nullptr);
376     CHECK_EQ(r, 0) << "pthread_sigmask: " << errnoStr(r);  // shouldn't fail
377   };
378
379   pid_t pid = vfork();
380   if (pid == 0) {
381     int errnoValue = prepareChild(options, &oldSignals);
382     if (errnoValue != 0) {
383       childError(errFd, kChildFailure, errnoValue);
384     }
385
386     errnoValue = runChild(executable, argVec, envVec, options);
387     // If we get here, exec() failed.
388     childError(errFd, kExecFailure, errnoValue);
389   }
390   // In parent.  Make sure vfork() succeeded.
391   checkUnixError(pid, errno, "vfork");
392
393   // Child is alive.  We have to be very careful about throwing after this
394   // point.  We are inside the constructor, so if we throw the Subprocess
395   // object will have never existed, and the destructor will never be called.
396   //
397   // We should only throw if we got an error via the errFd, and we know the
398   // child has exited and can be immediately waited for.  In all other cases,
399   // we have no way of cleaning up the child.
400   pid_ = pid;
401   returnCode_ = ProcessReturnCode(RV_RUNNING);
402 }
403
404 int Subprocess::prepareChild(const Options& options,
405                              const sigset_t* sigmask) const {
406   // While all signals are blocked, we must reset their
407   // dispositions to default.
408   for (int sig = 1; sig < NSIG; ++sig) {
409     ::signal(sig, SIG_DFL);
410   }
411   // Unblock signals; restore signal mask.
412   int r = pthread_sigmask(SIG_SETMASK, sigmask, nullptr);
413   if (r != 0) {
414     return r;  // pthread_sigmask() returns an errno value
415   }
416
417   // Change the working directory, if one is given
418   if (!options.childDir_.empty()) {
419     r = ::chdir(options.childDir_.c_str());
420     if (r == -1) {
421       return errno;
422     }
423   }
424
425   // Close parent's ends of all pipes
426   for (auto& p : pipes_) {
427     r = ::close(p.parentFd);
428     if (r == -1) {
429       return errno;
430     }
431   }
432
433   // Close all fds that we're supposed to close.
434   // Note that we're ignoring errors here, in case some of these
435   // fds were set to close on exec.
436   for (auto& p : options.fdActions_) {
437     if (p.second == CLOSE) {
438       ::close(p.first);
439     } else {
440       r = ::dup2(p.second, p.first);
441       if (r == -1) {
442         return errno;
443       }
444     }
445   }
446
447   // If requested, close all other file descriptors.  Don't close
448   // any fds in options.fdActions_, and don't touch stdin, stdout, stderr.
449   // Ignore errors.
450   if (options.closeOtherFds_) {
451     for (int fd = getdtablesize() - 1; fd >= 3; --fd) {
452       if (options.fdActions_.count(fd) == 0) {
453         ::close(fd);
454       }
455     }
456   }
457
458 #if __linux__
459   // Opt to receive signal on parent death, if requested
460   if (options.parentDeathSignal_ != 0) {
461     r = prctl(PR_SET_PDEATHSIG, options.parentDeathSignal_, 0, 0, 0);
462     if (r == -1) {
463       return errno;
464     }
465   }
466 #endif
467
468   return 0;
469 }
470
471 int Subprocess::runChild(const char* executable,
472                          char** argv, char** env,
473                          const Options& options) const {
474   // Now, finally, exec.
475   int r;
476   if (options.usePath_) {
477     ::execvp(executable, argv);
478   } else {
479     ::execve(executable, argv, env);
480   }
481   return errno;
482 }
483
484 void Subprocess::readChildErrorPipe(int pfd, const char* executable) {
485   ChildErrorInfo info;
486   auto rc = readNoInt(pfd, &info, sizeof(info));
487   if (rc == 0) {
488     // No data means the child executed successfully, and the pipe
489     // was closed due to the close-on-exec flag being set.
490     return;
491   } else if (rc != sizeof(ChildErrorInfo)) {
492     // An error occurred trying to read from the pipe, or we got a partial read.
493     // Neither of these cases should really occur in practice.
494     //
495     // We can't get any error data from the child in this case, and we don't
496     // know if it is successfully running or not.  All we can do is to return
497     // normally, as if the child executed successfully.  If something bad
498     // happened the caller should at least get a non-normal exit status from
499     // the child.
500     LOG(ERROR) << "unexpected error trying to read from child error pipe " <<
501       "rc=" << rc << ", errno=" << errno;
502     return;
503   }
504
505   // We got error data from the child.  The child should exit immediately in
506   // this case, so wait on it to clean up.
507   wait();
508
509   // Throw to signal the error
510   throw SubprocessSpawnError(executable, info.errCode, info.errnoValue);
511 }
512
513 ProcessReturnCode Subprocess::poll() {
514   returnCode_.enforce(ProcessReturnCode::RUNNING);
515   DCHECK_GT(pid_, 0);
516   int status;
517   pid_t found = ::waitpid(pid_, &status, WNOHANG);
518   checkUnixError(found, "waitpid");
519   if (found != 0) {
520     returnCode_ = ProcessReturnCode(status);
521     pid_ = -1;
522   }
523   return returnCode_;
524 }
525
526 bool Subprocess::pollChecked() {
527   if (poll().state() == ProcessReturnCode::RUNNING) {
528     return false;
529   }
530   checkStatus(returnCode_);
531   return true;
532 }
533
534 ProcessReturnCode Subprocess::wait() {
535   returnCode_.enforce(ProcessReturnCode::RUNNING);
536   DCHECK_GT(pid_, 0);
537   int status;
538   pid_t found;
539   do {
540     found = ::waitpid(pid_, &status, 0);
541   } while (found == -1 && errno == EINTR);
542   checkUnixError(found, "waitpid");
543   DCHECK_EQ(found, pid_);
544   returnCode_ = ProcessReturnCode(status);
545   pid_ = -1;
546   return returnCode_;
547 }
548
549 void Subprocess::waitChecked() {
550   wait();
551   checkStatus(returnCode_);
552 }
553
554 void Subprocess::sendSignal(int signal) {
555   returnCode_.enforce(ProcessReturnCode::RUNNING);
556   int r = ::kill(pid_, signal);
557   checkUnixError(r, "kill");
558 }
559
560 pid_t Subprocess::pid() const {
561   return pid_;
562 }
563
564 namespace {
565
566 std::pair<const uint8_t*, size_t> queueFront(const IOBufQueue& queue) {
567   auto* p = queue.front();
568   if (!p) return std::make_pair(nullptr, 0);
569   return io::Cursor(p).peek();
570 }
571
572 // fd write
573 bool handleWrite(int fd, IOBufQueue& queue) {
574   for (;;) {
575     auto p = queueFront(queue);
576     if (p.second == 0) {
577       return true;  // EOF
578     }
579
580     ssize_t n = writeNoInt(fd, p.first, p.second);
581     if (n == -1 && errno == EAGAIN) {
582       return false;
583     }
584     checkUnixError(n, "write");
585     queue.trimStart(n);
586   }
587 }
588
589 // fd read
590 bool handleRead(int fd, IOBufQueue& queue) {
591   for (;;) {
592     auto p = queue.preallocate(100, 65000);
593     ssize_t n = readNoInt(fd, p.first, p.second);
594     if (n == -1 && errno == EAGAIN) {
595       return false;
596     }
597     checkUnixError(n, "read");
598     if (n == 0) {
599       return true;
600     }
601     queue.postallocate(n);
602   }
603 }
604
605 bool discardRead(int fd) {
606   static const size_t bufSize = 65000;
607   // Thread unsafe, but it doesn't matter.
608   static std::unique_ptr<char[]> buf(new char[bufSize]);
609
610   for (;;) {
611     ssize_t n = readNoInt(fd, buf.get(), bufSize);
612     if (n == -1 && errno == EAGAIN) {
613       return false;
614     }
615     checkUnixError(n, "read");
616     if (n == 0) {
617       return true;
618     }
619   }
620 }
621
622 }  // namespace
623
624 std::pair<std::string, std::string> Subprocess::communicate(
625     StringPiece input) {
626   IOBufQueue inputQueue;
627   inputQueue.wrapBuffer(input.data(), input.size());
628
629   auto outQueues = communicateIOBuf(std::move(inputQueue));
630   auto outBufs = std::make_pair(outQueues.first.move(),
631                                 outQueues.second.move());
632   std::pair<std::string, std::string> out;
633   if (outBufs.first) {
634     outBufs.first->coalesce();
635     out.first.assign(reinterpret_cast<const char*>(outBufs.first->data()),
636                      outBufs.first->length());
637   }
638   if (outBufs.second) {
639     outBufs.second->coalesce();
640     out.second.assign(reinterpret_cast<const char*>(outBufs.second->data()),
641                      outBufs.second->length());
642   }
643   return out;
644 }
645
646 std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
647     IOBufQueue input) {
648   // If the user supplied a non-empty input buffer, make sure
649   // that stdin is a pipe so we can write the data.
650   if (!input.empty()) {
651     // findByChildFd() will throw std::invalid_argument if no pipe for
652     // STDIN_FILENO exists
653     findByChildFd(STDIN_FILENO);
654   }
655
656   std::pair<IOBufQueue, IOBufQueue> out;
657
658   auto readCallback = [&] (int pfd, int cfd) -> bool {
659     if (cfd == STDOUT_FILENO) {
660       return handleRead(pfd, out.first);
661     } else if (cfd == STDERR_FILENO) {
662       return handleRead(pfd, out.second);
663     } else {
664       // Don't close the file descriptor, the child might not like SIGPIPE,
665       // just read and throw the data away.
666       return discardRead(pfd);
667     }
668   };
669
670   auto writeCallback = [&] (int pfd, int cfd) -> bool {
671     if (cfd == STDIN_FILENO) {
672       return handleWrite(pfd, input);
673     } else {
674       // If we don't want to write to this fd, just close it.
675       return true;
676     }
677   };
678
679   communicate(std::move(readCallback), std::move(writeCallback));
680
681   return out;
682 }
683
684 void Subprocess::communicate(FdCallback readCallback,
685                              FdCallback writeCallback) {
686   returnCode_.enforce(ProcessReturnCode::RUNNING);
687   setAllNonBlocking();
688
689   std::vector<pollfd> fds;
690   fds.reserve(pipes_.size());
691   std::vector<int> toClose;
692   toClose.reserve(pipes_.size());
693
694   while (!pipes_.empty()) {
695     fds.clear();
696     toClose.clear();
697
698     for (auto& p : pipes_) {
699       pollfd pfd;
700       pfd.fd = p.parentFd;
701       // Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
702       // child's point of view.
703       if (!p.enabled) {
704         // Still keeping fd in watched set so we get notified of POLLHUP /
705         // POLLERR
706         pfd.events = 0;
707       } else if (p.direction == PIPE_IN) {
708         pfd.events = POLLOUT;
709       } else {
710         pfd.events = POLLIN;
711       }
712       fds.push_back(pfd);
713     }
714
715     int r;
716     do {
717       r = ::poll(fds.data(), fds.size(), -1);
718     } while (r == -1 && errno == EINTR);
719     checkUnixError(r, "poll");
720
721     for (int i = 0; i < pipes_.size(); ++i) {
722       auto& p = pipes_[i];
723       DCHECK_EQ(fds[i].fd, p.parentFd);
724       short events = fds[i].revents;
725
726       bool closed = false;
727       if (events & POLLOUT) {
728         DCHECK(!(events & POLLIN));
729         if (writeCallback(p.parentFd, p.childFd)) {
730           toClose.push_back(i);
731           closed = true;
732         }
733       }
734
735       // Call read callback on POLLHUP, to give it a chance to read (and act
736       // on) end of file
737       if (events & (POLLIN | POLLHUP)) {
738         DCHECK(!(events & POLLOUT));
739         if (readCallback(p.parentFd, p.childFd)) {
740           toClose.push_back(i);
741           closed = true;
742         }
743       }
744
745       if ((events & (POLLHUP | POLLERR)) && !closed) {
746         toClose.push_back(i);
747         closed = true;
748       }
749     }
750
751     // Close the fds in reverse order so the indexes hold after erase()
752     for (int idx : boost::adaptors::reverse(toClose)) {
753       auto pos = pipes_.begin() + idx;
754       closeChecked(pos->parentFd);
755       pipes_.erase(pos);
756     }
757   }
758 }
759
760 void Subprocess::enableNotifications(int childFd, bool enabled) {
761   pipes_[findByChildFd(childFd)].enabled = enabled;
762 }
763
764 bool Subprocess::notificationsEnabled(int childFd) const {
765   return pipes_[findByChildFd(childFd)].enabled;
766 }
767
768 int Subprocess::findByChildFd(int childFd) const {
769   auto pos = std::lower_bound(
770       pipes_.begin(), pipes_.end(), childFd,
771       [] (const PipeInfo& info, int fd) { return info.childFd < fd; });
772   if (pos == pipes_.end() || pos->childFd != childFd) {
773     throw std::invalid_argument(folly::to<std::string>(
774         "child fd not found ", childFd));
775   }
776   return pos - pipes_.begin();
777 }
778
779 void Subprocess::closeParentFd(int childFd) {
780   int idx = findByChildFd(childFd);
781   closeChecked(pipes_[idx].parentFd);
782   pipes_.erase(pipes_.begin() + idx);
783 }
784
785 namespace {
786
787 class Initializer {
788  public:
789   Initializer() {
790     // We like EPIPE, thanks.
791     ::signal(SIGPIPE, SIG_IGN);
792   }
793 };
794
795 Initializer initializer;
796
797 }  // namespace
798
799 }  // namespace folly
800