Merge branch '2488-jobs-pipeline-doc'
[arvados.git] / doc / _includes / _parallel_hash_script_py.liquid
1 #!/usr/bin/env python
2
3 import hashlib
4 import arvados
5
6 # Jobs consist of one or more tasks.  A task is a single invocation of
7 # a crunch script.
8
9 # Get the current task
10 this_task = arvados.current_task()
11
12 # Tasks have a sequence number for ordering.  All tasks
13 # with the current sequence number must finish successfully
14 # before tasks in the next sequence are started. 
15 # The first task has sequence number 0
16 if this_task['sequence'] == 0:
17     # Get the "input" field from "script_parameters" on the task object
18     job_input = arvados.current_job()['script_parameters']['input']
19
20     # Create a collection reader to read the input
21     cr = arvados.CollectionReader(job_input)
22
23     # Loop over each stream in the collection (a stream is a subset of
24     # files that logically represents a directory
25     for s in cr.all_streams():
26
27         # Loop over each file in the stream
28         for f in s.all_files():
29
30             # Synthesize a manifest for just this file
31             task_input = f.as_manifest()
32
33             # Set attributes for a new task:
34             # 'job_uuid' the job that this task is part of
35             # 'created_by_job_task_uuid' this task that is creating the new task
36             # 'sequence' the sequence number of the new task
37             # 'parameters' the parameters to be passed to the new task
38             new_task_attrs = {
39                 'job_uuid': arvados.current_job()['uuid'],
40                 'created_by_job_task_uuid': arvados.current_task()['uuid'],
41                 'sequence': 1,
42                 'parameters': {
43                     'input':task_input
44                     }
45                 }
46
47             # Ask the Arvados API server to create a new task, running the same
48             # script as the parent task specified in 'created_by_job_task_uuid'
49             arvados.api().job_tasks().create(body=new_task_attrs).execute()
50
51     # Now tell the Arvados API server that this task executed successfully,
52     # even though it doesn't have any output.
53     this_task.set_output(None)
54 else:
55     # The task sequence was not 0, so it must be a parallel worker task
56     # created by the first task
57
58     # Instead of getting "input" from the "script_parameters" field of
59     # the job object, we get it from the "parameters" field of the
60     # task object
61     this_task_input = this_task['parameters']['input']
62
63     collection = arvados.CollectionReader(this_task_input)
64
65     out = arvados.CollectionWriter()
66     out.set_current_file_name("md5sum.txt")
67
68     # There should only be one file in the collection, so get the
69     # first one.  collection.all_files() returns an iterator so we
70     # need to make it into a list for indexed access.
71     input_file = list(collection.all_files())[0]
72
73     # Everything after this is the same as the first tutorial.
74     digestor = hashlib.new('md5')
75
76     while True:
77         buf = input_file.read(2**20)
78         if len(buf) == 0:
79             break
80         digestor.update(buf)
81
82     hexdigest = digestor.hexdigest()
83     file_name = input_file.name()
84     if input_file.stream_name() != '.':
85         file_name = os.join(input_file.stream_name(), file_name)
86     out.write("%s %s\n" % (hexdigest, file_name))
87     output_id = out.finish()
88     this_task.set_output(output_id)
89
90 # Done!