Retry wait() on EINTR; clean up signal handling
[folly.git] / folly / Subprocess.cpp
1 /*
2  * Copyright 2012 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/Subprocess.h"
18
19 #include <fcntl.h>
20 #include <poll.h>
21 #include <unistd.h>
22 #include <wait.h>
23
24 #include <array>
25 #include <algorithm>
26 #include <system_error>
27
28 #include <boost/container/flat_set.hpp>
29 #include <boost/range/adaptors.hpp>
30
31 #include <glog/logging.h>
32
33 #include "folly/Conv.h"
34 #include "folly/ScopeGuard.h"
35 #include "folly/String.h"
36 #include "folly/experimental/io/Cursor.h"
37
38 extern char** environ;
39
40 namespace folly {
41
42 ProcessReturnCode::State ProcessReturnCode::state() const {
43   if (rawStatus_ == RV_NOT_STARTED) return NOT_STARTED;
44   if (rawStatus_ == RV_RUNNING) return RUNNING;
45   if (WIFEXITED(rawStatus_)) return EXITED;
46   if (WIFSIGNALED(rawStatus_)) return KILLED;
47   throw std::runtime_error(to<std::string>(
48       "Invalid ProcessReturnCode: ", rawStatus_));
49 }
50
51 void ProcessReturnCode::enforce(State s) const {
52   if (state() != s) {
53     throw std::logic_error(to<std::string>("Invalid state ", s));
54   }
55 }
56
57 int ProcessReturnCode::exitStatus() const {
58   enforce(EXITED);
59   return WEXITSTATUS(rawStatus_);
60 }
61
62 int ProcessReturnCode::killSignal() const {
63   enforce(KILLED);
64   return WTERMSIG(rawStatus_);
65 }
66
67 bool ProcessReturnCode::coreDumped() const {
68   enforce(KILLED);
69   return WCOREDUMP(rawStatus_);
70 }
71
72 std::string ProcessReturnCode::str() const {
73   switch (state()) {
74   case NOT_STARTED:
75     return "not started";
76   case RUNNING:
77     return "running";
78   case EXITED:
79     return to<std::string>("exited with status ", exitStatus());
80   case KILLED:
81     return to<std::string>("killed by signal ", killSignal(),
82                            (coreDumped() ? " (core dumped)" : ""));
83   }
84   CHECK(false);  // unreached
85 }
86
87 CalledProcessError::CalledProcessError(ProcessReturnCode rc)
88   : returnCode_(rc),
89     what_(returnCode_.str()) {
90 }
91
92 namespace {
93
94 // Copy pointers to the given strings in a format suitable for posix_spawn
95 std::unique_ptr<const char*[]> cloneStrings(const std::vector<std::string>& s) {
96   std::unique_ptr<const char*[]> d(new const char*[s.size() + 1]);
97   for (int i = 0; i < s.size(); i++) {
98     d[i] = s[i].c_str();
99   }
100   d[s.size()] = nullptr;
101   return d;
102 }
103
104 // Helper to throw std::system_error
105 void throwSystemError(int err, const char* msg) __attribute__((noreturn));
106 void throwSystemError(int err, const char* msg) {
107   throw std::system_error(err, std::system_category(), msg);
108 }
109
110 // Helper to throw std::system_error from errno
111 void throwSystemError(const char* msg) __attribute__((noreturn));
112 void throwSystemError(const char* msg) {
113   throwSystemError(errno, msg);
114 }
115
116 // Check a Posix return code (0 on success, error number on error), throw
117 // on error.
118 void checkPosixError(int err, const char* msg) {
119   if (err != 0) {
120     throwSystemError(err, msg);
121   }
122 }
123
124 // Check a traditional Uinx return code (-1 and sets errno on error), throw
125 // on error.
126 void checkUnixError(ssize_t ret, const char* msg) {
127   if (ret == -1) {
128     throwSystemError(msg);
129   }
130 }
131 void checkUnixError(ssize_t ret, int savedErrno, const char* msg) {
132   if (ret == -1) {
133     throwSystemError(savedErrno, msg);
134   }
135 }
136
137 // Check a wait() status, throw on non-successful
138 void checkStatus(ProcessReturnCode returnCode) {
139   if (returnCode.state() != ProcessReturnCode::EXITED ||
140       returnCode.exitStatus() != 0) {
141     throw CalledProcessError(returnCode);
142   }
143 }
144
145 }  // namespace
146
147 Subprocess::Options& Subprocess::Options::fd(int fd, int action) {
148   if (action == Subprocess::PIPE) {
149     if (fd == 0) {
150       action = Subprocess::PIPE_IN;
151     } else if (fd == 1 || fd == 2) {
152       action = Subprocess::PIPE_OUT;
153     } else {
154       throw std::invalid_argument(
155           to<std::string>("Only fds 0, 1, 2 are valid for action=PIPE: ", fd));
156     }
157   }
158   fdActions_[fd] = action;
159   return *this;
160 }
161
162 Subprocess::Subprocess(
163     const std::vector<std::string>& argv,
164     const Options& options,
165     const char* executable,
166     const std::vector<std::string>* env)
167   : pid_(-1),
168     returnCode_(RV_NOT_STARTED) {
169   if (argv.empty()) {
170     throw std::invalid_argument("argv must not be empty");
171   }
172   if (!executable) executable = argv[0].c_str();
173   spawn(cloneStrings(argv), executable, options, env);
174 }
175
176 Subprocess::Subprocess(
177     const std::string& cmd,
178     const Options& options,
179     const std::vector<std::string>* env)
180   : pid_(-1),
181     returnCode_(RV_NOT_STARTED) {
182   if (options.usePath_) {
183     throw std::invalid_argument("usePath() not allowed when running in shell");
184   }
185   const char* shell = getenv("SHELL");
186   if (!shell) {
187     shell = "/bin/sh";
188   }
189
190   std::unique_ptr<const char*[]> argv(new const char*[4]);
191   argv[0] = shell;
192   argv[1] = "-c";
193   argv[2] = cmd.c_str();
194   argv[3] = nullptr;
195   spawn(std::move(argv), shell, options, env);
196 }
197
198 Subprocess::~Subprocess() {
199   if (returnCode_.state() == ProcessReturnCode::RUNNING) {
200     LOG(ERROR) << "Subprocess destroyed without reaping; killing child.";
201     try {
202       kill();
203       wait();
204     } catch (...) {
205       LOG(FATAL) << "Killing child failed, terminating: "
206                  << exceptionStr(std::current_exception());
207     }
208   }
209   try {
210     closeAll();
211   } catch (...) {
212     LOG(FATAL) << "close failed, terminating: "
213                << exceptionStr(std::current_exception());
214   }
215 }
216
217 namespace {
218 void closeChecked(int fd) {
219   checkUnixError(::close(fd), "close");
220 }
221 }  // namespace
222
223 void Subprocess::closeAll() {
224   for (auto& p : pipes_) {
225     closeChecked(p.parentFd);
226   }
227   pipes_.clear();
228 }
229
230 void Subprocess::setAllNonBlocking() {
231   for (auto& p : pipes_) {
232     int fd = p.parentFd;
233     int flags = ::fcntl(fd, F_GETFL);
234     checkUnixError(flags, "fcntl");
235     int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
236     checkUnixError(r, "fcntl");
237   }
238 }
239
240 void Subprocess::spawn(
241     std::unique_ptr<const char*[]> argv,
242     const char* executable,
243     const Options& optionsIn,
244     const std::vector<std::string>* env) {
245   if (optionsIn.usePath_ && env) {
246     throw std::invalid_argument(
247         "usePath() not allowed when overriding environment");
248   }
249
250   // Make a copy, we'll mutate options
251   Options options(optionsIn);
252
253   // Parent work, pre-fork: create pipes
254   std::vector<int> childFds;
255   for (auto& p : options.fdActions_) {
256     if (p.second == PIPE_IN || p.second == PIPE_OUT) {
257       int fds[2];
258       int r = ::pipe(fds);
259       checkUnixError(r, "pipe");
260       PipeInfo pinfo;
261       pinfo.direction = p.second;
262       int cfd;
263       if (p.second == PIPE_IN) {
264         // Child gets reading end
265         pinfo.parentFd = fds[1];
266         cfd = fds[0];
267       } else {
268         pinfo.parentFd = fds[0];
269         cfd = fds[1];
270       }
271       p.second = cfd;  // ensure it gets dup2()ed
272       pinfo.childFd = p.first;
273       childFds.push_back(cfd);
274       pipes_.push_back(pinfo);
275     }
276   }
277
278   // This should already be sorted, as options.fdActions_ is
279   DCHECK(std::is_sorted(pipes_.begin(), pipes_.end()));
280
281   // Note that the const casts below are legit, per
282   // http://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html
283
284   char** argVec = const_cast<char**>(argv.get());
285
286   // Set up environment
287   std::unique_ptr<const char*[]> envHolder;
288   char** envVec;
289   if (env) {
290     envHolder = cloneStrings(*env);
291     envVec = const_cast<char**>(envHolder.get());
292   } else {
293     envVec = environ;
294   }
295
296   // Block all signals around vfork; see http://ewontfix.com/7/.
297   //
298   // As the child may run in the same address space as the parent until
299   // the actual execve() system call, any (custom) signal handlers that
300   // the parent has might alter parent's memory if invoked in the child,
301   // with undefined results.  So we block all signals in the parent before
302   // vfork(), which will cause them to be blocked in the child as well (we
303   // rely on the fact that Linux, just like all sane implementations, only
304   // clones the calling thread).  Then, in the child, we reset all signals
305   // to their default dispositions (while still blocked), and unblock them
306   // (so the exec()ed process inherits the parent's signal mask)
307   //
308   // The parent also unblocks all signals as soon as vfork() returns.
309   sigset_t allBlocked;
310   int r = ::sigfillset(&allBlocked);
311   checkUnixError(r, "sigfillset");
312   sigset_t oldSignals;
313   r = pthread_sigmask(SIG_SETMASK, &allBlocked, &oldSignals);
314   checkPosixError(r, "pthread_sigmask");
315
316   pid_t pid = vfork();
317   if (pid == 0) {
318     // While all signals are blocked, we must reset their
319     // dispositions to default.
320     for (int sig = 1; sig < NSIG; ++sig) {
321       ::signal(sig, SIG_DFL);
322     }
323     // Unblock signals; restore signal mask.
324     int r = pthread_sigmask(SIG_SETMASK, &oldSignals, nullptr);
325     if (r != 0) abort();
326
327     runChild(executable, argVec, envVec, options);
328     // This should never return, but there's nothing else we can do here.
329     abort();
330   }
331   // In parent.  We want to restore the signal mask even if vfork fails,
332   // so we'll save errno here, restore the signal mask, and only then
333   // throw.
334   int savedErrno = errno;
335
336   // Restore signal mask; do this even if vfork fails!
337   // We only check for errors from pthread_sigmask after we recorded state
338   // that the child is alive, so we know to reap it.
339   r = pthread_sigmask(SIG_SETMASK, &oldSignals, nullptr);
340   checkUnixError(pid, savedErrno, "vfork");
341
342   // Child is alive
343   pid_ = pid;
344   returnCode_ = ProcessReturnCode(RV_RUNNING);
345
346   // Parent work, post-fork: close child's ends of pipes
347   for (int f : childFds) {
348     closeChecked(f);
349   }
350
351   checkPosixError(r, "pthread_sigmask");
352 }
353
354 namespace {
355
356 // Checked version of close() to use in the child: abort() on error
357 void childClose(int fd) {
358   int r = ::close(fd);
359   if (r == -1) abort();
360 }
361
362 // Checked version of dup2() to use in the child: abort() on error
363 void childDup2(int oldfd, int newfd) {
364   int r = ::dup2(oldfd, newfd);
365   if (r == -1) abort();
366 }
367
368 }  // namespace
369
370 void Subprocess::runChild(const char* executable,
371                           char** argv, char** env,
372                           const Options& options) const {
373   // Close parent's ends of all pipes
374   for (auto& p : pipes_) {
375     childClose(p.parentFd);
376   }
377
378   // Close all fds that we're supposed to close.
379   // Note that we're ignoring errors here, in case some of these
380   // fds were set to close on exec.
381   for (auto& p : options.fdActions_) {
382     if (p.second == CLOSE) {
383       ::close(p.first);
384     } else {
385       childDup2(p.second, p.first);
386     }
387   }
388
389   // If requested, close all other file descriptors.  Don't close
390   // any fds in options.fdActions_, and don't touch stdin, stdout, stderr.
391   // Ignore errors.
392   if (options.closeOtherFds_) {
393     for (int fd = getdtablesize() - 1; fd >= 3; --fd) {
394       if (options.fdActions_.count(fd) == 0) {
395         ::close(fd);
396       }
397     }
398   }
399
400   // Now, finally, exec.
401   int r;
402   if (options.usePath_) {
403     ::execvp(executable, argv);
404   } else {
405     ::execve(executable, argv, env);
406   }
407
408   // If we're here, something's wrong.
409   abort();
410 }
411
412 ProcessReturnCode Subprocess::poll() {
413   returnCode_.enforce(ProcessReturnCode::RUNNING);
414   DCHECK_GT(pid_, 0);
415   int status;
416   pid_t found = ::waitpid(pid_, &status, WNOHANG);
417   checkUnixError(found, "waitpid");
418   if (found != 0) {
419     returnCode_ = ProcessReturnCode(status);
420     pid_ = -1;
421   }
422   return returnCode_;
423 }
424
425 bool Subprocess::pollChecked() {
426   if (poll().state() == ProcessReturnCode::RUNNING) {
427     return false;
428   }
429   checkStatus(returnCode_);
430   return true;
431 }
432
433 ProcessReturnCode Subprocess::wait() {
434   returnCode_.enforce(ProcessReturnCode::RUNNING);
435   DCHECK_GT(pid_, 0);
436   int status;
437   pid_t found;
438   do {
439     found = ::waitpid(pid_, &status, 0);
440   } while (found == -1 && errno == EINTR);
441   checkUnixError(found, "waitpid");
442   DCHECK_EQ(found, pid_);
443   returnCode_ = ProcessReturnCode(status);
444   return returnCode_;
445 }
446
447 void Subprocess::waitChecked() {
448   wait();
449   checkStatus(returnCode_);
450 }
451
452 void Subprocess::sendSignal(int signal) {
453   returnCode_.enforce(ProcessReturnCode::RUNNING);
454   int r = ::kill(pid_, signal);
455   checkUnixError(r, "kill");
456 }
457
458 namespace {
459 void setNonBlocking(int fd) {
460   int flags = ::fcntl(fd, F_GETFL);
461   checkUnixError(flags, "fcntl");
462   int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
463   checkUnixError(r, "fcntl");
464 }
465
466 std::pair<const uint8_t*, size_t> queueFront(const IOBufQueue& queue) {
467   auto* p = queue.front();
468   if (!p) return std::make_pair(nullptr, 0);
469   return io::Cursor(p).peek();
470 }
471
472 // fd write
473 bool handleWrite(int fd, IOBufQueue& queue) {
474   for (;;) {
475     auto p = queueFront(queue);
476     if (p.second == 0) {
477       return true;  // EOF
478     }
479
480     ssize_t n;
481     do {
482       n = ::write(fd, p.first, p.second);
483     } while (n == -1 && errno == EINTR);
484     if (n == -1 && errno == EAGAIN) {
485       return false;
486     }
487     checkUnixError(n, "write");
488     queue.trimStart(n);
489   }
490 }
491
492 // fd read
493 bool handleRead(int fd, IOBufQueue& queue) {
494   for (;;) {
495     auto p = queue.preallocate(100, 65000);
496     ssize_t n;
497     do {
498       n = ::read(fd, p.first, p.second);
499     } while (n == -1 && errno == EINTR);
500     if (n == -1 && errno == EAGAIN) {
501       return false;
502     }
503     checkUnixError(n, "read");
504     if (n == 0) {
505       return true;
506     }
507     queue.postallocate(n);
508   }
509 }
510
511 bool discardRead(int fd) {
512   static const size_t bufSize = 65000;
513   // Thread unsafe, but it doesn't matter.
514   static std::unique_ptr<char[]> buf(new char[bufSize]);
515
516   for (;;) {
517     ssize_t n;
518     do {
519       n = ::read(fd, buf.get(), bufSize);
520     } while (n == -1 && errno == EINTR);
521     if (n == -1 && errno == EAGAIN) {
522       return false;
523     }
524     checkUnixError(n, "read");
525     if (n == 0) {
526       return true;
527     }
528   }
529 }
530
531 }  // namespace
532
533 std::pair<std::string, std::string> Subprocess::communicate(
534     int flags,
535     StringPiece data) {
536   IOBufQueue dataQueue;
537   dataQueue.wrapBuffer(data.data(), data.size());
538
539   auto outQueues = communicateIOBuf(flags, std::move(dataQueue));
540   auto outBufs = std::make_pair(outQueues.first.move(),
541                                 outQueues.second.move());
542   std::pair<std::string, std::string> out;
543   if (outBufs.first) {
544     outBufs.first->coalesce();
545     out.first.assign(reinterpret_cast<const char*>(outBufs.first->data()),
546                      outBufs.first->length());
547   }
548   if (outBufs.second) {
549     outBufs.second->coalesce();
550     out.second.assign(reinterpret_cast<const char*>(outBufs.second->data()),
551                      outBufs.second->length());
552   }
553   return out;
554 }
555
556 std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
557     int flags,
558     IOBufQueue data) {
559   std::pair<IOBufQueue, IOBufQueue> out;
560
561   auto readCallback = [&, flags] (int pfd, int cfd) {
562     if (cfd == 1 && (flags & READ_STDOUT)) {
563       return handleRead(pfd, out.first);
564     } else if (cfd == 2 && (flags & READ_STDERR)) {
565       return handleRead(pfd, out.second);
566     } else {
567       // Don't close the file descriptor, the child might not like SIGPIPE,
568       // just read and throw the data away.
569       return discardRead(pfd);
570     }
571   };
572
573   auto writeCallback = [&, flags] (int pfd, int cfd) {
574     if (cfd == 0 && (flags & WRITE_STDIN)) {
575       return handleWrite(pfd, data);
576     } else {
577       // If we don't want to write to this fd, just close it.
578       return false;
579     }
580   };
581
582   communicate(std::move(readCallback), std::move(writeCallback));
583
584   return out;
585 }
586
587 void Subprocess::communicate(FdCallback readCallback,
588                              FdCallback writeCallback) {
589   returnCode_.enforce(ProcessReturnCode::RUNNING);
590   setAllNonBlocking();
591
592   std::vector<pollfd> fds;
593   fds.reserve(pipes_.size());
594   std::vector<int> toClose;
595   toClose.reserve(pipes_.size());
596
597   while (!pipes_.empty()) {
598     fds.clear();
599     toClose.clear();
600
601     for (auto& p : pipes_) {
602       pollfd pfd;
603       pfd.fd = p.parentFd;
604       // Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
605       // child's point of view.
606       pfd.events = (p.direction == PIPE_IN ?  POLLOUT : POLLIN);
607       fds.push_back(pfd);
608     }
609
610     int r;
611     do {
612       r = ::poll(fds.data(), fds.size(), -1);
613     } while (r == -1 && errno == EINTR);
614     checkUnixError(r, "poll");
615
616     for (int i = 0; i < pipes_.size(); ++i) {
617       auto& p = pipes_[i];
618       DCHECK_EQ(fds[i].fd, p.parentFd);
619       short events = fds[i].revents;
620
621       bool closed = false;
622       if (events & POLLOUT) {
623         DCHECK(!(events & POLLIN));
624         if (writeCallback(p.parentFd, p.childFd)) {
625           toClose.push_back(i);
626           closed = true;
627         }
628       }
629
630       if (events & POLLIN) {
631         DCHECK(!(events & POLLOUT));
632         if (readCallback(p.parentFd, p.childFd)) {
633           toClose.push_back(i);
634           closed = true;
635         }
636       }
637
638       if ((events & (POLLHUP | POLLERR)) && !closed) {
639         toClose.push_back(i);
640         closed = true;
641       }
642     }
643
644     // Close the fds in reverse order so the indexes hold after erase()
645     for (int idx : boost::adaptors::reverse(toClose)) {
646       auto pos = pipes_.begin() + idx;
647       closeChecked(pos->parentFd);
648       pipes_.erase(pos);
649     }
650   }
651 }
652
653 int Subprocess::findByChildFd(int childFd) const {
654   auto pos = std::lower_bound(
655       pipes_.begin(), pipes_.end(), childFd,
656       [] (const PipeInfo& info, int fd) { return info.childFd < fd; });
657   if (pos == pipes_.end() || pos->childFd != childFd) {
658     throw std::invalid_argument(folly::to<std::string>(
659         "child fd not found ", childFd));
660   }
661   return pos - pipes_.begin();
662 }
663
664 void Subprocess::closeParentFd(int childFd) {
665   int idx = findByChildFd(childFd);
666   closeChecked(pipes_[idx].parentFd);
667   pipes_.erase(pipes_.begin() + idx);
668 }
669
670 namespace {
671
672 class Initializer {
673  public:
674   Initializer() {
675     // We like EPIPE, thanks.
676     ::signal(SIGPIPE, SIG_IGN);
677   }
678 };
679
680 Initializer initializer;
681
682 }  // namespace
683
684 }  // namespace folly
685