#!/usr/bin/env python

import hashlib
import os
import arvados

# Jobs consist of one or more tasks. A task is a single invocation of
# a crunch script.

# Get the current task
this_task = arvados.current_task()

# Tasks have a sequence number for ordering. All tasks
# with the current sequence number must finish successfully
# before tasks in the next sequence are started.
# The first task has sequence number 0.
if this_task['sequence'] == 0:
    # Get the "input" field from "script_parameters" on the job object
    job_input = arvados.current_job()['script_parameters']['input']

    # Create a collection reader to read the input
    cr = arvados.CollectionReader(job_input)

    # Loop over each stream in the collection (a stream is a subset of
    # files that logically represents a directory)
    for s in cr.all_streams():

        # Loop over each file in the stream
        for f in s.all_files():

            # Synthesize a manifest for just this file
            task_input = f.as_manifest()

            # Set attributes for a new task:
            # 'job_uuid' the job that this task is part of
            # 'created_by_job_task_uuid' the task that is creating the new task
            # 'sequence' the sequence number of the new task
            # 'parameters' the parameters to be passed to the new task
            new_task_attrs = {
                'job_uuid': arvados.current_job()['uuid'],
                'created_by_job_task_uuid': arvados.current_task()['uuid'],
                'sequence': 1,
                'parameters': {
                    'input': task_input
                }
            }

            # Ask the Arvados API server to create a new task, running the same
            # script as the parent task specified in 'created_by_job_task_uuid'
            arvados.api().job_tasks().create(body=new_task_attrs).execute()

    # Now tell the Arvados API server that this task executed successfully,
    # even though it doesn't have any output.
    this_task.set_output(None)
else:
    # The task sequence was not 0, so it must be a parallel worker task
    # created by the first task.

    # Instead of getting "input" from the "script_parameters" field of
    # the job object, we get it from the "parameters" field of the
    # task object.
    this_task_input = this_task['parameters']['input']

    collection = arvados.CollectionReader(this_task_input)

    # There should only be one file in the collection, so get the
    # first one from the all_files() iterator.
    input_file = next(collection.all_files())
    output_path = os.path.normpath(os.path.join(input_file.stream_name(),
                                                input_file.name()))

    # Everything after this is the same as the first tutorial.
    digestor = hashlib.new('md5')
    for buf in input_file.readall():
        digestor.update(buf)

    out = arvados.CollectionWriter()
    with out.open('md5sum.txt') as out_file:
        out_file.write("{} {}\n".format(digestor.hexdigest(), output_path))

    this_task.set_output(out.finish())
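
# Usage note (not executed as part of the crunch script): the "input" script
# parameter read at the top of this file is supplied when the job is
# submitted. Below is a minimal sketch of such a submission, assuming the
# usual job fields ('script', 'script_version', 'repository',
# 'script_parameters'); the script name, repository, and input collection
# locator are placeholders, not values from this tutorial.
#
#     import arvados
#
#     arvados.api().jobs().create(body={
#         'script': 'parallel-hash.py',        # hypothetical script name
#         'script_version': 'master',
#         'repository': 'you/crunch_scripts',  # hypothetical repository
#         'script_parameters': {
#             'input': '<collection locator>'  # placeholder input collection
#         },
#     }).execute()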