Merge branch '12764-writable-file' refs #12764
[arvados.git] / doc / _includes / _concurrent_hash_script_py.liquid
1 #!/usr/bin/env python
2 {% comment %}
3 Copyright (C) The Arvados Authors. All rights reserved.
4
5 SPDX-License-Identifier: CC-BY-SA-3.0
6 {% endcomment %}
7
8 import hashlib
9 import os
10 import arvados
11
12 # Jobs consist of one or more tasks.  A task is a single invocation of
13 # a crunch script.
14
15 # Get the current task
16 this_task = arvados.current_task()
17
18 # Tasks have a sequence number for ordering.  All tasks
19 # with the current sequence number must finish successfully
20 # before tasks in the next sequence are started.
21 # The first task has sequence number 0
22 if this_task['sequence'] == 0:
23     # Get the "input" field from "script_parameters" on the task object
24     job_input = arvados.current_job()['script_parameters']['input']
25
26     # Create a collection reader to read the input
27     cr = arvados.CollectionReader(job_input)
28
29     # Loop over each stream in the collection (a stream is a subset of
30     # files that logically represents a directory)
31     for s in cr.all_streams():
32
33         # Loop over each file in the stream
34         for f in s.all_files():
35
36             # Synthesize a manifest for just this file
37             task_input = f.as_manifest()
38
39             # Set attributes for a new task:
40             # 'job_uuid' the job that this task is part of
41             # 'created_by_job_task_uuid' this task that is creating the new task
42             # 'sequence' the sequence number of the new task
43             # 'parameters' the parameters to be passed to the new task
44             new_task_attrs = {
45                 'job_uuid': arvados.current_job()['uuid'],
46                 'created_by_job_task_uuid': arvados.current_task()['uuid'],
47                 'sequence': 1,
48                 'parameters': {
49                     'input':task_input
50                     }
51                 }
52
53             # Ask the Arvados API server to create a new task, running the same
54             # script as the parent task specified in 'created_by_job_task_uuid'
55             arvados.api().job_tasks().create(body=new_task_attrs).execute()
56
57     # Now tell the Arvados API server that this task executed successfully,
58     # even though it doesn't have any output.
59     this_task.set_output(None)
60 else:
61     # The task sequence was not 0, so it must be a parallel worker task
62     # created by the first task
63
64     # Instead of getting "input" from the "script_parameters" field of
65     # the job object, we get it from the "parameters" field of the
66     # task object
67     this_task_input = this_task['parameters']['input']
68
69     collection = arvados.CollectionReader(this_task_input)
70
71     # There should only be one file in the collection, so get the
72     # first one from the all files iterator.
73     input_file = next(collection.all_files())
74     output_path = os.path.normpath(os.path.join(input_file.stream_name(),
75                                                 input_file.name))
76
77     # Everything after this is the same as the first tutorial.
78     digestor = hashlib.new('md5')
79     for buf in input_file.readall():
80         digestor.update(buf)
81
82     out = arvados.CollectionWriter()
83     with out.open('md5sum.txt') as out_file:
84         out_file.write("{} {}\n".format(digestor.hexdigest(), output_path))
85
86     this_task.set_output(out.finish())
87
88 # Done!