+ return subst.do_substitution(p, c)
+ else:
+ raise EvaluationError("expand_item() unexpected parameter type %s" % type(c))
+
# Evaluate in a list context
# "p" is the parameter scope, "value" will be evaluated
# if "value" is a list after expansion, return that
# if "value" is a path to a directory, return a list consisting of each entry
#   in the directory (each entry joined onto the directory path)
# if "value" is a path to a file, return a list consisting of each line of the
#   file, with trailing CR/LF characters stripped
# anything else (a non-list, non-string value, or a path that is neither a
#   directory nor a regular file) raises EvaluationError
def get_items(p, value):
    value = expand_item(p, value)
    if isinstance(value, list):
        return value
    if isinstance(value, basestring):
        # os.stat() raises OSError when the path does not exist; that is
        # deliberately left to propagate to the caller.
        mode = os.stat(value).st_mode
        if stat.S_ISDIR(mode):
            return [os.path.join(value, entry) for entry in os.listdir(value)]
        if stat.S_ISREG(mode):
            with open(value) as f:
                return [line.rstrip("\r\n") for line in f]
        # Neither a directory nor a regular file (e.g. a fifo or device):
        # fall through to the EvaluationError below rather than failing on
        # an unbound local.
    raise EvaluationError("get_items did not yield a list")
+
# Module-level stdin/stdout state, all initially unset.
# NOTE(review): presumably the *name globals hold redirect target paths and
# the *file globals hold the corresponding open file objects -- confirm
# against the code that assigns them.
stdoutname = stdoutfile = stdinname = stdinfile = None
+
# Construct the cross product of all values of each variable listed in fvars
# "params" is the parameter scope; "fvars" is the list of variable names still
# to be expanded.  The first name is expanded with get_items(); for each item
# a copy of the scope is made with that variable bound to the item.  When more
# names remain the function recurses, otherwise it either creates a job task
# (sequence 1) carrying the bound parameters or, in dry-run mode, logs the
# expanded command line(s).  Exits with status 1 if get_items() returns None.
def recursive_foreach(params, fvars):
    var = fvars[0]
    remaining = fvars[1:]
    items = get_items(params, params[var])
    logger.info("parallelizing on %s with items %s" % (var, items))

    if items is None:
        logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
        sys.exit(1)

    scope = params
    for item in items:
        # Shallow-copy the scope so the binding for this item does not leak
        # into the caller's dictionary.
        scope = copy.copy(scope)
        scope[var] = item

        if len(remaining) > 0:
            recursive_foreach(scope, remaining)
            continue

        if not args.dry_run:
            task_body = {
                'job_uuid': arvados.current_job()['uuid'],
                'created_by_job_task_uuid': arvados.current_task()['uuid'],
                'sequence': 1,
                'parameters': scope
            }
            arvados.api().job_tasks().create(body=task_body).execute()
            continue

        # Dry run: "command" is either a single command list or a list of
        # command lists; log each one after expansion.
        command = scope["command"]
        if isinstance(command[0], list):
            commands = command
        else:
            commands = [command]
        for c in commands:
            logger.info(flatten(expand_item(scope, c)))
+
# Top-level dispatch for "task.foreach": the first task (sequence 0), or a
# dry run, fans out one task per combination of foreach values and then exits.
# Without "task.foreach" the job is a single task and runs with the job
# parameters directly.  Any exception is logged together with the job
# parameters and the script exits with status 1.
try:
    if "task.foreach" in jobp:
        if args.dry_run or arvados.current_task()['sequence'] == 0:
            # This is the first task to start the other tasks and exit
            fvars = jobp["task.foreach"]
            if isinstance(fvars, basestring):
                fvars = [fvars]
            if not isinstance(fvars, list) or len(fvars) == 0:
                logger.error("value of task.foreach must be a string or non-empty list")
                sys.exit(1)
            # Pass the normalized list, not the raw job parameter: a bare
            # string would otherwise be indexed character by character.
            recursive_foreach(jobp, fvars)
            if not args.dry_run:
                if "task.vwd" in jobp:
                    # Set output of the first task to the base vwd collection so it
                    # will be merged with output fragments from the other tasks by
                    # crunch.
                    arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
                else:
                    arvados.current_task().set_output(None)
            sys.exit(0)
    else:
        # This is the only task so taskp/jobp are the same
        taskp = jobp
except Exception:
    # sys.exit() raises SystemExit, which is not an Exception subclass, so
    # the deliberate exits above are not intercepted here.
    logger.exception("caught exception")
    logger.error("job parameters were:")
    logger.error(pprint.pformat(jobp))
    sys.exit(1)