X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0edcc26fa0f04c707f0b6fd3694c3dae7572d8f7..471dd5bfde6b7218b72bedff6d92c6150af059e6:/sdk/python/arvados/commands/run.py diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py index 962d6a806c..f2bf0f353b 100644 --- a/sdk/python/arvados/commands/run.py +++ b/sdk/python/arvados/commands/run.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import arvados +import arvados.commands.ws as ws import argparse import json import re @@ -8,13 +9,13 @@ import os import stat import put import time -import arvados.commands.ws as ws import subprocess import logging +import arvados.commands._util as arv_cmd logger = logging.getLogger('arvados.arv-run') -arvrun_parser = argparse.ArgumentParser() +arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt]) arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit") arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-run-pipeline-instance") arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs", help="Docker image to use, default arvados/jobs") @@ -35,6 +36,9 @@ class ArvFile(object): class UploadFile(ArvFile): pass +# Determine if a file is in a collection, and return a tuple consisting of the +# portable data hash and the path relative to the root of the collection. +# Return None if the path isn't with an arv-mount collection or there was is error. def is_in_collection(root, branch): try: if root == "/": @@ -47,9 +51,12 @@ def is_in_collection(root, branch): else: sp = os.path.split(root) return is_in_collection(sp[0], os.path.join(sp[1], branch)) - except: + except IOError, OSError: return (None, None) +# Determine the project to place the output of this command by searching upward +# for arv-mount psuedofile indicating the project. If the cwd isn't within +# an arv-mount project or there is an error, return current_user. def determine_project(root, current_user): try: if root == "/": @@ -65,9 +72,14 @@ def determine_project(root, current_user): else: sp = os.path.split(root) return determine_project(sp[0], current_user) - except: + except IOError, OSError: return current_user +# Determine if string corresponds to a file, and if that file is part of a +# arv-mounted collection or only local to the machine. Returns one of +# ArvFile() (file already exists in a collection), UploadFile() (file needs to +# be uploaded to a collection), or simply returns prefix+fn (which yields the +# original parameter string). def statfile(prefix, fn): absfn = os.path.abspath(fn) if os.path.exists(absfn): @@ -80,6 +92,12 @@ def statfile(prefix, fn): else: # trim leading '/' for path prefix test later return UploadFile(prefix, absfn[1:]) + if stat.S_ISDIR(st.st_mode): + sp = os.path.split(absfn) + (pdh, branch) = is_in_collection(sp[0], sp[1]) + if pdh: + return ArvFile(prefix, "$(dir %s/%s/)" % (pdh, branch)) + return prefix+fn def main(arguments=None): @@ -93,6 +111,17 @@ def main(arguments=None): reading_into = 2 + # Parse the command arguments into 'slots'. + # All words following '>' are output arguments and are collected into slots[0]. + # All words following '<' are input arguments and are collected into slots[1]. + # slots[2..] store the parameters of each command in the pipeline. + # + # e.g. arv-run foo arg1 arg2 '|' bar arg3 arg4 '<' input1 input2 input3 '>' output.txt + # will be parsed into: + # [['output.txt'], + # ['input1', 'input2', 'input3'], + # ['foo', 'arg1', 'arg2'], + # ['bar', 'arg3', 'arg4']] slots = [[], [], []] for c in args.args: if c.startswith('>'): @@ -120,9 +149,11 @@ def main(arguments=None): else: project = determine_project(os.getcwd(), api.users().current().execute()["uuid"]) + # Identify input files. Look at each parameter and test to see if there is + # a file by that name. This uses 'patterns' to look for within + # command line arguments, such as --foo=file.txt or -lfile.txt patterns = [re.compile("([^=]+=)(.*)"), re.compile("(-[A-Za-z])(.+)")] - for j, command in enumerate(slots[1:]): for i, a in enumerate(command): if j > 0 and i == 0: @@ -133,17 +164,19 @@ def main(arguments=None): # if it starts with a \ then don't do any interpretation command[i] = a[1:] else: - # Do some pattern matching - matched = False - for p in patterns: - m = p.match(a) - if m: - command[i] = statfile(m.group(1), m.group(2)) - matched = True - break - if not matched: - # parameter might be a file, so test it - command[i] = statfile('', a) + # See if it looks like a file + command[i] = statfile('', a) + + # If a file named command[i] was found, it would now be an + # ArvFile or UploadFile. If command[i] is a basestring, that + # means it doesn't correspond exactly to a file, so do some + # pattern matching. + if isinstance(command[i], basestring): + for p in patterns: + m = p.match(a) + if m: + command[i] = statfile(m.group(1), m.group(2)) + break n = True pathprefix = "/" @@ -181,11 +214,11 @@ def main(arguments=None): print("Upload local files: \"%s\"" % '" "'.join([c.fn for c in files])) if args.dry_run: - print("cd %s" % pathprefix) + print("$(input) is %s" % pathprefix.rstrip('/')) pdh = "$(input)" else: files = sorted(files, key=lambda x: x.fn) - collection = arvados.CollectionWriter(api, num_retries=3) + collection = arvados.CollectionWriter(api, num_retries=args.retries) stream = None for f in files: sp = os.path.split(f.fn) @@ -242,7 +275,7 @@ def main(arguments=None): if slots[1]: task_foreach.append("stdin") component["script_parameters"]["stdin"] = slots[1] - component["script_parameters"]["task.stdin"] = "$(stdin)"\ + component["script_parameters"]["task.stdin"] = "$(stdin)" if task_foreach: component["script_parameters"]["task.foreach"] = task_foreach @@ -264,7 +297,7 @@ def main(arguments=None): print(json.dumps(pipeline, indent=4)) else: pipeline["owner_uuid"] = project - pi = api.pipeline_instances().create(body=pipeline).execute() + pi = api.pipeline_instances().create(body=pipeline, ensure_unique_name=True).execute() print "Running pipeline %s" % pi["uuid"] if args.local: