# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: CC-BY-SA-3.0
import hashlib
import os

import arvados

# Jobs consist of one or more tasks. A task is a single invocation of
# this script: every task of a job runs the same script, and a task
# decides what to do based on its sequence number (see below).

# Get the current task
this_task = arvados.current_task()

# Tasks have a sequence number for ordering. All tasks
# with the current sequence number must finish successfully
# before tasks in the next sequence are started.
# The first task has sequence number 0
if this_task['sequence'] == 0:
    # Fan-out phase: queue one worker task per file in the job's input.

    # Look the job up once; its script parameters and uuid are the same
    # for every task created below.
    this_job = arvados.current_job()

    # Get the "input" field from "script_parameters" on the job object
    job_input = this_job['script_parameters']['input']

    # Create a collection reader to read the input
    cr = arvados.CollectionReader(job_input)

    # Loop over each stream in the collection (a stream is a subset of
    # files that logically represents a directory)
    for s in cr.all_streams():

        # Loop over each file in the stream
        for f in s.all_files():

            # Synthesize a manifest for just this file
            task_input = f.as_manifest()

            # Set attributes for a new task:
            # 'job_uuid' the job that this task is part of
            # 'created_by_job_task_uuid' this task that is creating the new task
            # 'sequence' the sequence number of the new task
            # 'parameters' the parameters to be passed to the new task
            new_task_attrs = {
                'job_uuid': this_job['uuid'],
                'created_by_job_task_uuid': this_task['uuid'],
                # A non-zero sequence sends the new task down the worker
                # branch below; per the ordering rule above, sequence 1
                # starts only after this sequence-0 task has finished.
                'sequence': 1,
                # The worker reads this back as this_task['parameters']['input'].
                'parameters': {
                    'input': task_input,
                },
            }

            # Ask the Arvados API server to create a new task, running the same
            # script as the parent task specified in 'created_by_job_task_uuid'
            arvados.api().job_tasks().create(body=new_task_attrs).execute()

    # Now tell the Arvados API server that this task executed successfully,
    # even though it doesn't have any output.
    this_task.set_output(None)
else:
    # The task sequence was not 0, so it must be a parallel worker task
    # created by the first task

    # Instead of getting "input" from the "script_parameters" field of
    # the job object, we get it from the "parameters" field of the
    # task object
    this_task_input = this_task['parameters']['input']

    collection = arvados.CollectionReader(this_task_input)

    # There should only be one file in the collection, so get the
    # first one from the all files iterator.
    input_file = next(collection.all_files())

    # Path of the file inside the collection: its stream (directory)
    # joined with its file name, with redundant separators and "."
    # components collapsed by normpath.
    output_path = os.path.normpath(os.path.join(input_file.stream_name(),
                                                input_file.name))

    # Everything after this is the same as the first tutorial.
    digestor = hashlib.new('md5')
    for buf in input_file.readall():
        digestor.update(buf)

    # Write one "<md5 hex digest> <path>" line to md5sum.txt in a new
    # collection, then hand the finished collection to set_output.
    out = arvados.CollectionWriter()
    with out.open('md5sum.txt') as out_file:
        out_file.write("{} {}\n".format(digestor.hexdigest(), output_path))

    this_task.set_output(out.finish())