X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2fe1e71c5cc17dbf06fd7b1e188fd0279c07d3ca..1de8e55b47ea46fe1e589fbfe1ff0ae77b9e2cbf:/crunch_scripts/run-command diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command index fc3134f651..c07debd787 100755 --- a/crunch_scripts/run-command +++ b/crunch_scripts/run-command @@ -156,6 +156,8 @@ def var_items(p, c, key): def expand_item(p, c): if isinstance(c, dict): if "foreach" in c and "command" in c: + # Expand a command template for each item in the specified user + # parameter var, items = var_items(p, c, "foreach") if var is None: raise EvaluationError("Must specify 'var' in foreach") @@ -166,6 +168,7 @@ def expand_item(p, c): r.append(expand_item(params, c["command"])) return r elif "list" in c and "index" in c and "command" in c: + # extract a single item from a list var, items = var_items(p, c, "list") if var is None: raise EvaluationError("Must specify 'var' in list") @@ -175,9 +178,13 @@ def expand_item(p, c): elif "regex" in c: pattern = re.compile(c["regex"]) if "filter" in c: + # filter list so that it only includes items that match a + # regular expression _, items = var_items(p, c, "filter") return [i for i in items if pattern.match(i)] elif "group" in c: + # generate a list of lists, where items are grouped on common + # subexpression match _, items = var_items(p, c, "group") groups = {} for i in items: @@ -186,6 +193,8 @@ def expand_item(p, c): add_to_group(groups, match) return [groups[k] for k in groups] elif "extract" in c: + # generate a list of lists, where items are split by + # subexpression match _, items = var_items(p, c, "extract") r = [] for i in items: @@ -194,6 +203,7 @@ def expand_item(p, c): r.append(list(match.groups())) return r elif "batch" in c and "size" in c: + # generate a list of lists, where items are split into a batch size _, items = var_items(p, c, "batch") sz = int(c["size"]) r = [] @@ -209,8 +219,8 @@ def expand_item(p, c): return expand_item(p, p[m.group(1)]) else: return subst.do_substitution(p, c) - - raise EvaluationError("expand_item() unexpected parameter type %s" % (type(c)) + else: + raise EvaluationError("expand_item() unexpected parameter type %s" % type(c)) # Evaluate in a list context # "p" is the parameter scope, "value" will be evaluated @@ -338,6 +348,8 @@ except Exception as e: logger.error(pprint.pformat(taskp)) sys.exit(1) +# rcode holds the return codes produced by each subprocess +rcode = {} try: subprocesses = [] close_streams = [] @@ -379,7 +391,6 @@ try: active = 1 pids = set([s.pid for s in subprocesses]) - rcode = {} while len(pids) > 0: (pid, status) = os.wait() pids.discard(pid) @@ -423,8 +434,8 @@ if "task.vwd" in taskp: else: outcollection = robust_put.upload(outdir, logger) -# Success if no non-zero return codes -success = not any([status != 0 for status in rcode.values()]) +# Success if we ran any subprocess, and they all exited 0. +success = rcode and all(status == 0 for status in rcode.itervalues()) api.job_tasks().update(uuid=arvados.current_task()['uuid'], body={ @@ -433,4 +444,4 @@ api.job_tasks().update(uuid=arvados.current_task()['uuid'], 'progress':1.0 }).execute() -sys.exit(rcode) +sys.exit(0 if success else 1)