# X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0812bc1c717e5fed57d420b177f6ca9d41e81032..19e02c2bbb7f9d8db25d0b670124e29a08145843:/crunch_scripts/run-command
# Post-patch text of hunk "@@ -119,25 +119,35 @@ def add_to_group(gr, match):"
# from crunch_scripts/run-command (index 28adb749cb..13ae918895).
# The hunk opens with the last two statements of add_to_group():
#     gr[m] = []
#     gr[m].append(match.group(0))

class EvaluationError(Exception):
    """Raised when a component cannot be evaluated against the parameter scope."""
    pass

# Return the name of variable ('var') that will take on each value in 'items'
# when performing an inner substitution.
#
# "p" is the parameter scope, "c" is the component (a dict) being expanded and
# "key" is the name of the entry in "c" that supplies the items.
#
# Returns a tuple (var, items).  "items" is whatever get_items() yields for
# the selected value.  "var" is, in order of precedence:
#   * c["var"] when the component names the variable explicitly,
#   * None when c[key] is a list (no parameter is selected),
#   * the name inside "$(...)" when c[key] looks like a parameter reference
#     and that name is present in "p",
#   * c[key] itself, treated as a bare parameter name (backwards compatible).
#
# Raises EvaluationError when "key" is missing from "c", when c["var"] is not
# a string, when c[key] is neither a string nor a list, or when a bare
# parameter name is not present in "p".
def var_items(p, c, key):
    if key not in c:
        # The lookup is performed on the component "c", so say so.
        raise EvaluationError("'%s' was expected in 'c' but is missing" % key)

    source = c[key]

    if "var" in c:
        if not isinstance(c["var"], basestring):
            raise EvaluationError("Value of 'var' must be a string")
        # Var specifies the variable name for inner parameter substitution
        return (c["var"], get_items(p, source))

    # The component function ('key') value is a list, so return the list
    # directly with no parameter selected.
    if isinstance(source, list):
        return (None, get_items(p, source))

    if not isinstance(source, basestring):
        raise EvaluationError("Value of '%s' must be a string or list" % key)

    # check if c[key] is a string that looks like a parameter
    m = re.match(r"^\$\((.*)\)$", source)
    if m and m.group(1) in p:
        return (m.group(1), get_items(p, source))

    # backwards compatible, foreach specifies bare parameter name to use.
    # Report an unknown name as an evaluation failure rather than letting a
    # bare KeyError escape from the p[...] lookup below.
    if source not in p:
        raise EvaluationError(
            "Parameter '%s' named by '%s' was not found" % (source, key))
    return (source, get_items(p, p[source]))

# "p" is the parameter scope, "c" is the item to be expanded.
# If "c" is a dict, apply function expansion.
@@ -146,25 +156,36 @@ def var_items(p, c, key): def expand_item(p, c): if isinstance(c, dict): if "foreach" in c and "command" in c: + # Expand a command template for each item in the specified user + # parameter var, items = var_items(p, c, "foreach") + if var is None: + raise EvaluationError("Must specify 'var' in foreach") r = [] for i in items: params = copy.copy(p) params[var] = i r.append(expand_item(params, c["command"])) return r - if "list" in c and "index" in c and "command" in c: + elif "list" in c and "index" in c and "command" in c: + # extract a single item from a list var, items = var_items(p, c, "list") + if var is None: + raise EvaluationError("Must specify 'var' in list") params = copy.copy(p) params[var] = items[int(c["index"])] return expand_item(params, c["command"]) - if "regex" in c: + elif "regex" in c: pattern = re.compile(c["regex"]) if "filter" in c: - var, items = var_items(p, c, "filter") + # filter list so that it only includes items that match a + # regular expression + _, items = var_items(p, c, "filter") return [i for i in items if pattern.match(i)] elif "group" in c: - var, items = var_items(p, c, "group") + # generate a list of lists, where items are grouped on common + # subexpression match + _, items = var_items(p, c, "group") groups = {} for i in items: match = pattern.match(i) @@ -172,20 +193,24 @@ def expand_item(p, c): add_to_group(groups, match) return [groups[k] for k in groups] elif "extract" in c: - var, items = var_items(p, c, "extract") + # generate a list of lists, where items are split by + # subexpression match + _, items = var_items(p, c, "extract") r = [] for i in items: match = pattern.match(i) if match: r.append(list(match.groups())) return r - if "batch" in c and "size" in c: - var, items = var_items(p, c, "batch") + elif "batch" in c and "size" in c: + # generate a list of lists, where items are split into a batch size + _, items = var_items(p, c, "batch") sz = int(c["size"]) r = [] for j in xrange(0, 
len(items), sz): r.append(items[j:j+sz]) return r + raise EvaluationError("Missing valid list context function") elif isinstance(c, list): return [expand_item(p, arg) for arg in c] elif isinstance(c, basestring): @@ -194,8 +219,8 @@ def expand_item(p, c): return expand_item(p, p[m.group(1)]) else: return subst.do_substitution(p, c) - - raise Exception("expand_item() unexpected parameter type %s" % (type(c)) + else: + raise EvaluationError("expand_item() unexpected parameter type %s" % type(c)) # Evaluate in a list context # "p" is the parameter scope, "value" will be evaluated @@ -216,7 +241,7 @@ def get_items(p, value): with open(value) as f: items = [line.rstrip("\r\n") for line in f] return items - raise Exception("get_items did not yield a list") + raise EvaluationError("get_items did not yield a list") stdoutname = None stdoutfile = None @@ -323,6 +348,8 @@ except Exception as e: logger.error(pprint.pformat(taskp)) sys.exit(1) +# rcode holds the return codes produced by each subprocess +rcode = {} try: subprocesses = [] close_streams = [] @@ -364,7 +391,6 @@ try: active = 1 pids = set([s.pid for s in subprocesses]) - rcode = {} while len(pids) > 0: (pid, status) = os.wait() pids.discard(pid) @@ -409,7 +435,7 @@ else: outcollection = robust_put.upload(outdir, logger) # Success if no non-zero return codes -success = not any([status != 0 for status in rcode.values()]) +success = any(rcode) and not any([status != 0 for status in rcode.values()]) api.job_tasks().update(uuid=arvados.current_task()['uuid'], body={