14807: Pass token to service handlers explicitly, not via context.
[arvados.git] / crunch_scripts / GATK2-merge-call
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 import os
7 import re
8 import string
9 import threading
10 import arvados
11 import arvados_gatk2
12 import arvados_picard
13 from arvados_ipc import *
14
15 class InvalidArgumentError(Exception):
16     pass
17
18 this_job = arvados.current_job()
19 this_task = arvados.current_task()
20 tmpdir = arvados.current_task().tmpdir
21 arvados.util.clear_tmpdir()
22
23 bundle_dir = arvados.util.collection_extract(
24     collection = this_job['script_parameters']['gatk_bundle'],
25     files = [
26         'human_g1k_v37.dict',
27         'human_g1k_v37.fasta',
28         'human_g1k_v37.fasta.fai',
29         'dbsnp_137.b37.vcf',
30         'dbsnp_137.b37.vcf.idx',
31         ],
32     path = 'gatk_bundle')
33 ref_fasta_files = [os.path.join(bundle_dir, f)
34                    for f in os.listdir(bundle_dir)
35                    if re.search(r'\.fasta(\.gz)?$', f)]
36 regions_args = []
37 if 'regions' in this_job['script_parameters']:
38     regions_dir = arvados.util.collection_extract(
39         collection = this_job['script_parameters']['regions'],
40         path = 'regions')
41     region_padding = int(this_job['script_parameters']['region_padding'])
42     for f in os.listdir(regions_dir):
43         if re.search(r'\.bed$', f):
44             regions_args += [
45                 '--intervals', os.path.join(regions_dir, f),
46                 '--interval_padding', str(region_padding)
47                 ]
48
49
50 # Start a child process for each input file, feeding data to picard.
51
52 input_child_names = []
53 children = {}
54 pipes = {}
55
56 input_collection = this_job['script_parameters']['input']
57 input_index = 0
58 for s in arvados.CollectionReader(input_collection).all_streams():
59     for f in s.all_files():
60         if not re.search(r'\.bam$', f.name()):
61             continue
62         input_index += 1
63         childname = 'input-' + str(input_index)
64         input_child_names += [childname]
65         pipe_setup(pipes, childname)
66         childpid = named_fork(children, childname)
67         if childpid == 0:
68             pipe_closeallbut(pipes, (childname, 'w'))
69             for s in f.readall():
70                 os.write(pipes[childname, 'w'], s)
71             os.close(pipes[childname, 'w'])
72             os._exit(0)
73         sys.stderr.write("pid %d writing %s to fd %d->%d\n" %
74                          (childpid,
75                           s.name()+'/'+f.name(),
76                           pipes[childname, 'w'],
77                           pipes[childname, 'r']))
78         pipe_closeallbut(pipes, *[(childname, 'r')
79                                   for childname in input_child_names])
80
81
82 # Merge-sort the input files to merge.bam
83
84 arvados_picard.run(
85     'MergeSamFiles',
86     args=[
87         'I=/dev/fd/' + str(pipes[childname, 'r'])
88         for childname in input_child_names
89         ],
90     params={
91         'o': 'merge.bam',
92         'quiet': 'true',
93         'so': 'coordinate',
94         'use_threading': 'true',
95         'create_index': 'true',
96         'validation_stringency': 'LENIENT',
97         },
98     close_fds=False,
99     )
100 pipe_closeallbut(pipes)
101
102
103 # Run CoverageBySample on merge.bam
104
105 pipe_setup(pipes, 'stats_log')
106 pipe_setup(pipes, 'stats_out')
107 if 0 == named_fork(children, 'GATK'):
108     pipe_closeallbut(pipes,
109                      ('stats_log', 'w'),
110                      ('stats_out', 'w'))
111     arvados_gatk2.run(
112         args=[
113             '-T', 'CoverageBySample',
114             '-R', ref_fasta_files[0],
115             '-I', 'merge.bam',
116             '-o', '/dev/fd/' + str(pipes['stats_out', 'w']),
117             '--log_to_file', '/dev/fd/' + str(pipes['stats_log', 'w']),
118             ]
119         + regions_args,
120         close_fds=False)
121     pipe_closeallbut(pipes)
122     os._exit(0)
123 pipe_closeallbut(pipes, ('stats_log', 'r'), ('stats_out', 'r'))
124
125
126 # Start two threads to read from CoverageBySample pipes
127
128 class ExceptionPropagatingThread(threading.Thread):
129     """
130     If a subclassed thread calls _raise(e) in run(), running join() on
131     the thread will raise e in the thread that calls join().
132     """
133     def __init__(self, *args, **kwargs):
134         super(ExceptionPropagatingThread, self).__init__(*args, **kwargs)
135         self.__exception = None
136     def join(self, *args, **kwargs):
137         ret = super(ExceptionPropagatingThread, self).join(*args, **kwargs)
138         if self.__exception:
139             raise self.__exception
140         return ret
141     def _raise(self, exception):
142         self.__exception = exception
143
144 class StatsLogReader(ExceptionPropagatingThread):
145     def __init__(self, **kwargs):
146         super(StatsLogReader, self).__init__()
147         self.args = kwargs
148     def run(self):
149         try:
150             for logline in self.args['infile']:
151                 x = re.search('Processing (\d+) bp from intervals', logline)
152                 if x:
153                     self._total_bp = int(x.group(1))
154         except Exception as e:
155             self._raise(e)
156     def total_bp(self):
157         self.join()
158         return self._total_bp
159 stats_log_thr = StatsLogReader(infile=os.fdopen(pipes.pop(('stats_log', 'r'))))
160 stats_log_thr.start()
161
162 class StatsOutReader(ExceptionPropagatingThread):
163     """
164     Read output of CoverageBySample and collect a histogram of
165     coverage (last column) -> number of loci (number of rows).
166     """
167     def __init__(self, **kwargs):
168         super(StatsOutReader, self).__init__()
169         self.args = kwargs
170     def run(self):
171         try:
172             hist = [0]
173             histtot = 0
174             for line in self.args['infile']:
175                 try:
176                     i = int(string.split(line)[-1])
177                 except ValueError:
178                     continue
179                 if i >= 1:
180                     if len(hist) <= i:
181                         hist.extend([0 for x in range(1+i-len(hist))])
182                     hist[i] += 1
183                     histtot += 1
184             hist[0] = stats_log_thr.total_bp() - histtot
185             self._histogram = hist
186         except Exception as e:
187             self._raise(e)
188     def histogram(self):
189         self.join()
190         return self._histogram
191 stats_out_thr = StatsOutReader(infile=os.fdopen(pipes.pop(('stats_out', 'r'))))
192 stats_out_thr.start()
193
194
195 # Run UnifiedGenotyper on merge.bam
196
197 arvados_gatk2.run(
198     args=[
199         '-nt', arvados_gatk2.cpus_on_this_node(),
200         '-T', 'UnifiedGenotyper',
201         '-R', ref_fasta_files[0],
202         '-I', 'merge.bam',
203         '-o', os.path.join(tmpdir, 'out.vcf'),
204         '--dbsnp', os.path.join(bundle_dir, 'dbsnp_137.b37.vcf'),
205         '-metrics', 'UniGenMetrics',
206         '-A', 'DepthOfCoverage',
207         '-A', 'AlleleBalance',
208         '-A', 'QualByDepth',
209         '-A', 'HaplotypeScore',
210         '-A', 'MappingQualityRankSumTest',
211         '-A', 'ReadPosRankSumTest',
212         '-A', 'FisherStrand',
213         '-glm', 'both',
214         ]
215     + regions_args
216     + arvados.getjobparam('GATK2_UnifiedGenotyper_args',[]))
217
218 # Copy the output VCF file to Keep
219
220 out = arvados.CollectionWriter()
221 out.start_new_stream()
222 out.start_new_file('out.vcf')
223 out.write(open(os.path.join(tmpdir, 'out.vcf'), 'rb'))
224
225
226 # Write statistics to Keep
227
228 out.start_new_file('mincoverage_nlocus.csv')
229 sofar = 0
230 hist = stats_out_thr.histogram()
231 total_bp = stats_log_thr.total_bp()
232 for i in range(len(hist)):
233     out.write("%d,%d,%f\n" %
234               (i,
235                total_bp - sofar,
236                100.0 * (total_bp - sofar) / total_bp))
237     sofar += hist[i]
238
239 if waitpid_and_check_children(children):
240     this_task.set_output(out.finish())
241 else:
242     sys.exit(1)