- var = jobp["task.foreach"]
- items = get_items(jobp, jobp[var])
- logging.info("parallelizing on %s with items %s" % (var, items))
- if items is not None:
- for i in items:
- params = copy.copy(jobp)
- params[var] = i
- arvados.api().job_tasks().create(body={
- 'job_uuid': arvados.current_job()['uuid'],
- 'created_by_job_task_uuid': arvados.current_task()['uuid'],
- 'sequence': 1,
- 'parameters': params
- }
- ).execute()
- if "task.vwd" in jobp:
- # Base vwd collection will be merged with output fragments from
- # the other tasks by crunch.
- arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
- else:
- arvados.current_task().set_output(None)
- sys.exit(0)
- else:
+ # This is the first task: its job is to start the other tasks and then exit
+ fvars = jobp["task.foreach"]
+ if isinstance(fvars, basestring):
+ fvars = [fvars]
+ if not isinstance(fvars, list) or len(fvars) == 0:
+ logger.error("value of task.foreach must be a string or non-empty list")