X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1281ecab8f2396739ee9232c36796e46cd551426..refs/heads/9929-portable-manifest-text:/crunch_scripts/run-command diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command index ae2233e491..74793d4fce 100755 --- a/crunch_scripts/run-command +++ b/crunch_scripts/run-command @@ -331,6 +331,13 @@ try: if not args.dry_run: stdoutfile = open(stdoutname, "wb") + if "task.env" in taskp: + env = copy.copy(os.environ) + for k,v in taskp["task.env"].items(): + env[k] = subst.do_substitution(taskp, v) + else: + env = None + logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else "")) if args.dry_run: @@ -363,7 +370,7 @@ try: # this is an intermediate command in the pipeline, so its stdout should go to a pipe next_stdout = subprocess.PIPE - sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout) + sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout, env=env) # Need to close the FDs on our side so that subcommands will get SIGPIPE if the # consuming process ends prematurely. @@ -390,12 +397,19 @@ try: active = 1 pids = set([s.pid for s in subprocesses]) while len(pids) > 0: - (pid, status) = os.wait() - pids.discard(pid) - if not taskp.get("task.ignore_rcode"): - rcode[pid] = (status >> 8) + try: + (pid, status) = os.wait() + except OSError as e: + if e.errno == errno.EINTR: + pass + else: + raise else: - rcode[pid] = 0 + pids.discard(pid) + if not taskp.get("task.ignore_rcode"): + rcode[pid] = (status >> 8) + else: + rcode[pid] = 0 if sig.sig is not None: logger.critical("terminating on signal %s" % sig.sig) @@ -415,7 +429,7 @@ signal.signal(signal.SIGQUIT, signal.SIG_DFL) logger.info("the following output files will be saved to keep:") -subprocess.call(["find", ".", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir) +subprocess.call(["find", "-L", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir) logger.info("start writing output to keep") @@ -426,14 +440,14 @@ if "task.vwd" in taskp and "task.foreach" in jobp: if stat.S_ISLNK(s.st_mode): os.unlink(os.path.join(root, f)) -outcollection = vwd.checkin(outdir).manifest_text() +(outcollection, checkin_error) = vwd.checkin(outdir) # Success if we ran any subprocess, and they all exited 0. -success = rcode and all(status == 0 for status in rcode.itervalues()) +success = rcode and all(status == 0 for status in rcode.itervalues()) and not checkin_error api.job_tasks().update(uuid=arvados.current_task()['uuid'], body={ - 'output': outcollection, + 'output': outcollection.manifest_text(), 'success': success, 'progress':1.0 }).execute()