24 #include <sys/prctl.h> 30 #include <system_error> 32 #include <boost/container/flat_set.hpp> 33 #include <boost/range/adaptors.hpp> 35 #include <glog/logging.h> 55 if (!WIFEXITED(status) && !WIFSIGNALED(status)) {
56 throw std::runtime_error(
57 to<std::string>(
"Invalid ProcessReturnCode: ", status));
93 throw std::logic_error(to<std::string>(
94 "Bad use of ProcessReturnCode; state is ", s,
" expected ", expected));
116 return "not started";
120 return to<std::string>(
"exited with status ",
exitStatus());
122 return to<std::string>(
134 char const* executable,
138 :
"error preparing to execute ";
139 return to<std::string>(
prefix, executable,
": ",
errnoStr(errnoValue));
143 const char* executable,
148 errnoValue_(errnoValue) {}
153 std::unique_ptr<const char* []> cloneStrings(
154 const std::vector<std::string>&
s) {
155 std::unique_ptr<const char*[]> d(
new const char*[s.size() + 1]);
156 for (
size_t i = 0;
i < s.size();
i++) {
159 d[s.size()] =
nullptr;
177 }
else if (fd == 1 || fd == 2) {
180 throw std::invalid_argument(
181 to<std::string>(
"Only fds 0, 1, 2 are valid for action=PIPE: ", fd));
191 const std::vector<std::string>&
argv,
193 const char* executable,
194 const std::vector<std::string>* env) {
196 throw std::invalid_argument(
"argv must not be empty");
199 executable = argv[0].c_str();
201 spawn(cloneStrings(argv), executable, options, env);
207 const std::vector<std::string>* env) {
209 throw std::invalid_argument(
"usePath() not allowed when running in shell");
212 std::vector<std::string>
argv = {
"/bin/sh",
"-c", cmd};
213 spawn(cloneStrings(argv), argv[0].c_str(), options, env);
218 <<
"Subprocess destroyed without reaping child";
223 struct ChildErrorInfo {
240 for (
auto& p : pipes_) {
241 int fd = p.pipe.fd();
242 int flags = ::fcntl(fd, F_GETFL);
244 int r = ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
250 std::unique_ptr<
const char*[]>
argv,
251 const char* executable,
253 const std::vector<std::string>* env) {
255 throw std::invalid_argument(
256 "usePath() not allowed when overriding environment");
263 auto pipesGuard =
makeGuard([
this] { pipes_.clear(); });
274 CHECK_ERR(::
close(errFds[0]));
275 if (errFds[1] >= 0) {
276 CHECK_ERR(::
close(errFds[1]));
280 #if !FOLLY_HAVE_PIPE2 282 checkUnixError(fcntl(errFds[0], F_SETFD, FD_CLOEXEC),
"set FD_CLOEXEC");
287 checkUnixError(fcntl(errFds[1], F_SETFD, FD_CLOEXEC),
"set FD_CLOEXEC");
292 spawnInternal(
std::move(
argv), executable, options, env, errFds[1]);
304 CHECK_ERR(::
close(errFds[1]));
309 readChildErrorPipe(errFds[0], executable);
318 pipesGuard.dismiss();
329 const char* executable,
334 std::vector<int> childFds;
338 for (
int cfd : childFds) {
339 CHECK_ERR(::
close(cfd));
344 for (
auto& p : options.fdActions_) {
345 if (p.second == PIPE_IN || p.second == PIPE_OUT) {
354 r = ::pipe2(fds, O_CLOEXEC);
359 r = fcntl(fds[0], F_SETFD, FD_CLOEXEC);
361 r = fcntl(fds[1], F_SETFD, FD_CLOEXEC);
364 pipes_.emplace_back();
368 if (p.second == PIPE_IN) {
378 childFds.push_back(cfd);
383 DCHECK(std::is_sorted(pipes_.begin(), pipes_.end()));
388 char** argVec =
const_cast<char**
>(argv.get());
391 std::unique_ptr<const char*[]> envHolder;
394 envHolder = cloneStrings(*env);
395 envVec =
const_cast<char**
>(envHolder.get());
414 r = sigfillset(&allBlocked);
418 r = pthread_sigmask(SIG_SETMASK, &allBlocked, &oldSignals);
422 r = pthread_sigmask(SIG_SETMASK, &oldSignals,
nullptr);
423 CHECK_EQ(r, 0) <<
"pthread_sigmask: " <<
errnoStr(r);
427 const char* childDir =
428 options.childDir_.empty() ?
nullptr : options.childDir_.c_str();
432 if (options.cloneFlags_) {
433 pid = syscall(SYS_clone, *options.cloneFlags_, 0,
nullptr,
nullptr);
436 if (options.detach_) {
450 if (options.detach_) {
452 if (options.cloneFlags_) {
453 pid = syscall(SYS_clone, *options.cloneFlags_, 0,
nullptr,
nullptr);
463 }
else if (pid != 0) {
473 int errnoValue = prepareChild(options, &oldSignals, childDir);
474 if (errnoValue != 0) {
478 errnoValue =
runChild(executable, argVec, envVec, options);
497 const sigset_t* sigmask,
498 const char* childDir)
const {
502 ::signal(
sig, SIG_DFL);
507 int r = pthread_sigmask(SIG_SETMASK, sigmask,
nullptr);
515 if (::chdir(childDir) == -1) {
526 if (p.second == CLOSE) {
527 if (::
close(p.first) == -1) {
530 }
else if (p.second != p.first) {
531 if (::dup2(p.second, p.first) == -1) {
541 for (
int fd = getdtablesize() - 1; fd >= 3; --fd) {
550 if (options.parentDeathSignal_ != 0) {
551 const auto parentDeathSignal =
552 static_cast<unsigned long>(options.parentDeathSignal_);
553 if (prctl(PR_SET_PDEATHSIG, parentDeathSignal, 0, 0, 0) == -1) {
560 if (setpgrp() == -1) {
576 const char* executable,
579 const Options& options)
const {
582 ::execvp(executable, argv);
584 ::execve(executable, argv, env);
591 auto rc =
readNoInt(pfd, &info,
sizeof(info));
596 }
else if (rc !=
sizeof(ChildErrorInfo)) {
605 LOG(ERROR) <<
"unexpected error trying to read from child error pipe " 606 <<
"rc=" << rc <<
", errno=" << errno;
622 pid_t found = ::wait4(pid_, &status, WNOHANG, ru);
626 PCHECK(found != -1) <<
"waitpid(" << pid_ <<
", &status, WNOHANG)";
640 checkStatus(returnCode_);
650 found = ::waitpid(pid_, &status, 0);
651 }
while (found == -1 && errno == EINTR);
654 PCHECK(found != -1) <<
"waitpid(" << pid_ <<
", &status, WNOHANG)";
657 DCHECK_EQ(found, pid_);
665 checkStatus(returnCode_);
670 int r = ::kill(pid_, signal);
681 auto* p = queue.
front();
691 auto b = queueFront(queue);
697 if (n == -1 && errno == EAGAIN) {
709 ssize_t n =
readNoInt(fd, p.first, p.second);
710 if (n == -1 && errno == EAGAIN) {
721 bool discardRead(
int fd) {
722 static const size_t bufSize = 65000;
724 static std::unique_ptr<char[]> buf(
new char[bufSize]);
727 ssize_t n =
readNoInt(fd, buf.get(), bufSize);
728 if (n == -1 && errno == EAGAIN) {
744 auto outQueues = communicateIOBuf(
std::move(inputQueue));
746 std::make_pair(outQueues.first.move(), outQueues.second.move());
747 std::pair<std::string, std::string> out;
749 outBufs.first->coalesce();
751 reinterpret_cast<const char*>(outBufs.first->data()),
752 outBufs.first->length());
754 if (outBufs.second) {
755 outBufs.second->coalesce();
757 reinterpret_cast<const char*>(outBufs.second->data()),
758 outBufs.second->length());
767 if (!input.
empty()) {
770 findByChildFd(STDIN_FILENO);
773 std::pair<IOBufQueue, IOBufQueue> out;
775 auto readCallback = [&](
int pfd,
int cfd) ->
bool {
776 if (cfd == STDOUT_FILENO) {
777 return handleRead(pfd, out.first);
778 }
else if (cfd == STDERR_FILENO) {
779 return handleRead(pfd, out.second);
783 return discardRead(pfd);
787 auto writeCallback = [&](
int pfd,
int cfd) ->
bool {
788 if (cfd == STDIN_FILENO) {
789 return handleWrite(pfd, input);
809 std::vector<pollfd> fds;
810 fds.reserve(pipes_.size());
811 std::vector<size_t> toClose;
812 toClose.reserve(pipes_.size());
814 while (!pipes_.empty()) {
818 for (
auto& p : pipes_) {
820 pfd.fd = p.pipe.fd();
827 }
else if (p.direction == PIPE_IN) {
828 pfd.events = POLLOUT;
837 r =
::poll(fds.data(), fds.size(), -1);
838 }
while (r == -1 && errno == EINTR);
841 for (
size_t i = 0;
i < pipes_.size(); ++
i) {
843 auto parentFd = p.pipe.fd();
844 DCHECK_EQ(fds[
i].fd, parentFd);
845 short events = fds[
i].revents;
848 if (events & POLLOUT) {
849 DCHECK(!(events & POLLIN));
850 if (writeCallback(parentFd, p.childFd)) {
851 toClose.push_back(
i);
858 if (events & (POLLIN | POLLHUP)) {
859 DCHECK(!(events & POLLOUT));
860 if (readCallback(parentFd, p.childFd)) {
861 toClose.push_back(
i);
866 if ((events & (POLLHUP | POLLERR)) && !closed) {
867 toClose.push_back(
i);
873 for (
int idx : boost::adaptors::reverse(toClose)) {
874 auto pos = pipes_.begin() + idx;
882 pipes_[findByChildFd(childFd)].enabled = enabled;
886 return pipes_[findByChildFd(childFd)].enabled;
890 auto pos = std::lower_bound(
891 pipes_.begin(), pipes_.end(), childFd, [](
const Pipe&
pipe,
int fd) {
892 return pipe.childFd < fd;
894 if (pos == pipes_.end() || pos->childFd != childFd) {
895 throw std::invalid_argument(
896 folly::to<std::string>(
"child fd not found ", childFd));
898 return pos - pipes_.begin();
902 int idx = findByChildFd(childFd);
903 pipes_[idx].pipe.close();
904 pipes_.erase(pipes_.begin() + idx);
908 std::vector<Subprocess::ChildPipe> pipes;
909 for (
auto& p : pipes_) {
910 pipes.emplace_back(p.childFd,
std::move(p.pipe));
913 std::vector<Pipe>().
swap(pipes_);
923 ::signal(SIGPIPE, SIG_IGN);
927 Initializer initializer;
static constexpr int RV_RUNNING
ProcessReturnCode & operator=(const ProcessReturnCode &p)=default
const folly::IOBuf * front() const
int returnCode(int value)
void readChildErrorPipe(int pfd, const char *executable)
#define FOLLY_POP_WARNING
static ProcessReturnCode makeRunning()
size_t findByChildFd(const int childFd) const
static constexpr int RV_NOT_STARTED
#define FOLLY_PUSH_WARNING
static ProcessReturnCode make(int status)
constexpr detail::Map< Move > move
constexpr int kExecFailure
ssize_t readNoInt(int fd, void *buf, size_t count)
Options & fd(int fd, int action)
constexpr size_type size() const
std::pair< std::string, std::string > communicate(StringPiece input=StringPiece())
void runChild(const char *file)
void spawn(std::unique_ptr< const char *[]> argv, const char *executable, const Options &options, const std::vector< std::string > *env)
—— Concurrent Priority Queue Implementation ——
requires E e noexcept(noexcept(s.error(std::move(e))))
bool prefix(Cursor &c, uint32_t expected)
CalledProcessError(ProcessReturnCode rc)
requires And< SemiMovable< VN >... > &&SemiMovable< E > auto error(E e)
static const int PIPE_OUT
std::pair< void *, std::size_t > preallocate(std::size_t min, std::size_t newAllocationSize, std::size_t max=std::numeric_limits< std::size_t >::max())
DangerousPostForkPreExecCallback * dangerousPostForkPreExecCallback_
void closeParentFd(int childFd)
static std::string toSubprocessSpawnErrorMessage(char const *executable, int errCode, int errnoValue)
void enableNotifications(int childFd, bool enabled)
FOLLY_ALWAYS_INLINE void assume_unreachable()
void checkPosixError(int err, Args &&...args)
std::pair< IOBufQueue, IOBufQueue > communicateIOBuf(IOBufQueue input=IOBufQueue())
int runChild(const char *executable, char **argv, char **env, const Options &options) const
bool notificationsEnabled(int childFd) const
constexpr Iter data() const
void sendSignal(int signal)
ssize_t writeNoInt(int fd, const void *buf, size_t count)
void wrapBuffer(const void *buf, size_t len, std::size_t blockSize=(1U<< 31))
constexpr detail::Sig< Sig > const sig
bool wait(Waiter *waiter, bool shouldSleep, Waiter *&next)
SubprocessSpawnError(const char *executable, int errCode, int errnoValue)
fbstring errnoStr(int err)
void checkUnixError(ssize_t ret, Args &&...args)
void swap(exception_wrapper &a, exception_wrapper &b) noexcept
FOLLY_NODISCARD detail::ScopeGuardImplDecay< F, true > makeGuard(F &&f) noexcept(noexcept(detail::ScopeGuardImplDecay< F, true >(static_cast< F && >(f))))
std::vector< ChildPipe > takeOwnershipOfPipes()
int poll(PollDescriptor fds[], nfds_t nfds, int timeout)
void enforce(State state) const
void trimStart(size_t amount)
int prepareChild(const Options &options, const sigset_t *sigmask, const char *childDir) const
void postallocate(std::size_t n)
int close(NetworkSocket s)
ProcessReturnCode poll(struct rusage *ru=nullptr)
constexpr int kChildFailure
void pipe(CPUExecutor cpu, IOExecutor io)
#define FOLLY_GCC_DISABLE_WARNING(warningName)