3959: crunch-job use job locking API
[arvados.git] / crunch_scripts / picard-gatk2-prep
1 #!/usr/bin/env python
2
3 import arvados
4 import os
5 import re
6 import sys
7 import subprocess
8 import arvados_picard
9 from arvados_ipc import *
10
11 arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
12
13 this_job = arvados.current_job()
14 this_task = arvados.current_task()
15 ref_dir = arvados.util.collection_extract(
16     collection = this_job['script_parameters']['reference'],
17     path = 'reference',
18     decompress = True)
19 ref_fasta_files = [os.path.join(ref_dir, f)
20                    for f in os.listdir(ref_dir)
21                    if re.search(r'\.fasta(\.gz)?$', f)]
22 input_collection = this_task['parameters']['input']
23
24 for s in arvados.CollectionReader(input_collection).all_streams():
25     for f in s.all_files():
26         input_stream_name = s.name()
27         input_file_name = f.name()
28         break
29
30 # Unfortunately, picard FixMateInformation cannot read from a pipe. We
31 # must copy the input to a temporary file before running picard.
32 input_bam_path = os.path.join(this_task.tmpdir, input_file_name)
33 with open(input_bam_path, 'wb') as bam:
34     for s in arvados.CollectionReader(input_collection).all_streams():
35         for f in s.all_files():
36             for s in f.readall():
37                 bam.write(s)
38
39 children = {}
40 pipes = {}
41
42 pipe_setup(pipes, 'fixmate')
43 if 0==named_fork(children, 'fixmate'):
44     pipe_closeallbut(pipes, ('fixmate', 'w'))
45     arvados_picard.run(
46         'FixMateInformation',
47         params={
48             'i': input_bam_path,
49             'o': '/dev/stdout',
50             'quiet': 'true',
51             'so': 'coordinate',
52             'validation_stringency': 'LENIENT',
53             'compression_level': 0
54             },
55         stdout=os.fdopen(pipes['fixmate','w'], 'wb', 2**20))
56     os._exit(0)
57 os.close(pipes.pop(('fixmate','w'), None))
58
59 pipe_setup(pipes, 'sortsam')
60 if 0==named_fork(children, 'sortsam'):
61     pipe_closeallbut(pipes, ('fixmate', 'r'), ('sortsam', 'w'))
62     arvados_picard.run(
63         'SortSam',
64         params={
65             'i': '/dev/stdin',
66             'o': '/dev/stdout',
67             'quiet': 'true',
68             'so': 'coordinate',
69             'validation_stringency': 'LENIENT',
70             'compression_level': 0
71             },
72         stdin=os.fdopen(pipes['fixmate','r'], 'rb', 2**20),
73         stdout=os.fdopen(pipes['sortsam','w'], 'wb', 2**20))
74     os._exit(0)
75
76 pipe_setup(pipes, 'reordersam')
77 if 0==named_fork(children, 'reordersam'):
78     pipe_closeallbut(pipes, ('sortsam', 'r'), ('reordersam', 'w'))
79     arvados_picard.run(
80         'ReorderSam',
81         params={
82             'i': '/dev/stdin',
83             'o': '/dev/stdout',
84             'reference': ref_fasta_files[0],
85             'quiet': 'true',
86             'validation_stringency': 'LENIENT',
87             'compression_level': 0
88             },
89         stdin=os.fdopen(pipes['sortsam','r'], 'rb', 2**20),
90         stdout=os.fdopen(pipes['reordersam','w'], 'wb', 2**20))
91     os._exit(0)
92
93 pipe_setup(pipes, 'addrg')
94 if 0==named_fork(children, 'addrg'):
95     pipe_closeallbut(pipes, ('reordersam', 'r'), ('addrg', 'w'))
96     arvados_picard.run(
97         'AddOrReplaceReadGroups',
98         params={
99             'i': '/dev/stdin',
100             'o': '/dev/stdout',
101             'quiet': 'true',
102             'rglb': this_job['script_parameters'].get('rglb', 0),
103             'rgpl': this_job['script_parameters'].get('rgpl', 'illumina'),
104             'rgpu': this_job['script_parameters'].get('rgpu', 0),
105             'rgsm': this_job['script_parameters'].get('rgsm', 0),
106             'validation_stringency': 'LENIENT'
107             },
108         stdin=os.fdopen(pipes['reordersam','r'], 'rb', 2**20),
109         stdout=os.fdopen(pipes['addrg','w'], 'wb', 2**20))
110     os._exit(0)
111
112 pipe_setup(pipes, 'bammanifest')
113 pipe_setup(pipes, 'bam')
114 pipe_setup(pipes, 'casm_in')
115 if 0==named_fork(children, 'bammanifest'):
116     pipe_closeallbut(pipes,
117                      ('addrg', 'r'),
118                      ('bammanifest', 'w'),
119                      ('bam', 'w'),
120                      ('casm_in', 'w'))
121     out = arvados.CollectionWriter()
122     out.start_new_stream(input_stream_name)
123     out.start_new_file(input_file_name)
124     while True:
125         buf = os.read(pipes['addrg','r'], 2**20)
126         if len(buf) == 0:
127             break
128         os.write(pipes['bam','w'], buf)
129         os.write(pipes['casm_in','w'], buf)
130         out.write(buf)
131     os.write(pipes['bammanifest','w'], out.manifest_text())
132     os.close(pipes['bammanifest','w'])
133     os._exit(0)
134
135 pipe_setup(pipes, 'casm')
136 if 0 == named_fork(children, 'casm'):
137     pipe_closeallbut(pipes, ('casm_in', 'r'), ('casm', 'w'))
138     arvados_picard.run(
139         'CollectAlignmentSummaryMetrics',
140         params={
141             'input': '/dev/fd/' + str(pipes['casm_in','r']),
142             'output': '/dev/fd/' + str(pipes['casm','w']),
143             'reference_sequence': ref_fasta_files[0],
144             'validation_stringency': 'LENIENT',
145             },
146         close_fds=False)
147     os._exit(0)
148
149 pipe_setup(pipes, 'index')
150 if 0==named_fork(children, 'index'):
151     pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
152     arvados_picard.run(
153         'BuildBamIndex',
154         params={
155             'i': '/dev/stdin',
156             'o': '/dev/stdout',
157             'quiet': 'true',
158             'validation_stringency': 'LENIENT'
159             },
160         stdin=os.fdopen(pipes['bam','r'], 'rb', 2**20),
161         stdout=os.fdopen(pipes['index','w'], 'wb', 2**20))
162     os._exit(0)
163
164 pipe_setup(pipes, 'indexmanifest')
165 if 0==named_fork(children, 'indexmanifest'):
166     pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
167     out = arvados.CollectionWriter()
168     out.start_new_stream(input_stream_name)
169     out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
170     while True:
171         buf = os.read(pipes['index','r'], 2**20)
172         if len(buf) == 0:
173             break
174         out.write(buf)
175     os.write(pipes['indexmanifest','w'], out.manifest_text())
176     os.close(pipes['indexmanifest','w'])
177     os._exit(0)
178
179 pipe_closeallbut(pipes,
180                  ('bammanifest', 'r'),
181                  ('indexmanifest', 'r'),
182                  ('casm', 'r'))
183
184 outmanifest = ''
185
186 for which in ['bammanifest', 'indexmanifest']:
187     with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
188         while True:
189             buf = f.read()
190             if buf == '':
191                 break
192             outmanifest += buf
193
194 casm_out = arvados.CollectionWriter()
195 casm_out.start_new_stream(input_stream_name)
196 casm_out.start_new_file(input_file_name + '.casm.tsv')
197 casm_out.write(os.fdopen(pipes.pop(('casm','r'))))
198
199 outmanifest += casm_out.manifest_text()
200
201 all_ok = True
202 for (childname, pid) in children.items():
203     all_ok = all_ok and waitpid_and_check_exit(pid, childname)
204
205 if all_ok:
206     this_task.set_output(outmanifest)
207 else:
208     sys.exit(1)