From 0812bc1c717e5fed57d420b177f6ca9d41e81032 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 31 Oct 2014 15:26:46 -0400 Subject: [PATCH] 3609: Inherit --retries from _util. Be more specific about error being caught. Add comments. --- crunch_scripts/run-command | 23 +++++++++-- sdk/python/arvados/commands/run.py | 65 +++++++++++++++++++++--------- 2 files changed, 66 insertions(+), 22 deletions(-) diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command index 6c27a94572..28adb749cb 100755 --- a/crunch_scripts/run-command +++ b/crunch_scripts/run-command @@ -119,13 +119,19 @@ def add_to_group(gr, match): gr[m] = [] gr[m].append(match.group(0)) +# Return the name of variable ('var') that will take on each value in 'items' +# when performing an inner substitution def var_items(p, c, key): if "var" in c: - # Var specifies + # Var specifies the variable name for inner parameter substitution return (c["var"], get_items(p, c[key])) else: + # The component function ('key') value is a list, so return the list + # directly with no parameter substition. if isinstance(c[key], list): return (None, get_items(p, c[key])) + + # check if c[key] is a string that looks like a parameter m = re.match("^\$\((.*)\)$", c[key]) if m and m.group(1) in p: return (m.group(1), get_items(p, c[key])) @@ -133,6 +139,10 @@ def var_items(p, c, key): # backwards compatible, foreach specifies bare parameter name to use return (c[key], get_items(p, p[c[key]])) +# "p" is the parameter scope, "c" is the item to be expanded. +# If "c" is a dict, apply function expansion. +# If "c" is a list, recursively expand each item and return a new list. +# If "c" is a string, apply parameter substitution def expand_item(p, c): if isinstance(c, dict): if "foreach" in c and "command" in c: @@ -185,8 +195,13 @@ def expand_item(p, c): else: return subst.do_substitution(p, c) - return [] + raise Exception("expand_item() unexpected parameter type %s" % (type(c)) +# Evaluate in a list context +# "p" is the parameter scope, "value" will be evaluated +# if "value" is a list after expansion, return that +# if "value" is a path to a directory, return a list consisting of each entry in the directory +# if "value" is a path to a file, return a list consisting of each line of the file def get_items(p, value): value = expand_item(p, value) if isinstance(value, list): @@ -208,6 +223,7 @@ stdoutfile = None stdinname = None stdinfile = None +# Construct the cross product of all values of each variable listed in fvars def recursive_foreach(params, fvars): var = fvars[0] fvars = fvars[1:] @@ -392,7 +408,8 @@ if "task.vwd" in taskp: else: outcollection = robust_put.upload(outdir, logger) -success = reduce(lambda x, y: x & (y == 0), [True]+rcode.values()) +# Success if no non-zero return codes +success = not any([status != 0 for status in rcode.values()]) api.job_tasks().update(uuid=arvados.current_task()['uuid'], body={ diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py index 962d6a806c..411b9977fd 100644 --- a/sdk/python/arvados/commands/run.py +++ b/sdk/python/arvados/commands/run.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import arvados +import arvados.commands.ws as ws import argparse import json import re @@ -8,13 +9,13 @@ import os import stat import put import time -import arvados.commands.ws as ws import subprocess import logging +import arvados.commands._util as arv_cmd logger = logging.getLogger('arvados.arv-run') -arvrun_parser = argparse.ArgumentParser() +arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt]) arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit") arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-run-pipeline-instance") arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs", help="Docker image to use, default arvados/jobs") @@ -35,6 +36,9 @@ class ArvFile(object): class UploadFile(ArvFile): pass +# Determine if a file is in a collection, and return a tuple consisting of the +# portable data hash and the path relative to the root of the collection. +# Return None if the path isn't with an arv-mount collection or there was is error. def is_in_collection(root, branch): try: if root == "/": @@ -47,9 +51,12 @@ def is_in_collection(root, branch): else: sp = os.path.split(root) return is_in_collection(sp[0], os.path.join(sp[1], branch)) - except: + except IOError, OSError: return (None, None) +# Determine the project to place the output of this command by searching upward +# for arv-mount psuedofile indicating the project. If the cwd isn't within +# an arv-mount project or there is an error, return current_user. def determine_project(root, current_user): try: if root == "/": @@ -65,9 +72,14 @@ def determine_project(root, current_user): else: sp = os.path.split(root) return determine_project(sp[0], current_user) - except: + except IOError, OSError: return current_user +# Determine if string corresponds to a file, and if that file is part of a +# arv-mounted collection or only local to the machine. Returns one of +# ArvFile() (file already exists in a collection), UploadFile() (file needs to +# be uploaded to a collection), or simply returns prefix+fn (which yields the +# original parameter string). def statfile(prefix, fn): absfn = os.path.abspath(fn) if os.path.exists(absfn): @@ -93,6 +105,17 @@ def main(arguments=None): reading_into = 2 + # Parse the command arguments into 'slots'. + # All words following '>' are output arguments and are collected into slots[0]. + # All words following '<' are input arguments and are collected into slots[1]. + # slots[2..] store the parameters of each command in the pipeline. + # + # e.g. arv-run foo arg1 arg2 '|' bar arg3 arg4 '<' input1 input2 input3 '>' output.txt + # will be parsed into: + # [['output.txt'], + # ['input1', 'input2', 'input3'], + # ['foo', 'arg1', 'arg2'], + # ['bar', 'arg3', 'arg4']] slots = [[], [], []] for c in args.args: if c.startswith('>'): @@ -120,9 +143,11 @@ def main(arguments=None): else: project = determine_project(os.getcwd(), api.users().current().execute()["uuid"]) + # Identify input files. Look at each parameter and test to see if there is + # a file by that name. This uses 'patterns' to look for within + # command line arguments, such as --foo=file.txt or -lfile.txt patterns = [re.compile("([^=]+=)(.*)"), re.compile("(-[A-Za-z])(.+)")] - for j, command in enumerate(slots[1:]): for i, a in enumerate(command): if j > 0 and i == 0: @@ -133,17 +158,19 @@ def main(arguments=None): # if it starts with a \ then don't do any interpretation command[i] = a[1:] else: - # Do some pattern matching - matched = False - for p in patterns: - m = p.match(a) - if m: - command[i] = statfile(m.group(1), m.group(2)) - matched = True - break - if not matched: - # parameter might be a file, so test it - command[i] = statfile('', a) + # See if it looks like a file + command[i] = statfile('', a) + + # If a file named command[i] was found, it would now be an + # ArvFile or UploadFile. If command[i] is a basestring, that + # means it doesn't correspond exactly to a file, so do some + # pattern matching. + if isinstance(command[i], basestring): + for p in patterns: + m = p.match(a) + if m: + command[i] = statfile(m.group(1), m.group(2)) + break n = True pathprefix = "/" @@ -185,7 +212,7 @@ def main(arguments=None): pdh = "$(input)" else: files = sorted(files, key=lambda x: x.fn) - collection = arvados.CollectionWriter(api, num_retries=3) + collection = arvados.CollectionWriter(api, num_retries=args.retries) stream = None for f in files: sp = os.path.split(f.fn) @@ -242,7 +269,7 @@ def main(arguments=None): if slots[1]: task_foreach.append("stdin") component["script_parameters"]["stdin"] = slots[1] - component["script_parameters"]["task.stdin"] = "$(stdin)"\ + component["script_parameters"]["task.stdin"] = "$(stdin)" if task_foreach: component["script_parameters"]["task.foreach"] = task_foreach @@ -264,7 +291,7 @@ def main(arguments=None): print(json.dumps(pipeline, indent=4)) else: pipeline["owner_uuid"] = project - pi = api.pipeline_instances().create(body=pipeline).execute() + pi = api.pipeline_instances().create(body=pipeline, ensure_unique_name=True).execute() print "Running pipeline %s" % pi["uuid"] if args.local: -- 2.30.2