AsyncIO in folly
[folly.git] / folly / Subprocess.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "folly/Subprocess.h"
18
19 #include <fcntl.h>
20 #include <poll.h>
21 #include <unistd.h>
22 #include <wait.h>
23
24 #include <array>
25 #include <algorithm>
26 #include <system_error>
27
28 #include <boost/container/flat_set.hpp>
29 #include <boost/range/adaptors.hpp>
30
31 #include <glog/logging.h>
32
33 #include "folly/Conv.h"
34 #include "folly/Exception.h"
35 #include "folly/ScopeGuard.h"
36 #include "folly/String.h"
37 #include "folly/io/Cursor.h"
38
39 extern char** environ;
40
41 namespace folly {
42
43 ProcessReturnCode::State ProcessReturnCode::state() const {
44   if (rawStatus_ == RV_NOT_STARTED) return NOT_STARTED;
45   if (rawStatus_ == RV_RUNNING) return RUNNING;
46   if (WIFEXITED(rawStatus_)) return EXITED;
47   if (WIFSIGNALED(rawStatus_)) return KILLED;
48   throw std::runtime_error(to<std::string>(
49       "Invalid ProcessReturnCode: ", rawStatus_));
50 }
51
52 void ProcessReturnCode::enforce(State s) const {
53   if (state() != s) {
54     throw std::logic_error(to<std::string>("Invalid state ", s));
55   }
56 }
57
58 int ProcessReturnCode::exitStatus() const {
59   enforce(EXITED);
60   return WEXITSTATUS(rawStatus_);
61 }
62
63 int ProcessReturnCode::killSignal() const {
64   enforce(KILLED);
65   return WTERMSIG(rawStatus_);
66 }
67
68 bool ProcessReturnCode::coreDumped() const {
69   enforce(KILLED);
70   return WCOREDUMP(rawStatus_);
71 }
72
73 std::string ProcessReturnCode::str() const {
74   switch (state()) {
75   case NOT_STARTED:
76     return "not started";
77   case RUNNING:
78     return "running";
79   case EXITED:
80     return to<std::string>("exited with status ", exitStatus());
81   case KILLED:
82     return to<std::string>("killed by signal ", killSignal(),
83                            (coreDumped() ? " (core dumped)" : ""));
84   }
85   CHECK(false);  // unreached
86 }
87
88 CalledProcessError::CalledProcessError(ProcessReturnCode rc)
89   : returnCode_(rc),
90     what_(returnCode_.str()) {
91 }
92
93 namespace {
94
95 // Copy pointers to the given strings in a format suitable for posix_spawn
96 std::unique_ptr<const char*[]> cloneStrings(const std::vector<std::string>& s) {
97   std::unique_ptr<const char*[]> d(new const char*[s.size() + 1]);
98   for (int i = 0; i < s.size(); i++) {
99     d[i] = s[i].c_str();
100   }
101   d[s.size()] = nullptr;
102   return d;
103 }
104
105 // Check a wait() status, throw on non-successful
106 void checkStatus(ProcessReturnCode returnCode) {
107   if (returnCode.state() != ProcessReturnCode::EXITED ||
108       returnCode.exitStatus() != 0) {
109     throw CalledProcessError(returnCode);
110   }
111 }
112
113 }  // namespace
114
115 Subprocess::Options& Subprocess::Options::fd(int fd, int action) {
116   if (action == Subprocess::PIPE) {
117     if (fd == 0) {
118       action = Subprocess::PIPE_IN;
119     } else if (fd == 1 || fd == 2) {
120       action = Subprocess::PIPE_OUT;
121     } else {
122       throw std::invalid_argument(
123           to<std::string>("Only fds 0, 1, 2 are valid for action=PIPE: ", fd));
124     }
125   }
126   fdActions_[fd] = action;
127   return *this;
128 }
129
130 Subprocess::Subprocess(
131     const std::vector<std::string>& argv,
132     const Options& options,
133     const char* executable,
134     const std::vector<std::string>* env)
135   : pid_(-1),
136     returnCode_(RV_NOT_STARTED) {
137   if (argv.empty()) {
138     throw std::invalid_argument("argv must not be empty");
139   }
140   if (!executable) executable = argv[0].c_str();
141   spawn(cloneStrings(argv), executable, options, env);
142 }
143
144 Subprocess::Subprocess(
145     const std::string& cmd,
146     const Options& options,
147     const std::vector<std::string>* env)
148   : pid_(-1),
149     returnCode_(RV_NOT_STARTED) {
150   if (options.usePath_) {
151     throw std::invalid_argument("usePath() not allowed when running in shell");
152   }
153   const char* shell = getenv("SHELL");
154   if (!shell) {
155     shell = "/bin/sh";
156   }
157
158   std::unique_ptr<const char*[]> argv(new const char*[4]);
159   argv[0] = shell;
160   argv[1] = "-c";
161   argv[2] = cmd.c_str();
162   argv[3] = nullptr;
163   spawn(std::move(argv), shell, options, env);
164 }
165
166 Subprocess::~Subprocess() {
167   CHECK_NE(returnCode_.state(), ProcessReturnCode::RUNNING)
168     << "Subprocess destroyed without reaping child";
169 }
170
171 namespace {
172 void closeChecked(int fd) {
173   checkUnixError(::close(fd), "close");
174 }
175 }  // namespace
176
177 void Subprocess::closeAll() {
178   for (auto& p : pipes_) {
179     closeChecked(p.parentFd);
180   }
181   pipes_.clear();
182 }
183
184 void Subprocess::setAllNonBlocking() {
185   for (auto& p : pipes_) {
186     int fd = p.parentFd;
187     int flags = ::fcntl(fd, F_GETFL);
188     checkUnixError(flags, "fcntl");
189     int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
190     checkUnixError(r, "fcntl");
191   }
192 }
193
194 void Subprocess::spawn(
195     std::unique_ptr<const char*[]> argv,
196     const char* executable,
197     const Options& optionsIn,
198     const std::vector<std::string>* env) {
199   if (optionsIn.usePath_ && env) {
200     throw std::invalid_argument(
201         "usePath() not allowed when overriding environment");
202   }
203
204   // Make a copy, we'll mutate options
205   Options options(optionsIn);
206
207   // Parent work, pre-fork: create pipes
208   std::vector<int> childFds;
209   for (auto& p : options.fdActions_) {
210     if (p.second == PIPE_IN || p.second == PIPE_OUT) {
211       int fds[2];
212       int r = ::pipe(fds);
213       checkUnixError(r, "pipe");
214       PipeInfo pinfo;
215       pinfo.direction = p.second;
216       int cfd;
217       if (p.second == PIPE_IN) {
218         // Child gets reading end
219         pinfo.parentFd = fds[1];
220         cfd = fds[0];
221       } else {
222         pinfo.parentFd = fds[0];
223         cfd = fds[1];
224       }
225       p.second = cfd;  // ensure it gets dup2()ed
226       pinfo.childFd = p.first;
227       childFds.push_back(cfd);
228       pipes_.push_back(pinfo);
229     }
230   }
231
232   // This should already be sorted, as options.fdActions_ is
233   DCHECK(std::is_sorted(pipes_.begin(), pipes_.end()));
234
235   // Note that the const casts below are legit, per
236   // http://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html
237
238   char** argVec = const_cast<char**>(argv.get());
239
240   // Set up environment
241   std::unique_ptr<const char*[]> envHolder;
242   char** envVec;
243   if (env) {
244     envHolder = cloneStrings(*env);
245     envVec = const_cast<char**>(envHolder.get());
246   } else {
247     envVec = environ;
248   }
249
250   // Block all signals around vfork; see http://ewontfix.com/7/.
251   //
252   // As the child may run in the same address space as the parent until
253   // the actual execve() system call, any (custom) signal handlers that
254   // the parent has might alter parent's memory if invoked in the child,
255   // with undefined results.  So we block all signals in the parent before
256   // vfork(), which will cause them to be blocked in the child as well (we
257   // rely on the fact that Linux, just like all sane implementations, only
258   // clones the calling thread).  Then, in the child, we reset all signals
259   // to their default dispositions (while still blocked), and unblock them
260   // (so the exec()ed process inherits the parent's signal mask)
261   //
262   // The parent also unblocks all signals as soon as vfork() returns.
263   sigset_t allBlocked;
264   int r = ::sigfillset(&allBlocked);
265   checkUnixError(r, "sigfillset");
266   sigset_t oldSignals;
267   r = pthread_sigmask(SIG_SETMASK, &allBlocked, &oldSignals);
268   checkPosixError(r, "pthread_sigmask");
269
270   pid_t pid = vfork();
271   if (pid == 0) {
272     // While all signals are blocked, we must reset their
273     // dispositions to default.
274     for (int sig = 1; sig < NSIG; ++sig) {
275       ::signal(sig, SIG_DFL);
276     }
277     // Unblock signals; restore signal mask.
278     int r = pthread_sigmask(SIG_SETMASK, &oldSignals, nullptr);
279     if (r != 0) abort();
280
281     runChild(executable, argVec, envVec, options);
282     // This should never return, but there's nothing else we can do here.
283     abort();
284   }
285   // In parent.  We want to restore the signal mask even if vfork fails,
286   // so we'll save errno here, restore the signal mask, and only then
287   // throw.
288   int savedErrno = errno;
289
290   // Restore signal mask; do this even if vfork fails!
291   // We only check for errors from pthread_sigmask after we recorded state
292   // that the child is alive, so we know to reap it.
293   r = pthread_sigmask(SIG_SETMASK, &oldSignals, nullptr);
294   checkUnixError(pid, savedErrno, "vfork");
295
296   // Child is alive
297   pid_ = pid;
298   returnCode_ = ProcessReturnCode(RV_RUNNING);
299
300   // Parent work, post-fork: close child's ends of pipes
301   for (int f : childFds) {
302     closeChecked(f);
303   }
304
305   checkPosixError(r, "pthread_sigmask");
306 }
307
308 namespace {
309
310 // Checked version of close() to use in the child: abort() on error
311 void childClose(int fd) {
312   int r = ::close(fd);
313   if (r == -1) abort();
314 }
315
316 // Checked version of dup2() to use in the child: abort() on error
317 void childDup2(int oldfd, int newfd) {
318   int r = ::dup2(oldfd, newfd);
319   if (r == -1) abort();
320 }
321
322 }  // namespace
323
324 void Subprocess::runChild(const char* executable,
325                           char** argv, char** env,
326                           const Options& options) const {
327   // Close parent's ends of all pipes
328   for (auto& p : pipes_) {
329     childClose(p.parentFd);
330   }
331
332   // Close all fds that we're supposed to close.
333   // Note that we're ignoring errors here, in case some of these
334   // fds were set to close on exec.
335   for (auto& p : options.fdActions_) {
336     if (p.second == CLOSE) {
337       ::close(p.first);
338     } else {
339       childDup2(p.second, p.first);
340     }
341   }
342
343   // If requested, close all other file descriptors.  Don't close
344   // any fds in options.fdActions_, and don't touch stdin, stdout, stderr.
345   // Ignore errors.
346   if (options.closeOtherFds_) {
347     for (int fd = getdtablesize() - 1; fd >= 3; --fd) {
348       if (options.fdActions_.count(fd) == 0) {
349         ::close(fd);
350       }
351     }
352   }
353
354   // Now, finally, exec.
355   int r;
356   if (options.usePath_) {
357     ::execvp(executable, argv);
358   } else {
359     ::execve(executable, argv, env);
360   }
361
362   // If we're here, something's wrong.
363   abort();
364 }
365
366 ProcessReturnCode Subprocess::poll() {
367   returnCode_.enforce(ProcessReturnCode::RUNNING);
368   DCHECK_GT(pid_, 0);
369   int status;
370   pid_t found = ::waitpid(pid_, &status, WNOHANG);
371   checkUnixError(found, "waitpid");
372   if (found != 0) {
373     returnCode_ = ProcessReturnCode(status);
374     pid_ = -1;
375   }
376   return returnCode_;
377 }
378
379 bool Subprocess::pollChecked() {
380   if (poll().state() == ProcessReturnCode::RUNNING) {
381     return false;
382   }
383   checkStatus(returnCode_);
384   return true;
385 }
386
387 ProcessReturnCode Subprocess::wait() {
388   returnCode_.enforce(ProcessReturnCode::RUNNING);
389   DCHECK_GT(pid_, 0);
390   int status;
391   pid_t found;
392   do {
393     found = ::waitpid(pid_, &status, 0);
394   } while (found == -1 && errno == EINTR);
395   checkUnixError(found, "waitpid");
396   DCHECK_EQ(found, pid_);
397   returnCode_ = ProcessReturnCode(status);
398   pid_ = -1;
399   return returnCode_;
400 }
401
402 void Subprocess::waitChecked() {
403   wait();
404   checkStatus(returnCode_);
405 }
406
407 void Subprocess::sendSignal(int signal) {
408   returnCode_.enforce(ProcessReturnCode::RUNNING);
409   int r = ::kill(pid_, signal);
410   checkUnixError(r, "kill");
411 }
412
413 pid_t Subprocess::pid() const {
414   return pid_;
415 }
416
417 namespace {
418
419 std::pair<const uint8_t*, size_t> queueFront(const IOBufQueue& queue) {
420   auto* p = queue.front();
421   if (!p) return std::make_pair(nullptr, 0);
422   return io::Cursor(p).peek();
423 }
424
425 // fd write
426 bool handleWrite(int fd, IOBufQueue& queue) {
427   for (;;) {
428     auto p = queueFront(queue);
429     if (p.second == 0) {
430       return true;  // EOF
431     }
432
433     ssize_t n;
434     do {
435       n = ::write(fd, p.first, p.second);
436     } while (n == -1 && errno == EINTR);
437     if (n == -1 && errno == EAGAIN) {
438       return false;
439     }
440     checkUnixError(n, "write");
441     queue.trimStart(n);
442   }
443 }
444
445 // fd read
446 bool handleRead(int fd, IOBufQueue& queue) {
447   for (;;) {
448     auto p = queue.preallocate(100, 65000);
449     ssize_t n;
450     do {
451       n = ::read(fd, p.first, p.second);
452     } while (n == -1 && errno == EINTR);
453     if (n == -1 && errno == EAGAIN) {
454       return false;
455     }
456     checkUnixError(n, "read");
457     if (n == 0) {
458       return true;
459     }
460     queue.postallocate(n);
461   }
462 }
463
464 bool discardRead(int fd) {
465   static const size_t bufSize = 65000;
466   // Thread unsafe, but it doesn't matter.
467   static std::unique_ptr<char[]> buf(new char[bufSize]);
468
469   for (;;) {
470     ssize_t n;
471     do {
472       n = ::read(fd, buf.get(), bufSize);
473     } while (n == -1 && errno == EINTR);
474     if (n == -1 && errno == EAGAIN) {
475       return false;
476     }
477     checkUnixError(n, "read");
478     if (n == 0) {
479       return true;
480     }
481   }
482 }
483
484 }  // namespace
485
486 std::pair<std::string, std::string> Subprocess::communicate(
487     const CommunicateFlags& flags,
488     StringPiece data) {
489   IOBufQueue dataQueue;
490   dataQueue.wrapBuffer(data.data(), data.size());
491
492   auto outQueues = communicateIOBuf(flags, std::move(dataQueue));
493   auto outBufs = std::make_pair(outQueues.first.move(),
494                                 outQueues.second.move());
495   std::pair<std::string, std::string> out;
496   if (outBufs.first) {
497     outBufs.first->coalesce();
498     out.first.assign(reinterpret_cast<const char*>(outBufs.first->data()),
499                      outBufs.first->length());
500   }
501   if (outBufs.second) {
502     outBufs.second->coalesce();
503     out.second.assign(reinterpret_cast<const char*>(outBufs.second->data()),
504                      outBufs.second->length());
505   }
506   return out;
507 }
508
509 std::pair<IOBufQueue, IOBufQueue> Subprocess::communicateIOBuf(
510     const CommunicateFlags& flags,
511     IOBufQueue data) {
512   std::pair<IOBufQueue, IOBufQueue> out;
513
514   auto readCallback = [&] (int pfd, int cfd) -> bool {
515     if (cfd == 1 && flags.readStdout_) {
516       return handleRead(pfd, out.first);
517     } else if (cfd == 2 && flags.readStderr_) {
518       return handleRead(pfd, out.second);
519     } else {
520       // Don't close the file descriptor, the child might not like SIGPIPE,
521       // just read and throw the data away.
522       return discardRead(pfd);
523     }
524   };
525
526   auto writeCallback = [&] (int pfd, int cfd) -> bool {
527     if (cfd == 0 && flags.writeStdin_) {
528       return handleWrite(pfd, data);
529     } else {
530       // If we don't want to write to this fd, just close it.
531       return false;
532     }
533   };
534
535   communicate(std::move(readCallback), std::move(writeCallback));
536
537   return out;
538 }
539
540 void Subprocess::communicate(FdCallback readCallback,
541                              FdCallback writeCallback) {
542   returnCode_.enforce(ProcessReturnCode::RUNNING);
543   setAllNonBlocking();
544
545   std::vector<pollfd> fds;
546   fds.reserve(pipes_.size());
547   std::vector<int> toClose;
548   toClose.reserve(pipes_.size());
549
550   while (!pipes_.empty()) {
551     fds.clear();
552     toClose.clear();
553
554     for (auto& p : pipes_) {
555       pollfd pfd;
556       pfd.fd = p.parentFd;
557       // Yes, backwards, PIPE_IN / PIPE_OUT are defined from the
558       // child's point of view.
559       pfd.events = (p.direction == PIPE_IN ?  POLLOUT : POLLIN);
560       fds.push_back(pfd);
561     }
562
563     int r;
564     do {
565       r = ::poll(fds.data(), fds.size(), -1);
566     } while (r == -1 && errno == EINTR);
567     checkUnixError(r, "poll");
568
569     for (int i = 0; i < pipes_.size(); ++i) {
570       auto& p = pipes_[i];
571       DCHECK_EQ(fds[i].fd, p.parentFd);
572       short events = fds[i].revents;
573
574       bool closed = false;
575       if (events & POLLOUT) {
576         DCHECK(!(events & POLLIN));
577         if (writeCallback(p.parentFd, p.childFd)) {
578           toClose.push_back(i);
579           closed = true;
580         }
581       }
582
583       if (events & POLLIN) {
584         DCHECK(!(events & POLLOUT));
585         if (readCallback(p.parentFd, p.childFd)) {
586           toClose.push_back(i);
587           closed = true;
588         }
589       }
590
591       if ((events & (POLLHUP | POLLERR)) && !closed) {
592         toClose.push_back(i);
593         closed = true;
594       }
595     }
596
597     // Close the fds in reverse order so the indexes hold after erase()
598     for (int idx : boost::adaptors::reverse(toClose)) {
599       auto pos = pipes_.begin() + idx;
600       closeChecked(pos->parentFd);
601       pipes_.erase(pos);
602     }
603   }
604 }
605
606 int Subprocess::findByChildFd(int childFd) const {
607   auto pos = std::lower_bound(
608       pipes_.begin(), pipes_.end(), childFd,
609       [] (const PipeInfo& info, int fd) { return info.childFd < fd; });
610   if (pos == pipes_.end() || pos->childFd != childFd) {
611     throw std::invalid_argument(folly::to<std::string>(
612         "child fd not found ", childFd));
613   }
614   return pos - pipes_.begin();
615 }
616
617 void Subprocess::closeParentFd(int childFd) {
618   int idx = findByChildFd(childFd);
619   closeChecked(pipes_[idx].parentFd);
620   pipes_.erase(pipes_.begin() + idx);
621 }
622
623 namespace {
624
625 class Initializer {
626  public:
627   Initializer() {
628     // We like EPIPE, thanks.
629     ::signal(SIGPIPE, SIG_IGN);
630   }
631 };
632
633 Initializer initializer;
634
635 }  // namespace
636
637 }  // namespace folly
638