+ if "task.foreach" in jobp:
+ if arvados.current_task()['sequence'] == 0:
+ var = jobp["task.foreach"]
+ items = get_items(jobp, jobp[var])
+ logging.info("parallelizing on %s with items %s" % (var, items))
+ if items is not None:
+ for i in items:
+ params = copy.copy(jobp)
+ params[var] = i
+ arvados.api().job_tasks().create(body={
+ 'job_uuid': arvados.current_job()['uuid'],
+ 'created_by_job_task_uuid': arvados.current_task()['uuid'],
+ 'sequence': 1,
+ 'parameters': params
+ }
+ ).execute()
+ if "task.vwd" in jobp:
+ # Base vwd collection will be merged with output fragments from
+ # the other tasks by crunch.
+ arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
+ else:
+ arvados.current_task().set_output(None)
+ sys.exit(0)
+ else:
+ sys.exit(1)
+ else:
+ taskp = jobp
+
+ if "task.vwd" in taskp:
+ # Populate output directory with symlinks to files in collection
+ vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
+
+ if "task.cwd" in taskp:
+ os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
+
+ cmd = expand_list(taskp, taskp["command"])
+
+ if "task.stdin" in taskp:
+ stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
+ stdinfile = open(stdinname, "rb")
+
+ if "task.stdout" in taskp:
+ stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])