From c79e86aff4cb20413cf0f09c52fe5066ca197deb Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 15 Oct 2014 11:10:27 -0400 Subject: [PATCH] 3609: Add support for batch size, improve ability to pass lists of lists without them getting flattened. --- crunch_scripts/run-command | 37 ++++++++++++++++++++---------- sdk/python/arvados/commands/run.py | 24 +++++++++---------- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command index fb4f3c7c30..fbfd511df0 100755 --- a/crunch_scripts/run-command +++ b/crunch_scripts/run-command @@ -103,7 +103,7 @@ def add_to_group(gr, match): gr[m] = [] gr[m].append(match.group(0)) -def expand_item(p, c): +def expand_item(p, c, flatten=True): if isinstance(c, dict): if "foreach" in c and "command" in c: var = c["foreach"] @@ -112,7 +112,7 @@ def expand_item(p, c): for i in items: params = copy.copy(p) params[var] = i - r.extend(expand_list(params, c["command"])) + r.extend(expand_item(params, c["command"])) return r if "list" in c and "index" in c and "command" in c: var = c["list"] @@ -144,22 +144,27 @@ def expand_item(p, c): elif isinstance(c, list): return expand_list(p, c) elif isinstance(c, basestring): - return [subst.do_substitution(p, c)] + if flatten: + return [subst.do_substitution(p, c)] + else: + return subst.do_substitution(p, c) return [] -def expand_list(p, l): +def expand_list(p, l, flatten=True): if isinstance(l, basestring): return expand_item(p, l) + elif flatten: + return [exp for arg in l for exp in expand_item(p, arg, flatten)] else: - return [exp for arg in l for exp in expand_item(p, arg)] + return [expand_item(p, arg, flatten) for arg in l] -def get_items(p, value): +def get_items(p, value, flatten=True): if isinstance(value, dict): return expand_item(p, value) if isinstance(value, list): - return expand_list(p, value) + return expand_list(p, value, flatten) fn = subst.do_substitution(p, value) mode = os.stat(fn).st_mode @@ -182,7 +187,7 @@ stdinfile = None def recursive_foreach(params, fvars): var = fvars[0] fvars = fvars[1:] - items = get_items(params, params[var]) + items = get_items(params, params[var], False) logger.info("parallelizing on %s with items %s" % (var, items)) if items is not None: for i in items: @@ -199,7 +204,10 @@ def recursive_foreach(params, fvars): 'parameters': params }).execute() else: - logger.info(expand_list(params, params["command"])) + if isinstance(params["command"][0], list): + logger.info(expand_list(params, params["command"], False)) + else: + logger.info(expand_list(params, params["command"], True)) else: logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var])) sys.exit(1) @@ -227,7 +235,13 @@ try: else: # This is the only task so taskp/jobp are the same taskp = jobp +except Exception as e: + logger.exception("caught exception") + logger.error("job parameters were:") + logger.error(pprint.pformat(jobp)) + sys.exit(1) +try: if not args.dry_run: if "task.vwd" in taskp: # Populate output directory with symlinks to files in collection @@ -238,10 +252,9 @@ try: cmd = [] if isinstance(taskp["command"][0], list): - for c in taskp["command"]: - cmd.append(expand_list(taskp, c)) + cmd.append(expand_list(taskp, taskp["command"], False)) else: - cmd.append(expand_list(taskp, taskp["command"])) + cmd.append(expand_list(taskp, taskp["command"], True)) if "task.stdin" in taskp: stdinname = subst.do_substitution(taskp, taskp["task.stdin"]) diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py index 9301e3cb46..e118a9e41a 100644 --- a/sdk/python/arvados/commands/run.py +++ b/sdk/python/arvados/commands/run.py @@ -18,9 +18,9 @@ arvrun_parser = argparse.ArgumentParser() arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit") arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-crunch-job") arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs", help="Docker image to use, default arvados/jobs") -arvrun_parser.add_argument('--git-dir', type=str, default="", help="Git directory to use to find run-command when using --local") -arvrun_parser.add_argument('--repository', type=str, default="arvados", help="repository field of pipeline submission, default 'arvados'") -arvrun_parser.add_argument('--script-version', type=str, default="master", help="script_version field of pipeline submission, default 'master'") +arvrun_parser.add_argument('--git-dir', type=str, default="", help="Git repository passed to arv-crunch-job when using --local") +arvrun_parser.add_argument('--repository', type=str, default="arvados", help="repository field of component, default 'arvados'") +arvrun_parser.add_argument('--script-version', type=str, default="master", help="script_version field of component, default 'master'") arvrun_parser.add_argument('args', nargs=argparse.REMAINDER) class ArvFile(object): @@ -156,23 +156,23 @@ def main(arguments=None): task_foreach = [] group_parser = argparse.ArgumentParser() - group_parser.add_argument('--group', type=str) + group_parser.add_argument('--batch-size', type=int) group_parser.add_argument('args', nargs=argparse.REMAINDER) for s in xrange(2, len(slots)): for i in xrange(0, len(slots[s])): if slots[s][i] == '--': - inp = "input%i" % s + inp = "input%i" % (s-2) groupargs = group_parser.parse_args(slots[2][i+1:]) - component["script_parameters"][inp] = groupargs.args - if groupargs.group: - inpgroups = inp+"_groups" - component["script_parameters"][inpgroups] = {"group":inp, "regex":groupargs.group} - slots[s] = slots[s][0:i] + [{"foreach": inpgroups, "command": "$(%s)" % inpgroups}] - task_foreach.append(inpgroups) + if groupargs.batch_size: + component["script_parameters"][inp] = [] + for j in xrange(0, len(groupargs.args), groupargs.batch_size): + component["script_parameters"][inp].append(groupargs.args[j:j+groupargs.batch_size]) + slots[s] = slots[s][0:i] + [{"foreach": inp, "command": "$(%s)" % inp}] else: + component["script_parameters"][inp] = groupargs.args slots[s] = slots[s][0:i] + ["$(%s)" % inp] - task_foreach.append(inp) + task_foreach.append(inp) break if slots[s][i] == '\--': slots[s][i] = '--' -- 2.30.2