#!/usr/bin/env python
"""Crunch script that runs an arbitrary command as an Arvados job task.

The job's ``script_parameters`` (and, for subtasks, the task's own
``parameters``) drive the run:

* ``command``      -- list describing the command line; expanded with
                      ``expand_list`` (strings are substituted, nested lists
                      are flattened, ``{"foreach": ..., "command": ...}``
                      dicts are repeated once per item).
* ``task.foreach`` -- name of a parameter to parallelize on.  The sequence-0
                      task creates one sequence-1 task per item and exits.
* ``task.vwd``     -- collection checked out into the output directory before
                      the command runs and checked in again afterwards.
* ``task.cwd``     -- directory to change into before running the command.
* ``task.stdin`` / ``task.stdout`` -- files used to redirect the command's
                      standard input / output.

After the command finishes, the contents of the output directory are saved
and the task record is updated with the output, success flag and progress.

This script targets Python 2 (it relies on the ``unicode`` builtin).
"""

import logging

# Logging is configured before the remaining imports.
# NOTE(review): presumably so this format takes effect before anything done
# while importing arvados -- confirm before reordering.
logging.basicConfig(level=logging.INFO, format="run-command: %(message)s")

import arvados
import re
import os
import subprocess
import sys
import shutil
import crunchutil.subst as subst
import time
import arvados.commands.put as put
import signal
import stat
import copy
import traceback
import pprint
import multiprocessing
import crunchutil.robust_put as robust_put
import crunchutil.vwd as vwd

# Files created from here on are accessible to the owner only.  The "0o"
# prefix is accepted by Python 2.6+ as well as Python 3.
os.umask(0o077)

t = arvados.current_task().tmpdir

api = arvados.api('v1')

# Lay out the task's scratch area: <tmpdir>/tmpdir for temporary files and
# <tmpdir>/output (the working directory) for files that will be saved.
os.chdir(arvados.current_task().tmpdir)
os.mkdir("tmpdir")
os.mkdir("output")

os.chdir("output")

outdir = os.getcwd()

# taskp stays None unless the current task carries its own parameters (or,
# further below, the job is not parallelized and the job parameters are used).
taskp = None
jobp = arvados.current_job()['script_parameters']
if len(arvados.current_task()['parameters']) > 0:
    taskp = arvados.current_task()['parameters']

links = []


# Substitution callbacks registered with crunchutil.subst below.  Each one
# receives a single argument, which it ignores.

def sub_tmpdir(v):
    """Return the path of the task's 'tmpdir' scratch subdirectory."""
    return os.path.join(arvados.current_task().tmpdir, 'tmpdir')


def sub_outdir(v):
    """Return the path of the task's output directory."""
    return outdir


def sub_cores(v):
    """Return the CPU count reported by multiprocessing, as a string."""
    return str(multiprocessing.cpu_count())


def sub_jobid(v):
    """Return the value of the JOB_UUID environment variable."""
    return os.environ['JOB_UUID']


def sub_taskid(v):
    """Return the value of the TASK_UUID environment variable."""
    return os.environ['TASK_UUID']


def sub_jobsrc(v):
    """Return the value of the CRUNCH_SRC environment variable."""
    return os.environ['CRUNCH_SRC']


subst.default_subs["task.tmpdir"] = sub_tmpdir
subst.default_subs["task.outdir"] = sub_outdir
subst.default_subs["job.srcdir"] = sub_jobsrc
subst.default_subs["node.cores"] = sub_cores
subst.default_subs["job.uuid"] = sub_jobid
subst.default_subs["task.uuid"] = sub_taskid


class SigHandler(object):
    """Forward a signal to a child process and remember which one it was."""

    def __init__(self):
        # Number of the last signal forwarded, or None if none was received.
        self.sig = None

    def send_signal(self, sp, signum):
        """Deliver signum to the process sp and record it in self.sig."""
        sp.send_signal(signum)
        self.sig = signum


def expand_item(p, c):
    """Expand one element of a command description into a list of strings.

    p is the parameter dictionary used for substitution; c is the element:

    * a dict with "foreach" and "command" keys expands c["command"] once per
      item of the parameter named by c["foreach"], with that parameter bound
      to the item;
    * a list is expanded element by element;
    * a string is passed through subst.do_substitution.

    Anything else (including a dict lacking either key) expands to [].

    Raises ValueError when the foreach parameter names a path that is neither
    a directory nor a regular file.
    """
    if isinstance(c, dict):
        if "foreach" in c and "command" in c:
            var = c["foreach"]
            items = get_items(p, p[var])
            if items is None:
                raise ValueError(
                    "cannot expand foreach over '%s': value is not a list, "
                    "directory or regular file" % var)
            expanded = []
            for item in items:
                # Shallow copy so the binding does not leak into the caller's
                # parameters.
                params = copy.copy(p)
                params[var] = item
                expanded.extend(expand_list(params, c["command"]))
            return expanded
    elif isinstance(c, list):
        return expand_list(p, c)
    elif isinstance(c, (str, unicode)):
        return [subst.do_substitution(p, c)]

    return []


def expand_list(p, l):
    """Expand every element of l with expand_item and flatten the results."""
    return [exp for arg in l for exp in expand_item(p, arg)]


def get_items(p, value):
    """Return the list of items that a foreach parameter stands for.

    * A list value is expanded with expand_list.
    * Otherwise the value is substituted to obtain a path.  For a directory,
      one "$(dir <prefix>/<entry>/)" string is returned per directory entry,
      where <prefix> is the path with the TASK_KEEPMOUNT prefix removed.  For
      a regular file, one item is returned per line, without the line
      terminator.

    Returns None when the path is neither a directory nor a regular file.
    os.stat raises OSError if the path does not exist.
    """
    if isinstance(value, list):
        return expand_list(p, value)

    fn = subst.do_substitution(p, value)
    mode = os.stat(fn).st_mode

    if stat.S_ISDIR(mode):
        # Path relative to the keep mount point (skipping the separator that
        # follows the mount point).
        prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
        return ["$(dir %s/%s/)" % (prefix, entry) for entry in os.listdir(fn)]

    if stat.S_ISREG(mode):
        with open(fn) as f:
            # Strip line terminators so they do not end up inside command
            # arguments or task parameters.
            return [line.rstrip("\r\n") for line in f]

    return None


stdoutname = None
stdoutfile = None
stdinname = None
stdinfile = None
# Assume failure until the command reports its exit status.
rcode = 1

try:
    if "task.foreach" in jobp:
        if arvados.current_task()['sequence'] == 0:
            # First task of a parallelized job: queue one subtask per item
            # and finish without running the command.
            var = jobp["task.foreach"]
            items = get_items(jobp, jobp[var])
            logging.info("parallelizing on %s with items %s", var, items)
            if items is not None:
                for item in items:
                    params = copy.copy(jobp)
                    params[var] = item
                    arvados.api().job_tasks().create(body={
                        'job_uuid': arvados.current_job()['uuid'],
                        'created_by_job_task_uuid': arvados.current_task()['uuid'],
                        'sequence': 1,
                        'parameters': params
                    }).execute()
                if "task.vwd" in jobp:
                    # Base vwd collection will be merged with output fragments from
                    # the other tasks by crunch.
                    arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
                else:
                    arvados.current_task().set_output(None)
                sys.exit(0)
            else:
                logging.error("could not determine items for %s", var)
                sys.exit(1)
    else:
        # Not parallelized: the job parameters are the task parameters.
        taskp = jobp

    if "task.vwd" in taskp:
        # Populate output directory with symlinks to files in collection
        vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)

    if "task.cwd" in taskp:
        os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))

    cmd = expand_list(taskp, taskp["command"])

    if "task.stdin" in taskp:
        stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
        stdinfile = open(stdinname, "rb")

    if "task.stdout" in taskp:
        stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
        stdoutfile = open(stdoutname, "wb")

    logging.info("{}{}{}".format(
        ' '.join(cmd),
        (" < " + stdinname) if stdinname is not None else "",
        (" > " + stdoutname) if stdoutname is not None else ""))

except subst.SubstitutionError as e:
    logging.error(str(e))
    logging.error("task parameters was:")
    logging.error(pprint.pformat(taskp))
    sys.exit(1)
except Exception:
    logging.exception("caught exception")
    logging.error("task parameters was:")
    logging.error(pprint.pformat(taskp))
    sys.exit(1)

try:
    sp = subprocess.Popen(cmd, shell=False, stdin=stdinfile, stdout=stdoutfile)
    sig = SigHandler()

    # forward signals to the process.
    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))

    # wait for process to complete.
    rcode = sp.wait()

    if sig.sig is not None:
        logging.critical("terminating on signal %s", sig.sig)
        sys.exit(2)
    else:
        logging.info("completed with exit code %i (%s)",
                     rcode, "success" if rcode == 0 else "failed")

except Exception:
    # rcode keeps its current value (1 unless the command already reported an
    # exit status), so a failure to launch is recorded as a failed task below.
    logging.exception("caught exception")
finally:
    # Release the redirection files before the output directory is saved.
    for redirect in (stdinfile, stdoutfile):
        if redirect is not None:
            redirect.close()

# restore default signal handlers.
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)

for l in links:
    os.unlink(l)

logging.info("the following output files will be saved to keep:")

# List regular files under the current directory on stderr, one per line,
# prefixed like the log messages.
subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr)

logging.info("start writing output to keep")

if "task.vwd" in taskp:
    if "task.foreach" in jobp:
        # This is a subtask, so don't merge with the original collection, that will happen at the end
        outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
    else:
        # Just a single task, so do merge with the original collection
        outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
else:
    outcollection = robust_put.upload(outdir)

api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                       body={
                           'output': outcollection,
                           'success': (rcode == 0),
                           'progress': 1.0
                       }).execute()

sys.exit(rcode)