#!/usr/bin/env python
#
# Provenance (this module was carried inside a git patch):
#   commit 4f4ad25bf60751a09e316dca8c29cf3628ad7bdc
#   From: Peter Amstutz
#   Date: Tue, 16 Sep 2014 22:47:44 -0400
#   Subject: [PATCH] 3609: Fix trimming redirect parts of the command line.
#     3609: Collect command line arguments, uploads files, builds arv-run
#           submission.  Needs work on event listener.
#     3609: Now print log messages for submitted pipeline.
#     3609: Fix trimming redirect parts of the command line.
#   Path: sdk/python/arvados/commands/run.py
#
# The same patch also creates the launcher sdk/python/bin/arv-run
# (mode 100755) with this content:
#     #!/usr/bin/env python
#
#     from arvados.commands.run import main
#     main()
"""arv-run: wrap a shell-style command line into an Arvados pipeline.

The command's arguments are scanned for paths of existing local files.
Those files are uploaded (or, with --dry-run, merely listed), the
arguments are rewritten to ``$(file <hash>/<name>)`` references, ``<`` and
``>`` redirections are turned into ``task.stdin`` / ``task.stdout``
parameters, and the result is submitted as a one-component pipeline
instance running the ``run-command`` script.
"""

import argparse
import json
import os
import re
import stat
import time  # not used by this module; retained from the original imports

arvrun_parser = argparse.ArgumentParser()
arvrun_parser.add_argument('--dry-run', action="store_true")
arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs")
arvrun_parser.add_argument('command')
arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)

# Absolute paths of every local file that statfile() decided must be uploaded.
needupload_files = []

# Ways an option can carry a value in the same argv entry, tried in order:
# "--long=value", "-opt=value", "-Xvalue".
_OPTION_PATTERNS = (
    re.compile(r"(--[^=]+=)(.*)"),
    re.compile(r"(-[^=]+=)(.*)"),
    re.compile(r"(-.)(.+)"),
)

# Commit of the "arvados" repository whose run-command script is requested.
_RUN_COMMAND_SCRIPT_VERSION = "bf243e064a7a2ee4e69a87dc3ba46e949a545150"


class ArvFile(object):
    """A command-line argument that refers to a local file awaiting upload.

    Attributes:
        prefix: option text that preceded the path inside the argument
            (for example ``--input=``), or ``''``.
        fn: path of the file.  statfile() stores the absolute path without
            its leading ``/``; main() later shortens it to be relative to
            the directory shared by all files.
    """

    def __init__(self, prefix, fn):
        self.prefix = prefix
        self.fn = fn


def statfile(prefix, fn, pattern):
    """Classify one argument value as a file reference or plain text.

    Args:
        prefix: option text to put back in front of the result.
        fn: the argument value that may name a local file.
        pattern: the regex that split the argument, or None.  Unused; kept
            so existing callers do not break.

    Returns:
        * ``prefix + "$(file <portable_data_hash>/<basename>)"`` when ``fn``
          is a regular file whose directory contains a ``.arvados#collection``
          file (presumably supplied by an Arvados mount -- confirm); the hash
          is read from that file's JSON ``portable_data_hash`` key.
        * an ArvFile when ``fn`` is any other regular file; its absolute path
          is also appended to ``needupload_files``.
        * ``prefix + fn`` unchanged for everything else (missing paths,
          directories, ordinary words).
    """
    absfn = os.path.abspath(fn)
    try:
        mode = os.stat(absfn).st_mode
    except OSError:
        # Not an existing path: treat the argument as ordinary text.
        return prefix + fn

    if not stat.S_ISREG(mode):
        # Directories and special files are passed through exactly as typed;
        # in particular '' and '.' must not be expanded to a local path.
        return prefix + fn

    mount = os.path.join(os.path.dirname(absfn), ".arvados#collection")
    if os.path.exists(mount):
        with open(mount, 'r') as f:
            c = json.load(f)
        return (prefix + "$(file " + c["portable_data_hash"] + "/" +
                os.path.basename(absfn) + ")")

    needupload_files.append(absfn)
    # Drop the leading '/' so the path can be split into directory steps
    # when the common directory is computed later.
    return ArvFile(prefix, absfn[1:])


def _rewrite_args(raw_args):
    """Run every argument (or the value part of an option) through statfile()."""
    rewritten = []
    for a in raw_args:
        # startswith() rather than a[0] so an empty-string argument is safe.
        if not a.startswith('-'):
            rewritten.append(statfile('', a, None))
            continue
        for p in _OPTION_PATTERNS:
            m = p.match(a)
            if m:
                rewritten.append(statfile(m.group(1), m.group(2), p))
                break
        else:
            # A bare flag such as "-v": nothing that could be a path.
            rewritten.append(a)
    return rewritten


def _strip_common_dir(files):
    """Make each ArvFile.fn relative to the files' deepest shared directory.

    Mutates the ``fn`` attributes in place and returns the shared directory
    as an absolute path ending in ``/``.
    """
    prefix = "/"
    while files:
        head, sep, _ = files[0].fn.partition('/')
        if not sep:
            # The first file has no directory component left.
            break
        step = head + sep
        if not all(f.fn.startswith(step) for f in files):
            break
        prefix += step
        for f in files:
            f.fn = f.fn[len(step):]
    return prefix


def _split_redirects(commandargs):
    """Separate ``<`` / ``>`` redirections from the command's own arguments.

    Returns ``(command_args, stdin, stdout)``.  ``command_args`` is everything
    before the first redirection operator.  ``stdin`` / ``stdout`` are the
    first word following the last ``<`` / ``>`` respectively, or None when
    that operator is absent or has no operand.
    """
    targets = {'<': None, '>': None}
    current = None
    cut = None
    for j, c in enumerate(commandargs):
        if c in targets:
            targets[c] = []
            current = c
            if cut is None:
                cut = j
        elif current is not None:
            targets[current].append(c)

    command = commandargs if cut is None else commandargs[:cut]
    stdin = targets['<'][0] if targets['<'] else None
    stdout = targets['>'][0] if targets['>'] else None
    return command, stdin, stdout


def main(arguments=None):
    """Entry point of arv-run.

    Args:
        arguments: argv-style list to parse; None means ``sys.argv[1:]``
            (argparse's default).

    With ``--dry-run`` the upload commands and the pipeline JSON are printed
    and nothing is sent.  Otherwise the local files are uploaded, the
    pipeline instance is created, and its ``stderr`` / ``update`` events are
    printed until its state becomes ``Complete`` or ``Failed``.

    Side effect: when local files are involved, the process changes its
    working directory to the directory those files share.
    """
    args = arvrun_parser.parse_args(arguments)

    commandargs = _rewrite_args(args.args)

    files = [c for c in commandargs if isinstance(c, ArvFile)]
    # Only touch the working directory / uploader when there is something to
    # upload; invoking the uploader with an empty path list is not meaningful.
    if files:
        pathprefix = _strip_common_dir(files)
        os.chdir(pathprefix)

        if args.dry_run:
            print("cd %s" % pathprefix)
            print("arv-put \"%s\"" % '" "'.join([c.fn for c in files]))
            pdh = "$(input)"
        else:
            # Deferred, absolute import: assumes the uploader is the sibling
            # module arvados/commands/put.py -- confirm.  Deferring keeps the
            # helpers above importable without the SDK.
            import arvados.commands.put as put
            pdh = put.main(["--portable-data-hash"] + [c.fn for c in files])

        commandargs = [
            ("%s$(file %s/%s)" % (c.prefix, pdh, c.fn))
            if isinstance(c, ArvFile) else c
            for c in commandargs
        ]

    commandargs, stdin, stdout = _split_redirects(commandargs)

    component = {
        "script": "run-command",
        "script_version": _RUN_COMMAND_SCRIPT_VERSION,
        "repository": "arvados",
        "script_parameters": {
            "command": [args.command] + commandargs
        },
        "runtime_constraints": {
            "docker_image": args.docker_image
        }
    }

    if stdin is not None:
        component["script_parameters"]["task.stdin"] = stdin
    if stdout is not None:
        component["script_parameters"]["task.stdout"] = stdout

    pipeline = {
        "name": "",
        "components": {
            args.command: component
        },
        "state": "RunningOnServer"
    }

    if args.dry_run:
        print(json.dumps(pipeline, indent=4))
        return

    # Deferred for the same reason as the uploader import above.
    import arvados
    import arvados.events

    api = arvados.api('v1')
    pi = api.pipeline_instances().create(body=pipeline).execute()

    # Bound before report() is defined so the callback can close the
    # subscription it is attached to.
    ws = None

    def report(x):
        if "event_type" in x:
            print("\n")
            print(x)
            if x["event_type"] == "stderr":
                print(x["properties"]["text"])
            elif (x["event_type"] == "update" and
                  x["properties"]["new_attributes"]["state"] in ["Complete", "Failed"]):
                ws.close_connection()

    ws = arvados.events.subscribe(
        api,
        [["object_uuid", "=", pi["uuid"]],
         ["event_type", "in", ["stderr", "update"]]],
        report)
    ws.run_forever()


if __name__ == '__main__':
    main()