12018: Enhanced readability of ProcessFile() function.
[arvados.git] / crunch_scripts / picard-gatk2-prep
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 import arvados
7 import os
8 import re
9 import sys
10 import subprocess
11 import arvados_picard
12 from arvados_ipc import *
13
14 arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
15
16 this_job = arvados.current_job()
17 this_task = arvados.current_task()
18 ref_dir = arvados.util.collection_extract(
19     collection = this_job['script_parameters']['reference'],
20     path = 'reference',
21     decompress = True)
22 ref_fasta_files = [os.path.join(ref_dir, f)
23                    for f in os.listdir(ref_dir)
24                    if re.search(r'\.fasta(\.gz)?$', f)]
25 input_collection = this_task['parameters']['input']
26
27 for s in arvados.CollectionReader(input_collection).all_streams():
28     for f in s.all_files():
29         input_stream_name = s.name()
30         input_file_name = f.name()
31         break
32
33 # Unfortunately, picard FixMateInformation cannot read from a pipe. We
34 # must copy the input to a temporary file before running picard.
35 input_bam_path = os.path.join(this_task.tmpdir, input_file_name)
36 with open(input_bam_path, 'wb') as bam:
37     for s in arvados.CollectionReader(input_collection).all_streams():
38         for f in s.all_files():
39             for s in f.readall():
40                 bam.write(s)
41
42 children = {}
43 pipes = {}
44
45 pipe_setup(pipes, 'fixmate')
46 if 0==named_fork(children, 'fixmate'):
47     pipe_closeallbut(pipes, ('fixmate', 'w'))
48     arvados_picard.run(
49         'FixMateInformation',
50         params={
51             'i': input_bam_path,
52             'o': '/dev/stdout',
53             'quiet': 'true',
54             'so': 'coordinate',
55             'validation_stringency': 'LENIENT',
56             'compression_level': 0
57             },
58         stdout=os.fdopen(pipes['fixmate','w'], 'wb', 2**20))
59     os._exit(0)
60 os.close(pipes.pop(('fixmate','w'), None))
61
62 pipe_setup(pipes, 'sortsam')
63 if 0==named_fork(children, 'sortsam'):
64     pipe_closeallbut(pipes, ('fixmate', 'r'), ('sortsam', 'w'))
65     arvados_picard.run(
66         'SortSam',
67         params={
68             'i': '/dev/stdin',
69             'o': '/dev/stdout',
70             'quiet': 'true',
71             'so': 'coordinate',
72             'validation_stringency': 'LENIENT',
73             'compression_level': 0
74             },
75         stdin=os.fdopen(pipes['fixmate','r'], 'rb', 2**20),
76         stdout=os.fdopen(pipes['sortsam','w'], 'wb', 2**20))
77     os._exit(0)
78
79 pipe_setup(pipes, 'reordersam')
80 if 0==named_fork(children, 'reordersam'):
81     pipe_closeallbut(pipes, ('sortsam', 'r'), ('reordersam', 'w'))
82     arvados_picard.run(
83         'ReorderSam',
84         params={
85             'i': '/dev/stdin',
86             'o': '/dev/stdout',
87             'reference': ref_fasta_files[0],
88             'quiet': 'true',
89             'validation_stringency': 'LENIENT',
90             'compression_level': 0
91             },
92         stdin=os.fdopen(pipes['sortsam','r'], 'rb', 2**20),
93         stdout=os.fdopen(pipes['reordersam','w'], 'wb', 2**20))
94     os._exit(0)
95
96 pipe_setup(pipes, 'addrg')
97 if 0==named_fork(children, 'addrg'):
98     pipe_closeallbut(pipes, ('reordersam', 'r'), ('addrg', 'w'))
99     arvados_picard.run(
100         'AddOrReplaceReadGroups',
101         params={
102             'i': '/dev/stdin',
103             'o': '/dev/stdout',
104             'quiet': 'true',
105             'rglb': this_job['script_parameters'].get('rglb', 0),
106             'rgpl': this_job['script_parameters'].get('rgpl', 'illumina'),
107             'rgpu': this_job['script_parameters'].get('rgpu', 0),
108             'rgsm': this_job['script_parameters'].get('rgsm', 0),
109             'validation_stringency': 'LENIENT'
110             },
111         stdin=os.fdopen(pipes['reordersam','r'], 'rb', 2**20),
112         stdout=os.fdopen(pipes['addrg','w'], 'wb', 2**20))
113     os._exit(0)
114
115 pipe_setup(pipes, 'bammanifest')
116 pipe_setup(pipes, 'bam')
117 pipe_setup(pipes, 'casm_in')
118 if 0==named_fork(children, 'bammanifest'):
119     pipe_closeallbut(pipes,
120                      ('addrg', 'r'),
121                      ('bammanifest', 'w'),
122                      ('bam', 'w'),
123                      ('casm_in', 'w'))
124     out = arvados.CollectionWriter()
125     out.start_new_stream(input_stream_name)
126     out.start_new_file(input_file_name)
127     while True:
128         buf = os.read(pipes['addrg','r'], 2**20)
129         if len(buf) == 0:
130             break
131         os.write(pipes['bam','w'], buf)
132         os.write(pipes['casm_in','w'], buf)
133         out.write(buf)
134     os.write(pipes['bammanifest','w'], out.manifest_text())
135     os.close(pipes['bammanifest','w'])
136     os._exit(0)
137
138 pipe_setup(pipes, 'casm')
139 if 0 == named_fork(children, 'casm'):
140     pipe_closeallbut(pipes, ('casm_in', 'r'), ('casm', 'w'))
141     arvados_picard.run(
142         'CollectAlignmentSummaryMetrics',
143         params={
144             'input': '/dev/fd/' + str(pipes['casm_in','r']),
145             'output': '/dev/fd/' + str(pipes['casm','w']),
146             'reference_sequence': ref_fasta_files[0],
147             'validation_stringency': 'LENIENT',
148             },
149         close_fds=False)
150     os._exit(0)
151
152 pipe_setup(pipes, 'index')
153 if 0==named_fork(children, 'index'):
154     pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
155     arvados_picard.run(
156         'BuildBamIndex',
157         params={
158             'i': '/dev/stdin',
159             'o': '/dev/stdout',
160             'quiet': 'true',
161             'validation_stringency': 'LENIENT'
162             },
163         stdin=os.fdopen(pipes['bam','r'], 'rb', 2**20),
164         stdout=os.fdopen(pipes['index','w'], 'wb', 2**20))
165     os._exit(0)
166
167 pipe_setup(pipes, 'indexmanifest')
168 if 0==named_fork(children, 'indexmanifest'):
169     pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
170     out = arvados.CollectionWriter()
171     out.start_new_stream(input_stream_name)
172     out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
173     while True:
174         buf = os.read(pipes['index','r'], 2**20)
175         if len(buf) == 0:
176             break
177         out.write(buf)
178     os.write(pipes['indexmanifest','w'], out.manifest_text())
179     os.close(pipes['indexmanifest','w'])
180     os._exit(0)
181
182 pipe_closeallbut(pipes,
183                  ('bammanifest', 'r'),
184                  ('indexmanifest', 'r'),
185                  ('casm', 'r'))
186
187 outmanifest = ''
188
189 for which in ['bammanifest', 'indexmanifest']:
190     with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
191         while True:
192             buf = f.read()
193             if buf == '':
194                 break
195             outmanifest += buf
196
197 casm_out = arvados.CollectionWriter()
198 casm_out.start_new_stream(input_stream_name)
199 casm_out.start_new_file(input_file_name + '.casm.tsv')
200 casm_out.write(os.fdopen(pipes.pop(('casm','r'))))
201
202 outmanifest += casm_out.manifest_text()
203
204 all_ok = True
205 for (childname, pid) in children.items():
206     all_ok = all_ok and waitpid_and_check_exit(pid, childname)
207
208 if all_ok:
209     this_task.set_output(outmanifest)
210 else:
211     sys.exit(1)