git/run-command.c
Brandon Williams e3a434468f run-command: use the async-signal-safe execv instead of execvp
Convert the function used to exec from 'execvp()' to 'execv()' as the (p)
variant of exec isn't async-signal-safe and has the potential to call malloc
during the path resolution it performs.  Instead we simply do the path
resolution ourselves during the preparation stage prior to forking.  There also
don't exist any portable (p) variants which also take in an environment to use
in the exec'd process.  This allows easy migration to using 'execve()' in a
future patch.

Also, as noted in [1], in the event of an ENOEXEC the (p) variants of
exec will attempt to execute the command by interpreting it with the
'sh' utility.  To maintain this functionality, if 'execv()' fails with
ENOEXEC, start_command will attempt to execute the command by
interpreting it with 'sh'.

[1] http://pubs.opengroup.org/onlinepubs/009695399/functions/exec.html

Signed-off-by: Brandon Williams <bmwill@google.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
2017-04-20 17:55:32 -07:00

1422 lines
30 KiB
C

#include "cache.h"
#include "run-command.h"
#include "exec_cmd.h"
#include "sigchain.h"
#include "argv-array.h"
#include "thread-utils.h"
#include "strbuf.h"
/*
 * Put *child into a pristine state: every field is zeroed and the
 * embedded argument and environment arrays are initialized.
 */
void child_process_init(struct child_process *child)
{
/* Zero the whole struct first; the arrays then get their own init. */
memset(child, 0, sizeof(*child));
argv_array_init(&child->args);
argv_array_init(&child->env_array);
}
/*
 * Release the contents of the argument and environment arrays embedded
 * in *child.  No other field of the struct is touched here.
 */
void child_process_clear(struct child_process *child)
{
argv_array_clear(&child->args);
argv_array_clear(&child->env_array);
}
/*
 * One entry in the singly linked list of children that should be
 * signalled when this process exits or is killed by a signal.
 */
struct child_to_clean {
/* Process id handed to kill() and waitpid() during cleanup. */
pid_t pid;
/* Owning child_process; may be NULL (async children register NULL). */
struct child_process *process;
/* Next list entry, or NULL at the end of the list. */
struct child_to_clean *next;
};
/* Head of the cleanup list; new entries are pushed at the front. */
static struct child_to_clean *children_to_clean;
/* Non-zero once the atexit and signal cleanup handlers are installed. */
static int installed_child_cleanup_handler;
/*
 * Send signal `sig` to every child registered for cleanup, emptying the
 * registry as we go.
 *
 * When `in_signal` is zero (normal exit path), a child's
 * clean_on_exit_handler, if set, runs before the signal is sent and the
 * list nodes are freed.  When `in_signal` is non-zero, neither the
 * handlers nor free() are called.
 *
 * Children whose process has wait_after_clean set are moved to a second
 * list and reaped with waitpid() once all signals have been delivered.
 */
static void cleanup_children(int sig, int in_signal)
{
	struct child_to_clean *pending_wait = NULL;
	struct child_to_clean *node;

	while ((node = children_to_clean) != NULL) {
		struct child_process *proc = node->process;

		/* Unlink the node before doing anything else with it. */
		children_to_clean = node->next;

		if (proc && !in_signal && proc->clean_on_exit_handler) {
			trace_printf(
				"trace: run_command: running exit handler for pid %"
				PRIuMAX, (uintmax_t)node->pid
			);
			proc->clean_on_exit_handler(proc);
		}

		kill(node->pid, sig);

		if (proc && proc->wait_after_clean) {
			/* Defer reaping until every child has been signalled. */
			node->next = pending_wait;
			pending_wait = node;
		} else if (!in_signal) {
			free(node);
		}
	}

	while ((node = pending_wait) != NULL) {
		pending_wait = node->next;

		while (waitpid(node->pid, NULL, 0) < 0 && errno == EINTR)
			; /* retry until the child is reaped or a real error occurs */

		if (!in_signal)
			free(node);
	}
}
/*
 * Signal handler: forward `sig` to all registered children, then remove
 * this handler from the signal chain and re-raise the signal.
 */
static void cleanup_children_on_signal(int sig)
{
/* in_signal=1: cleanup_children skips exit handlers and free(). */
cleanup_children(sig, 1);
sigchain_pop(sig);
raise(sig);
}
/*
 * atexit() handler: send SIGTERM to all registered children, running
 * their exit handlers and freeing the list (in_signal=0).
 */
static void cleanup_children_on_exit(void)
{
cleanup_children(SIGTERM, 0);
}
/*
 * Register `pid` (and its optional owning `process`, which may be NULL)
 * so that it is signalled when this process exits or dies from a signal.
 * The first registration also installs the atexit and signal handlers.
 */
static void mark_child_for_cleanup(pid_t pid, struct child_process *process)
{
	struct child_to_clean *entry = xmalloc(sizeof(*entry));

	entry->pid = pid;
	entry->process = process;
	/* Push at the head of the list. */
	entry->next = children_to_clean;
	children_to_clean = entry;

	if (installed_child_cleanup_handler)
		return;

	atexit(cleanup_children_on_exit);
	sigchain_push_common(cleanup_children_on_signal);
	installed_child_cleanup_handler = 1;
}
/*
 * Remove the first cleanup-list entry whose pid equals `pid` and free
 * it.  Does nothing if no entry matches.
 */
static void clear_child_for_cleanup(pid_t pid)
{
	/* `link` always points at the pointer that references the current entry. */
	struct child_to_clean **link = &children_to_clean;

	while (*link) {
		struct child_to_clean *entry = *link;

		if (entry->pid != pid) {
			link = &entry->next;
			continue;
		}

		*link = entry->next;
		free(entry);
		return;
	}
}
/* Close both descriptors of a two-element fd array, fd[0] first. */
static inline void close_pair(int fd[2])
{
	int i;

	for (i = 0; i < 2; i++)
		close(fd[i]);
}
#ifndef GIT_WINDOWS_NATIVE
/*
 * Make descriptor `to` refer to /dev/null (opened read-write).
 * Dies if /dev/null cannot be opened or dup2() fails.
 */
static inline void dup_devnull(int to)
{
	int null_fd = open("/dev/null", O_RDWR);

	if (null_fd < 0)
		die_errno(_("open /dev/null failed"));
	if (dup2(null_fd, to) < 0)
		die_errno(_("dup2(%d,%d) failed"), null_fd, to);
	/* `to` now holds its own reference; the temporary fd is not needed. */
	close(null_fd);
}
#endif
static char *locate_in_PATH(const char *file)
{
const char *p = getenv("PATH");
struct strbuf buf = STRBUF_INIT;
if (!p || !*p)
return NULL;
while (1) {
const char *end = strchrnul(p, ':');
strbuf_reset(&buf);
/* POSIX specifies an empty entry as the current directory. */
if (end != p) {
strbuf_add(&buf, p, end - p);
strbuf_addch(&buf, '/');
}
strbuf_addstr(&buf, file);
if (!access(buf.buf, F_OK))
return strbuf_detach(&buf, NULL);
if (!*end)
break;
p = end + 1;
}
strbuf_release(&buf);
return NULL;
}
/*
 * Return non-zero if `file` can be found in one of the $PATH
 * directories (see locate_in_PATH), zero otherwise.
 */
static int exists_in_PATH(const char *file)
{
	char *r = locate_in_PATH(file);
	/*
	 * Record the result before freeing: the value of a pointer becomes
	 * indeterminate once the object it points to has been freed, so it
	 * must not be inspected afterwards.
	 */
	int found = r != NULL;

	free(r);
	return found;
}
/*
 * execvp() wrapper that reports friendlier errno values.
 *
 * Returns -1 with errno set when the exec fails (a successful exec does
 * not return).  For commands without a '/', EACCES is turned into ENOENT
 * when the command does not exist anywhere in $PATH, and ENOTDIR is
 * turned into ENOENT.
 */
int sane_execvp(const char *file, char * const argv[])
{
	if (execvp(file, argv) == 0)
		return 0; /* cannot happen ;-) */

	/*
	 * When a command can't be found because one of the directories
	 * listed in $PATH is unsearchable, execvp reports EACCES, but
	 * "No such file or directory" is the more intuitive message.
	 *
	 * Commands containing "/" are left alone, because execvp does
	 * not perform a $PATH lookup for them.
	 */
	switch (errno) {
	case EACCES:
		/*
		 * Storing EACCES back looks like a no-op, but it protects
		 * against exists_in_PATH overwriting errno.
		 */
		if (!strchr(file, '/'))
			errno = exists_in_PATH(file) ? EACCES : ENOENT;
		break;
	case ENOTDIR:
		if (!strchr(file, '/'))
			errno = ENOENT;
		break;
	default:
		break;
	}
	return -1;
}
/*
 * Append to `out` the argument vector needed to run `argv` as a shell
 * command and return out->argv.
 *
 * If argv[0] contains any shell metacharacter, the command is wrapped
 * as `<shell> -c "<argv[0]> [\"$@\"]"` followed by the original argv;
 * otherwise the original argv is appended unchanged.  Dies if argv[0]
 * is NULL.
 */
static const char **prepare_shell_cmd(struct argv_array *out, const char **argv)
{
	static const char shell_meta[] = "|&;<>()$`\\\"' \t\n*?[#~=%";
	const char *command = argv[0];

	if (!command)
		die("BUG: shell command is empty");

	/* strcspn stops at the first metacharacter, or at the NUL if none. */
	if (command[strcspn(command, shell_meta)] != '\0') {
#ifndef GIT_WINDOWS_NATIVE
		argv_array_push(out, SHELL_PATH);
#else
		argv_array_push(out, "sh");
#endif
		argv_array_push(out, "-c");

		/*
		 * The "$@" magic is only needed when there are extra
		 * arguments to forward to the command.
		 */
		if (argv[1])
			argv_array_pushf(out, "%s \"$@\"", command);
		else
			argv_array_push(out, command);
	}

	argv_array_pushv(out, argv);
	return out->argv;
}
#ifndef GIT_WINDOWS_NATIVE
/*
 * Write end of the notification pipe in the forked child; -1 until
 * start_command assigns it in the child.
 */
static int child_notifier = -1;
/*
 * Registered with atexit() in the forked child: writes a single byte to
 * the notification pipe so the parent learns that the child exited
 * before a successful exec.
 */
static void notify_parent(void)
{
/*
 * exec failed. If possible, we'd like to let start_command
 * know, so failures like ENOENT can be handled right away; but
 * otherwise, finish_command will still report the error.
 */
xwrite(child_notifier, "", 1);
}
/*
 * Build in `out` the argument vector that start_command will exec.
 *
 * out->argv[0] is always SHELL_PATH, kept as a fallback interpreter for
 * the ENOEXEC case; the command proper starts at out->argv[1].  If that
 * command contains no '/', it is replaced by the path found in $PATH
 * when the lookup succeeds.  Dies if cmd->argv[0] is NULL.
 */
static void prepare_cmd(struct argv_array *out, const struct child_process *cmd)
{
	char *program;

	if (!cmd->argv[0])
		die("BUG: command is empty");

	/*
	 * Add SHELL_PATH so in the event exec fails with ENOEXEC we can
	 * attempt to interpret the command with 'sh'.
	 */
	argv_array_push(out, SHELL_PATH);

	if (cmd->use_shell && !cmd->git_cmd) {
		prepare_shell_cmd(out, cmd->argv);
	} else {
		/* git_cmd takes precedence over use_shell. */
		if (cmd->git_cmd)
			argv_array_push(out, "git");
		argv_array_pushv(out, cmd->argv);
	}

	/*
	 * If there are no '/' characters in the command then perform a path
	 * lookup and use the resolved path as the command to exec.  If there
	 * are '/' characters, or if the command wasn't found in the path,
	 * have exec attempt to invoke the command directly.
	 */
	if (strchr(out->argv[1], '/'))
		return;

	program = locate_in_PATH(out->argv[1]);
	if (!program)
		return;

	free((char *)out->argv[1]);
	out->argv[1] = program;
}
#endif
/*
 * Add FD_CLOEXEC to the descriptor flags of `fd`.  Silently does
 * nothing if the current flags cannot be read.
 */
static inline void set_cloexec(int fd)
{
	int fd_flags = fcntl(fd, F_GETFD);

	if (fd_flags < 0)
		return;
	fcntl(fd, F_SETFD, fd_flags | FD_CLOEXEC);
}
/*
 * Wait for child `pid` and translate its wait status into a return code.
 *
 * Returns:
 *   - 0 unconditionally when `in_signal` is non-zero (no reporting, no
 *     list manipulation);
 *   - the child's exit status when it exited normally, except that 127
 *     is reported as -1 with errno set to ENOENT;
 *   - 128 + signal number when the child was killed by a signal;
 *   - -1 when waitpid() failed or returned something unexpected.
 *
 * On the non-signal path the child is removed from the cleanup list and
 * errno is set to the recorded failure (0 if none).  `argv0` is used in
 * messages only.
 */
static int wait_or_whine(pid_t pid, const char *argv0, int in_signal)
{
	int status;
	int result = -1;
	int saved_errno = 0;
	pid_t reaped;

	do {
		reaped = waitpid(pid, &status, 0);
	} while (reaped < 0 && errno == EINTR);

	if (in_signal)
		return 0;

	if (reaped < 0) {
		saved_errno = errno;
		error_errno("waitpid for %s failed", argv0);
	} else if (reaped != pid) {
		error("waitpid is confused (%s)", argv0);
	} else if (WIFSIGNALED(status)) {
		int signo = WTERMSIG(status);

		if (signo != SIGINT && signo != SIGQUIT && signo != SIGPIPE)
			error("%s died of signal %d", argv0, signo);
		/*
		 * Chosen so that result & 0xff mimics the exit code a POSIX
		 * shell would report for a program killed by this signal.
		 */
		result = signo + 128;
	} else if (WIFEXITED(status)) {
		result = WEXITSTATUS(status);
		/* 127 is the special exit code used when exec failed. */
		if (result == 127) {
			result = -1;
			saved_errno = ENOENT;
		}
	} else {
		error("waitpid is confused (%s)", argv0);
	}

	clear_child_for_cleanup(pid);

	errno = saved_errno;
	return result;
}
/*
 * Start the child process described by *cmd without waiting for it.
 *
 * For each of stdin/stdout/stderr a pipe is created when the matching
 * field (cmd->in, cmd->out, cmd->err) is negative and the stream is not
 * disabled; the parent's end of that pipe is stored back into the field.
 *
 * Returns 0 on success.  On failure returns -1 with errno set to the
 * recorded failure, after closing the descriptors involved and calling
 * child_process_clear(cmd).
 */
int start_command(struct child_process *cmd)
{
int need_in, need_out, need_err;
int fdin[2], fdout[2], fderr[2];
int failed_errno;
char *str;
/* Fall back to the embedded arrays when the caller set no argv/env. */
if (!cmd->argv)
cmd->argv = cmd->args.argv;
if (!cmd->env)
cmd->env = cmd->env_array.argv;
/*
 * In case of errors we must keep the promise to close FDs
 * that have been passed in via ->in and ->out.
 */
need_in = !cmd->no_stdin && cmd->in < 0;
if (need_in) {
if (pipe(fdin) < 0) {
failed_errno = errno;
if (cmd->out > 0)
close(cmd->out);
str = "standard input";
goto fail_pipe;
}
/* The parent keeps the write end of the child's stdin pipe. */
cmd->in = fdin[1];
}
need_out = !cmd->no_stdout
&& !cmd->stdout_to_stderr
&& cmd->out < 0;
if (need_out) {
if (pipe(fdout) < 0) {
failed_errno = errno;
if (need_in)
close_pair(fdin);
else if (cmd->in)
close(cmd->in);
str = "standard output";
goto fail_pipe;
}
/* The parent keeps the read end of the child's stdout pipe. */
cmd->out = fdout[0];
}
need_err = !cmd->no_stderr && cmd->err < 0;
if (need_err) {
if (pipe(fderr) < 0) {
failed_errno = errno;
if (need_in)
close_pair(fdin);
else if (cmd->in)
close(cmd->in);
if (need_out)
close_pair(fdout);
else if (cmd->out)
close(cmd->out);
str = "standard error";
/* Shared tail for all three pipe-creation failures above. */
fail_pipe:
error("cannot create %s pipe for %s: %s",
str, cmd->argv[0], strerror(failed_errno));
child_process_clear(cmd);
errno = failed_errno;
return -1;
}
/* The parent keeps the read end of the child's stderr pipe. */
cmd->err = fderr[0];
}
trace_argv_printf(cmd->argv, "trace: run_command:");
/* Flush all stdio streams before creating the child. */
fflush(NULL);
#ifndef GIT_WINDOWS_NATIVE
{
int notify_pipe[2];
struct argv_array argv = ARGV_ARRAY_INIT;
/* The notification pipe is advisory: carry on without it on failure. */
if (pipe(notify_pipe))
notify_pipe[0] = notify_pipe[1] = -1;
/* Resolve the command (including the $PATH lookup) before forking. */
prepare_cmd(&argv, cmd);
cmd->pid = fork();
failed_errno = errno;
if (!cmd->pid) {
/*
 * Redirect the channel to write syscall error messages to
 * before redirecting the process's stderr so that all die()
 * in subsequent call paths use the parent's stderr.
 */
if (cmd->no_stderr || need_err) {
int child_err = dup(2);
set_cloexec(child_err);
set_error_handle(fdopen(child_err, "w"));
}
/*
 * The write end is close-on-exec, so a successful exec closes it
 * and the parent sees EOF; otherwise notify_parent() writes a byte
 * when the child exits.
 */
close(notify_pipe[0]);
set_cloexec(notify_pipe[1]);
child_notifier = notify_pipe[1];
atexit(notify_parent);
/* Wire up the child's stdin. */
if (cmd->no_stdin)
dup_devnull(0);
else if (need_in) {
dup2(fdin[0], 0);
close_pair(fdin);
} else if (cmd->in) {
dup2(cmd->in, 0);
close(cmd->in);
}
/* Wire up stderr before stdout so stdout_to_stderr can copy it. */
if (cmd->no_stderr)
dup_devnull(2);
else if (need_err) {
dup2(fderr[1], 2);
close_pair(fderr);
} else if (cmd->err > 1) {
dup2(cmd->err, 2);
close(cmd->err);
}
/* Wire up the child's stdout. */
if (cmd->no_stdout)
dup_devnull(1);
else if (cmd->stdout_to_stderr)
dup2(2, 1);
else if (need_out) {
dup2(fdout[1], 1);
close_pair(fdout);
} else if (cmd->out > 1) {
dup2(cmd->out, 1);
close(cmd->out);
}
if (cmd->dir && chdir(cmd->dir))
die_errno("exec '%s': cd to '%s' failed", cmd->argv[0],
cmd->dir);
/* Entries containing '=' are set; bare names are removed. */
if (cmd->env) {
for (; *cmd->env; cmd->env++) {
if (strchr(*cmd->env, '='))
putenv((char *)*cmd->env);
else
unsetenv(*cmd->env);
}
}
/*
 * Attempt to exec using the command and arguments starting at
 * argv.argv[1]. argv.argv[0] contains SHELL_PATH which will
 * be used in the event exec failed with ENOEXEC at which point
 * we will try to interpret the command using 'sh'.
 */
execv(argv.argv[1], (char *const *) argv.argv + 1);
if (errno == ENOEXEC)
execv(argv.argv[0], (char *const *) argv.argv);
/* Only reached when exec failed; 127 is decoded by wait_or_whine. */
if (errno == ENOENT) {
if (!cmd->silent_exec_failure)
error("cannot run %s: %s", cmd->argv[0],
strerror(ENOENT));
exit(127);
} else {
die_errno("cannot exec '%s'", cmd->argv[0]);
}
}
/* Parent from here on (or fork() failed). */
if (cmd->pid < 0)
error_errno("cannot fork() for %s", cmd->argv[0]);
else if (cmd->clean_on_exit)
mark_child_for_cleanup(cmd->pid, cmd);
/*
 * Wait for child's exec. If the exec succeeds (or if fork()
 * failed), EOF is seen immediately by the parent. Otherwise, the
 * child process sends a single byte.
 * Note that use of this infrastructure is completely advisory,
 * therefore, we keep error checks minimal.
 */
close(notify_pipe[1]);
/* notify_pipe[1] is closed already; its storage is reused as a scratch byte. */
if (read(notify_pipe[0], &notify_pipe[1], 1) == 1) {
/*
 * At this point we know that fork() succeeded, but exec()
 * failed. Errors have been reported to our stderr.
 */
wait_or_whine(cmd->pid, cmd->argv[0], 0);
failed_errno = errno;
cmd->pid = -1;
}
close(notify_pipe[0]);
argv_array_clear(&argv);
}
#else
{
/* Handles passed to the spawned process; defaults are our own 0/1/2. */
int fhin = 0, fhout = 1, fherr = 2;
/* Remember the caller's argv; it is restored after spawning. */
const char **sargv = cmd->argv;
struct argv_array nargv = ARGV_ARRAY_INIT;
if (cmd->no_stdin)
fhin = open("/dev/null", O_RDWR);
else if (need_in)
fhin = dup(fdin[0]);
else if (cmd->in)
fhin = dup(cmd->in);
if (cmd->no_stderr)
fherr = open("/dev/null", O_RDWR);
else if (need_err)
fherr = dup(fderr[1]);
else if (cmd->err > 2)
fherr = dup(cmd->err);
if (cmd->no_stdout)
fhout = open("/dev/null", O_RDWR);
else if (cmd->stdout_to_stderr)
fhout = dup(fherr);
else if (need_out)
fhout = dup(fdout[1]);
else if (cmd->out > 1)
fhout = dup(cmd->out);
if (cmd->git_cmd)
cmd->argv = prepare_git_cmd(&nargv, cmd->argv);
else if (cmd->use_shell)
cmd->argv = prepare_shell_cmd(&nargv, cmd->argv);
cmd->pid = mingw_spawnvpe(cmd->argv[0], cmd->argv, (char**) cmd->env,
cmd->dir, fhin, fhout, fherr);
failed_errno = errno;
if (cmd->pid < 0 && (!cmd->silent_exec_failure || errno != ENOENT))
error_errno("cannot spawn %s", cmd->argv[0]);
if (cmd->clean_on_exit && cmd->pid >= 0)
mark_child_for_cleanup(cmd->pid, cmd);
argv_array_clear(&nargv);
cmd->argv = sargv;
/* Close only the handles that were opened or duplicated above. */
if (fhin != 0)
close(fhin);
if (fhout != 1)
close(fhout);
if (fherr != 2)
close(fherr);
}
#endif
/* Failure: close both ends of our pipes or the caller-supplied fds. */
if (cmd->pid < 0) {
if (need_in)
close_pair(fdin);
else if (cmd->in)
close(cmd->in);
if (need_out)
close_pair(fdout);
else if (cmd->out)
close(cmd->out);
if (need_err)
close_pair(fderr);
else if (cmd->err)
close(cmd->err);
child_process_clear(cmd);
errno = failed_errno;
return -1;
}
/* Success: close the child's pipe ends, or the fds handed to the child. */
if (need_in)
close(fdin[0]);
else if (cmd->in)
close(cmd->in);
if (need_out)
close(fdout[1]);
else if (cmd->out)
close(cmd->out);
if (need_err)
close(fderr[1]);
else if (cmd->err)
close(cmd->err);
return 0;
}
/*
 * Wait for the child started via start_command(), release the argument
 * and environment arrays of *cmd, and return the code computed by
 * wait_or_whine().
 */
int finish_command(struct child_process *cmd)
{
	int status = wait_or_whine(cmd->pid, cmd->argv[0], 0);

	child_process_clear(cmd);
	return status;
}
/*
 * Variant of finish_command() for use from a signal handler: waits for
 * the child with in_signal=1, so wait_or_whine() returns 0 without
 * reporting anything, and *cmd is not cleared.
 */
int finish_command_in_signal(struct child_process *cmd)
{
return wait_or_whine(cmd->pid, cmd->argv[0], 1);
}
/*
 * Start the command described by *cmd and wait for it to finish.
 *
 * Returns the non-zero result of start_command() if starting failed,
 * otherwise the result of finish_command().  Dies if the caller asked
 * for an stdout or stderr pipe (cmd->out or cmd->err negative), since
 * nobody would drain it while we wait.
 */
int run_command(struct child_process *cmd)
{
	int status;

	if (cmd->out < 0 || cmd->err < 0)
		die("BUG: run_command with a pipe can cause deadlock");

	status = start_command(cmd);
	return status ? status : finish_command(cmd);
}
/*
 * Convenience wrapper: run `argv` with the RUN_* flags in `opt`, without
 * a working directory or environment override.
 */
int run_command_v_opt(const char **argv, int opt)
{
return run_command_v_opt_cd_env(argv, opt, NULL, NULL);
}
/*
 * Run `argv` to completion with the behaviour selected by the RUN_*
 * flags in `opt`.  `dir` and `env` are stored in the child_process as
 * its dir and env fields.  Returns the result of run_command().
 */
int run_command_v_opt_cd_env(const char **argv, int opt, const char *dir, const char *const *env)
{
	struct child_process cmd = CHILD_PROCESS_INIT;

	cmd.argv = argv;
	cmd.dir = dir;
	cmd.env = env;

	/* Translate each option bit into the matching 0/1 flag. */
	cmd.no_stdin = !!(opt & RUN_COMMAND_NO_STDIN);
	cmd.git_cmd = !!(opt & RUN_GIT_CMD);
	cmd.stdout_to_stderr = !!(opt & RUN_COMMAND_STDOUT_TO_STDERR);
	cmd.silent_exec_failure = !!(opt & RUN_SILENT_EXEC_FAILURE);
	cmd.use_shell = !!(opt & RUN_USING_SHELL);
	cmd.clean_on_exit = !!(opt & RUN_CLEAN_ON_EXIT);

	return run_command(&cmd);
}
#ifndef NO_PTHREADS
/* Thread that first called start_async(); valid once main_thread_set is non-zero. */
static pthread_t main_thread;
/* Non-zero after the first start_async() call has recorded main_thread. */
static int main_thread_set;
/* Thread-specific key holding the struct async of the running async thread. */
static pthread_key_t async_key;
/* Thread-specific key used by async_die_is_recursing() as a "die already called" marker. */
static pthread_key_t async_die_counter;
/*
 * Thread entry point for an async task.
 *
 * `data` is the struct async being run.  If requested via
 * isolate_sigpipe, SIGPIPE is blocked in this thread first.  The async
 * is then published through the async_key thread-specific slot and its
 * proc callback is invoked; the callback's return value (or the error()
 * result when blocking SIGPIPE failed) is returned encoded as a pointer.
 */
static void *run_thread(void *data)
{
	struct async *async = data;
	intptr_t ret;

	if (async->isolate_sigpipe) {
		sigset_t mask;

		sigemptyset(&mask);
		sigaddset(&mask, SIGPIPE);
		/*
		 * pthread_sigmask() returns 0 on success and a positive
		 * error number on failure (it does not return -1), so any
		 * non-zero result must be treated as an error.
		 */
		if (pthread_sigmask(SIG_BLOCK, &mask, NULL)) {
			ret = error("unable to block SIGPIPE in async thread");
			return (void *)ret;
		}
	}

	pthread_setspecific(async_key, async);
	ret = async->proc(async->proc_in, async->proc_out, async->data);
	return (void *)ret;
}
/*
 * die() routine installed once async threads are in use.
 *
 * Reports the fatal message, then either exits the whole process with
 * status 128 (when called outside an async thread) or closes the async's
 * descriptors and terminates only the calling thread with value 128.
 */
static NORETURN void die_async(const char *err, va_list params)
{
	struct async *async;

	vreportf("fatal: ", err, params);

	if (!in_async())
		exit(128);

	async = pthread_getspecific(async_key);
	if (async->proc_in >= 0)
		close(async->proc_in);
	if (async->proc_out >= 0)
		close(async->proc_out);
	pthread_exit((void *)128);
}
/*
 * Per-thread recursion check for die(): returns non-zero if this
 * function was already called in the current thread, and marks the
 * thread so that every later call returns non-zero.
 */
static int async_die_is_recursing(void)
{
	int already_dying = pthread_getspecific(async_die_counter) != NULL;

	pthread_setspecific(async_die_counter, (void *)1);
	return already_dying;
}
/*
 * Return non-zero when called from a thread other than the recorded
 * main thread.  Before any async has been started (main thread not yet
 * recorded) this always returns 0.
 */
int in_async(void)
{
	return main_thread_set &&
	       !pthread_equal(main_thread, pthread_self());
}
/*
 * Threaded build: terminate only the calling thread, passing `code`
 * (encoded as a pointer) as the thread's exit value.
 */
static void NORETURN async_exit(int code)
{
pthread_exit((void *)(intptr_t)code);
}
#else
/*
 * Registry of exit handlers managed by git_atexit(), used in builds
 * without pthreads.
 */
static struct {
/* Array of registered handlers, in registration order. */
void (**handlers)(void);
/* Number of handlers currently stored. */
size_t nr;
/* Allocation bookkeeping passed to ALLOC_GROW. */
size_t alloc;
} git_atexit_hdlrs;
/* Non-zero once git_atexit_dispatch has been registered with atexit(). */
static int git_atexit_installed;
/*
 * Run all handlers registered through git_atexit(), most recently
 * registered first.
 */
static void git_atexit_dispatch(void)
{
	size_t remaining = git_atexit_hdlrs.nr;

	while (remaining > 0) {
		remaining--;
		git_atexit_hdlrs.handlers[remaining]();
	}
}
/*
 * Forget every handler registered through git_atexit() and mark the
 * dispatcher as not installed, so a later git_atexit() call registers
 * it again.
 */
static void git_atexit_clear(void)
{
	free(git_atexit_hdlrs.handlers);

	/* Reset the registry to its initial, empty state. */
	git_atexit_hdlrs.handlers = NULL;
	git_atexit_hdlrs.nr = 0;
	git_atexit_hdlrs.alloc = 0;

	git_atexit_installed = 0;
}
#undef atexit
/*
 * Register `handler` to run at process exit via git_atexit_dispatch().
 *
 * The dispatcher itself is registered with the C library's atexit() on
 * first use (this definition sits between `#undef atexit` and
 * `#define atexit git_atexit`, so `atexit` here is the real one).
 * Returns 0 on success, -1 if registering the dispatcher failed.
 */
int git_atexit(void (*handler)(void))
{
	ALLOC_GROW(git_atexit_hdlrs.handlers, git_atexit_hdlrs.nr + 1, git_atexit_hdlrs.alloc);
	git_atexit_hdlrs.handlers[git_atexit_hdlrs.nr++] = handler;

	if (git_atexit_installed)
		return 0;

	if (atexit(&git_atexit_dispatch))
		return -1;
	git_atexit_installed = 1;
	return 0;
}
#define atexit git_atexit
/* Set to 1 in the forked child that runs an async task (see start_async). */
static int process_is_async;
/* Non-threaded build: report whether this process is an async child. */
int in_async(void)
{
return process_is_async;
}
/* Non-threaded build: terminate the process with exit status `code`. */
static void NORETURN async_exit(int code)
{
exit(code);
}
#endif
/*
 * React to a write error `err` (an errno value).  Anything other than
 * EPIPE is ignored.  On EPIPE, an async task exits with code 141;
 * otherwise the default SIGPIPE disposition is restored and the signal
 * is raised so the process dies as if it had received SIGPIPE.
 */
void check_pipe(int err)
{
	if (err != EPIPE)
		return;

	if (in_async())
		async_exit(141);

	signal(SIGPIPE, SIG_DFL);
	raise(SIGPIPE);
	/* Should never happen, but just in case... */
	exit(141);
}
int start_async(struct async *async)
{
int need_in, need_out;
int fdin[2], fdout[2];
int proc_in, proc_out;
need_in = async->in < 0;
if (need_in) {
if (pipe(fdin) < 0) {
if (async->out > 0)
close(async->out);
return error_errno("cannot create pipe");
}
async->in = fdin[1];
}
need_out = async->out < 0;
if (need_out) {
if (pipe(fdout) < 0) {
if (need_in)
close_pair(fdin);
else if (async->in)
close(async->in);
return error_errno("cannot create pipe");
}
async->out = fdout[0];
}
if (need_in)
proc_in = fdin[0];
else if (async->in)
proc_in = async->in;
else
proc_in = -1;
if (need_out)
proc_out = fdout[1];
else if (async->out)
proc_out = async->out;
else
proc_out = -1;
#ifdef NO_PTHREADS
/* Flush stdio before fork() to avoid cloning buffers */
fflush(NULL);
async->pid = fork();
if (async->pid < 0) {
error_errno("fork (async) failed");
goto error;
}
if (!async->pid) {
if (need_in)
close(fdin[1]);
if (need_out)
close(fdout[0]);
git_atexit_clear();
process_is_async = 1;
exit(!!async->proc(proc_in, proc_out, async->data));
}
mark_child_for_cleanup(async->pid, NULL);
if (need_in)
close(fdin[0]);
else if (async->in)
close(async->in);
if (need_out)
close(fdout[1]);
else if (async->out)
close(async->out);
#else
if (!main_thread_set) {
/*
* We assume that the first time that start_async is called
* it is from the main thread.
*/
main_thread_set = 1;
main_thread = pthread_self();
pthread_key_create(&async_key, NULL);
pthread_key_create(&async_die_counter, NULL);
set_die_routine(die_async);
set_die_is_recursing_routine(async_die_is_recursing);
}
if (proc_in >= 0)
set_cloexec(proc_in);
if (proc_out >= 0)
set_cloexec(proc_out);
async->proc_in = proc_in;
async->proc_out = proc_out;
{
int err = pthread_create(&async->tid, NULL, run_thread, async);
if (err) {
error_errno("cannot create thread");
goto error;
}
}
#endif
return 0;
error:
if (need_in)
close_pair(fdin);
else if (async->in)
close(async->in);
if (need_out)
close_pair(fdout);
else if (async->out)
close(async->out);
return -1;
}
/*
 * Wait for the async task started by start_async() and return its
 * result: the wait_or_whine() code of the child process in the
 * NO_PTHREADS build, or the thread's return value otherwise (-1 is the
 * initial value used if pthread_join() does not store one).
 */
int finish_async(struct async *async)
{
#ifdef NO_PTHREADS
	return wait_or_whine(async->pid, "child process", 0);
#else
	void *thread_ret = (void *)(intptr_t)(-1);

	if (pthread_join(async->tid, &thread_ret))
		error("pthread_join failed");
	return (int)(intptr_t)thread_ret;
#endif
}
/*
 * Return the path of the hook called `name` if access(X_OK) succeeds on
 * it, or NULL otherwise.  When STRIP_EXTENSION is defined, the name with
 * that extension appended is tried as a fallback.
 *
 * The returned string lives in a static buffer that is overwritten by
 * the next call.
 */
const char *find_hook(const char *name)
{
	static struct strbuf path = STRBUF_INIT;

	strbuf_reset(&path);
	strbuf_git_path(&path, "hooks/%s", name);

	if (access(path.buf, X_OK) >= 0)
		return path.buf;

#ifdef STRIP_EXTENSION
	strbuf_addstr(&path, STRIP_EXTENSION);
	if (access(path.buf, X_OK) >= 0)
		return path.buf;
#endif
	return NULL;
}
/*
 * Run the hook `name`, if it exists, with the NULL-terminated list of
 * string arguments in `args` and the environment changes in `env`.
 *
 * The hook runs with stdin disabled and stdout redirected to stderr.
 * Returns 0 when no hook is found, otherwise the result of
 * run_command().
 */
int run_hook_ve(const char *const *env, const char *name, va_list args)
{
	struct child_process hook = CHILD_PROCESS_INIT;
	const char *hook_path = find_hook(name);
	const char *arg;

	if (!hook_path)
		return 0;

	argv_array_push(&hook.args, hook_path);
	for (arg = va_arg(args, const char *); arg;
	     arg = va_arg(args, const char *))
		argv_array_push(&hook.args, arg);

	hook.env = env;
	hook.no_stdin = 1;
	hook.stdout_to_stderr = 1;

	return run_command(&hook);
}
/*
 * Variadic front end to run_hook_ve(): the arguments after `name` are
 * the hook's string arguments and must be terminated by a NULL pointer.
 * Returns whatever run_hook_ve() returns.
 */
int run_hook_le(const char *const *env, const char *name, ...)
{
va_list args;
int ret;
va_start(args, name);
ret = run_hook_ve(env, name, args);
va_end(args);
return ret;
}
/*
 * State for one descriptor serviced by pump_io(): either a buffer to be
 * written out (POLLOUT) or a strbuf to be filled (POLLIN).
 */
struct io_pump {
/* initialized by caller */
/* Descriptor to pump; set to -1 by pump_io_round once it is closed. */
int fd;
int type; /* POLLOUT or POLLIN */
union {
/* Used when type is POLLOUT: remaining data to write. */
struct {
const char *buf;
size_t len;
} out;
/* Used when type is POLLIN: destination buffer and size hint. */
struct {
struct strbuf *buf;
size_t hint;
} in;
} u;
/* returned by pump_io */
int error; /* 0 for success, otherwise errno */
/* internal use */
/* Entry in the pollfd array assigned to this slot for the current round. */
struct pollfd *pfd;
};
/*
 * Perform one poll()-driven round of I/O over the `nr` slots.
 *
 * Every slot with an open descriptor is polled; each ready slot gets one
 * write (POLLOUT) or one read (POLLIN).  A slot's descriptor is closed
 * and set to -1 on error, when all output has been written, or on
 * end-of-input.  `pfd` is scratch space with room for `nr` entries.
 *
 * Returns 0 when no slot has an open descriptor left, 1 otherwise
 * (including when poll() was interrupted).  Dies if poll() fails for
 * any other reason.
 */
static int pump_io_round(struct io_pump *slots, int nr, struct pollfd *pfd)
{
	const int ready_mask = POLLOUT|POLLIN|POLLHUP|POLLERR|POLLNVAL;
	int active = 0;
	int i;

	/* Build the poll set from the slots that are still open. */
	for (i = 0; i < nr; i++) {
		struct io_pump *io = &slots[i];

		if (io->fd >= 0) {
			io->pfd = &pfd[active];
			io->pfd->fd = io->fd;
			io->pfd->events = io->type;
			active++;
		}
	}

	if (active == 0)
		return 0;

	if (poll(pfd, active, -1) < 0) {
		if (errno != EINTR)
			die_errno("poll failed");
		return 1;
	}

	for (i = 0; i < nr; i++) {
		struct io_pump *io = &slots[i];
		ssize_t len;

		if (io->fd < 0 || !(io->pfd->revents & ready_mask))
			continue;

		if (io->type == POLLOUT) {
			len = xwrite(io->fd, io->u.out.buf, io->u.out.len);
			if (len >= 0) {
				io->u.out.buf += len;
				io->u.out.len -= len;
			} else {
				io->error = errno;
			}
			/* Done with this slot on error or once everything is written. */
			if (len < 0 || !io->u.out.len) {
				close(io->fd);
				io->fd = -1;
			}
		} else if (io->type == POLLIN) {
			len = strbuf_read_once(io->u.in.buf,
					       io->fd, io->u.in.hint);
			if (len < 0)
				io->error = errno;
			/* Error or EOF: nothing more to read from this slot. */
			if (len <= 0) {
				close(io->fd);
				io->fd = -1;
			}
		}
	}

	return 1;
}
static int pump_io(struct io_pump *slots, int nr)
{
struct pollfd *pfd;
int i;
for (i = 0; i < nr; i++)
slots[i].error = 0;
ALLOC_ARRAY(pfd, nr);
while (pump_io_round(slots, nr, pfd))
; /* nothing */
free(pfd);
/* There may be multiple errno values, so just pick the first. */
for (i = 0; i < nr; i++) {
if (slots[i].error) {
errno = slots[i].error;
return -1;
}
}
return 0;
}
int pipe_command(struct child_process *cmd,
const char *in, size_t in_len,
struct strbuf *out, size_t out_hint,
struct strbuf *err, size_t err_hint)
{
struct io_pump io[3];
int nr = 0;
if (in)
cmd->in = -1;
if (out)
cmd->out = -1;
if (err)
cmd->err = -1;
if (start_command(cmd) < 0)
return -1;
if (in) {
io[nr].fd = cmd->in;
io[nr].type = POLLOUT;
io[nr].u.out.buf = in;
io[nr].u.out.len = in_len;
nr++;
}
if (out) {
io[nr].fd = cmd->out;
io[nr].type = POLLIN;
io[nr].u.in.buf = out;
io[nr].u.in.hint = out_hint;
nr++;
}
if (err) {
io[nr].fd = cmd->err;
io[nr].type = POLLIN;
io[nr].u.in.buf = err;
io[nr].u.in.hint = err_hint;
nr++;
}
if (pump_io(io, nr) < 0) {
finish_command(cmd); /* throw away exit code */
return -1;
}
return finish_command(cmd);
}
/* Lifecycle of one slot in the parallel process table. */
enum child_state {
/* Slot is unused and may be picked by pp_start_one(). */
GIT_CP_FREE,
/* A command was started in this slot and its stderr is being read. */
GIT_CP_WORKING,
/* EOF was seen on the child's stderr; waiting for pp_collect_finished(). */
GIT_CP_WAIT_CLEANUP,
};
/* Bookkeeping for one run_processes_parallel() invocation. */
struct parallel_processes {
/* Opaque caller data passed to every callback. */
void *data;
/* Number of slots in `children` and `pfd`. */
int max_processes;
/* Number of children started and not yet collected. */
int nr_processes;
/* Callbacks; the start_failure/task_finished defaults are filled in by pp_init(). */
get_next_task_fn get_next_task;
start_failure_fn start_failure;
task_finished_fn task_finished;
struct {
enum child_state state;
struct child_process process;
/* Buffered stderr output of this child. */
struct strbuf err;
/* Per-task callback data set by get_next_task. */
void *data;
} *children;
/*
 * The struct pollfd is logically part of *children,
 * but the system call expects it as its own array.
 */
struct pollfd *pfd;
/* Set once no further tasks should be started. */
unsigned shutdown : 1;
/* Index of the child whose output is written to stderr live. */
int output_owner;
struct strbuf buffered_output; /* of finished children */
};
/*
 * start_failure callback used when the caller supplied none: ignores
 * its arguments and returns 0.
 */
static int default_start_failure(struct strbuf *out,
void *pp_cb,
void *pp_task_cb)
{
return 0;
}
/*
 * task_finished callback used when the caller supplied none: ignores
 * its arguments (including the task's result) and returns 0.
 */
static int default_task_finished(int result,
struct strbuf *out,
void *pp_cb,
void *pp_task_cb)
{
return 0;
}
/* Send `signo` to every child that is currently in the WORKING state. */
static void kill_children(struct parallel_processes *pp, int signo)
{
	const int count = pp->max_processes;
	int slot;

	for (slot = 0; slot < count; slot++) {
		if (pp->children[slot].state != GIT_CP_WORKING)
			continue;
		kill(pp->children[slot].process.pid, signo);
	}
}
/* Process table used by the signal handler below; set by pp_init(). */
static struct parallel_processes *pp_for_signal;
/*
 * Signal handler: forward `signo` to all working children, then remove
 * this handler from the signal chain and re-raise the signal.
 */
static void handle_children_on_signal(int signo)
{
kill_children(pp_for_signal, signo);
sigchain_pop(signo);
raise(signo);
}
/*
 * Initialize *pp for running up to `n` children in parallel.
 *
 * If `n` is less than 1, online_cpus() is used instead.
 * `get_next_task` is mandatory (dies if NULL); `start_failure` and
 * `task_finished` fall back to defaults that return 0.  `data` is the
 * opaque pointer handed to every callback.  Also installs the signal
 * handler that forwards common signals to the children.
 */
static void pp_init(struct parallel_processes *pp,
		    int n,
		    get_next_task_fn get_next_task,
		    start_failure_fn start_failure,
		    task_finished_fn task_finished,
		    void *data)
{
	int slot;

	if (n < 1)
		n = online_cpus();
	pp->max_processes = n;

	trace_printf("run_processes_parallel: preparing to run up to %d tasks", n);

	pp->data = data;
	if (!get_next_task)
		die("BUG: you need to specify a get_next_task function");
	pp->get_next_task = get_next_task;

	if (!start_failure)
		start_failure = default_start_failure;
	pp->start_failure = start_failure;

	if (!task_finished)
		task_finished = default_task_finished;
	pp->task_finished = task_finished;

	pp->nr_processes = 0;
	pp->output_owner = 0;
	pp->shutdown = 0;
	pp->children = xcalloc(n, sizeof(*pp->children));
	pp->pfd = xcalloc(n, sizeof(*pp->pfd));
	strbuf_init(&pp->buffered_output, 0);

	for (slot = 0; slot < n; slot++) {
		strbuf_init(&pp->children[slot].err, 0);
		child_process_init(&pp->children[slot].process);
		pp->pfd[slot].events = POLLIN | POLLHUP;
		/* -1 marks the slot as having nothing to poll yet. */
		pp->pfd[slot].fd = -1;
	}

	pp_for_signal = pp;
	sigchain_push_common(handle_children_on_signal);
}
/*
 * Release everything owned by *pp, write any output still buffered to
 * stderr, and remove the signal handlers installed by pp_init().
 */
static void pp_cleanup(struct parallel_processes *pp)
{
	int slot;

	trace_printf("run_processes_parallel: done");

	for (slot = 0; slot < pp->max_processes; slot++) {
		strbuf_release(&pp->children[slot].err);
		child_process_clear(&pp->children[slot].process);
	}

	free(pp->children);
	free(pp->pfd);

	/*
	 * If get_next_task added messages to the buffer in its last
	 * iteration, the buffered output is not empty yet.
	 */
	strbuf_write(&pp->buffered_output, stderr);
	strbuf_release(&pp->buffered_output);

	sigchain_pop_common();
}
/*
 * Ask get_next_task for a task and start it in a free slot.
 *
 * Returns
 *   0 if a new task was started;
 *   1 if no new task was started (get_next_task ran out of work, or a
 *     non-critical problem occurred while starting the command);
 *  <0 if no new task was started and the user wishes to shut down
 *     early; the negated code is the signal to send to the children.
 */
static int pp_start_one(struct parallel_processes *pp)
{
	int slot = 0;
	int code;

	while (slot < pp->max_processes &&
	       pp->children[slot].state != GIT_CP_FREE)
		slot++;
	if (slot == pp->max_processes)
		die("BUG: bookkeeping is hard");

	code = pp->get_next_task(&pp->children[slot].process,
				 &pp->children[slot].err,
				 pp->data,
				 &pp->children[slot].data);
	if (!code) {
		/* No task: keep whatever the callback wrote for later output. */
		strbuf_addbuf(&pp->buffered_output, &pp->children[slot].err);
		strbuf_reset(&pp->children[slot].err);
		return 1;
	}

	/* Capture the child's stderr (and stdout via stderr) through a pipe. */
	pp->children[slot].process.err = -1;
	pp->children[slot].process.stdout_to_stderr = 1;
	pp->children[slot].process.no_stdin = 1;

	if (start_command(&pp->children[slot].process)) {
		code = pp->start_failure(&pp->children[slot].err,
					 pp->data,
					 &pp->children[slot].data);
		strbuf_addbuf(&pp->buffered_output, &pp->children[slot].err);
		strbuf_reset(&pp->children[slot].err);
		if (code)
			pp->shutdown = 1;
		return code;
	}

	pp->nr_processes++;
	pp->children[slot].state = GIT_CP_WORKING;
	pp->pfd[slot].fd = pp->children[slot].process.err;
	return 0;
}
/*
 * Wait up to `output_timeout` milliseconds for output from the running
 * children and append whatever is available to each child's err buffer.
 *
 * A child whose stderr reaches EOF has that descriptor closed and is
 * moved to the WAIT_CLEANUP state.  Dies if poll() fails (other than
 * with EINTR) or if a read fails with anything but EAGAIN.
 */
static void pp_buffer_stderr(struct parallel_processes *pp, int output_timeout)
{
	int i;

	while ((i = poll(pp->pfd, pp->max_processes, output_timeout)) < 0) {
		int saved_errno;

		if (errno == EINTR)
			continue;
		/*
		 * pp_cleanup() makes library calls that may overwrite errno;
		 * preserve poll()'s error so die_errno() reports the right one.
		 */
		saved_errno = errno;
		pp_cleanup(pp);
		errno = saved_errno;
		die_errno("poll");
	}

	/* Buffer output from all pipes. */
	for (i = 0; i < pp->max_processes; i++) {
		if (pp->children[i].state == GIT_CP_WORKING &&
		    pp->pfd[i].revents & (POLLIN | POLLHUP)) {
			int n = strbuf_read_once(&pp->children[i].err,
						 pp->children[i].process.err, 0);
			if (n == 0) {
				/* EOF: the child closed its end of the pipe. */
				close(pp->children[i].process.err);
				pp->children[i].state = GIT_CP_WAIT_CLEANUP;
			} else if (n < 0) {
				if (errno != EAGAIN)
					die_errno("read");
			}
		}
	}
}
/*
 * Write the buffered stderr output of the current output owner to our
 * stderr, provided that child is still working and has output pending.
 */
static void pp_output(struct parallel_processes *pp)
{
	int owner = pp->output_owner;

	if (pp->children[owner].state != GIT_CP_WORKING ||
	    !pp->children[owner].err.len)
		return;

	strbuf_write(&pp->children[owner].err, stderr);
	strbuf_reset(&pp->children[owner].err);
}
/*
 * Reap every child in the WAIT_CLEANUP state, hand its result to the
 * task_finished callback, and free its slot.
 *
 * Output of a finished child is written to stderr directly if the child
 * owned the output, and appended to the buffered output otherwise.
 *
 * Returns the last non-zero code returned by task_finished, or 0 if all
 * callbacks returned 0.  A negative callback result stops the collection
 * immediately.
 */
static int pp_collect_finished(struct parallel_processes *pp)
{
int i, code;
int n = pp->max_processes;
int result = 0;
while (pp->nr_processes > 0) {
/* Find the next child waiting to be cleaned up, if any. */
for (i = 0; i < pp->max_processes; i++)
if (pp->children[i].state == GIT_CP_WAIT_CLEANUP)
break;
if (i == pp->max_processes)
break;
code = finish_command(&pp->children[i].process);
code = pp->task_finished(code,
&pp->children[i].err, pp->data,
&pp->children[i].data);
if (code)
result = code;
/* Note: on a negative code the slot is left as it is. */
if (code < 0)
break;
/* Return the slot to the free pool. */
pp->nr_processes--;
pp->children[i].state = GIT_CP_FREE;
pp->pfd[i].fd = -1;
child_process_init(&pp->children[i].process);
if (i != pp->output_owner) {
strbuf_addbuf(&pp->buffered_output, &pp->children[i].err);
strbuf_reset(&pp->children[i].err);
} else {
strbuf_write(&pp->children[i].err, stderr);
strbuf_reset(&pp->children[i].err);
/* Output all other finished child processes */
strbuf_write(&pp->buffered_output, stderr);
strbuf_reset(&pp->buffered_output);
/*
 * Pick next process to output live.
 * NEEDSWORK:
 * For now we pick it randomly by doing a round
 * robin. Later we may want to pick the one with
 * the most output or the longest or shortest
 * running process time.
 */
/* `i` is reused here as an offset from the current owner. */
for (i = 0; i < n; i++)
if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING)
break;
pp->output_owner = (pp->output_owner + i) % n;
}
}
return result;
}
/*
 * Run tasks supplied by `get_next_task` with up to `n` children at a
 * time (online_cpus() if `n` is less than 1), relaying their stderr
 * output.
 *
 * `start_failure` and `task_finished` may be NULL.  A non-zero result
 * from them stops new tasks from being started; a negative result also
 * sends the negated value as a signal to the running children.
 * `pp_cb` is passed through to all callbacks.  Always returns 0.
 */
int run_processes_parallel(int n,
			   get_next_task_fn get_next_task,
			   start_failure_fn start_failure,
			   task_finished_fn task_finished,
			   void *pp_cb)
{
	/* Poll timeout in milliseconds while waiting for child output. */
	const int output_timeout = 100;
	/* Upper bound on the number of children started per loop iteration. */
	const int spawn_cap = 4;
	struct parallel_processes pp;
	int code;

	pp_init(&pp, n, get_next_task, start_failure, task_finished, pp_cb);

	for (;;) {
		int spawned = 0;

		while (spawned < spawn_cap && !pp.shutdown &&
		       pp.nr_processes < pp.max_processes) {
			code = pp_start_one(&pp);
			if (code) {
				if (code < 0) {
					pp.shutdown = 1;
					kill_children(&pp, -code);
				}
				break;
			}
			spawned++;
		}

		if (!pp.nr_processes)
			break;

		pp_buffer_stderr(&pp, output_timeout);
		pp_output(&pp);

		code = pp_collect_finished(&pp);
		if (code) {
			pp.shutdown = 1;
			if (code < 0)
				kill_children(&pp, -code);
		}
	}

	pp_cleanup(&pp);
	return 0;
}