Merge branch '6598-crunch-progress-stats' closes #6598
[arvados.git] / crunch_scripts / GATK2-merge-call
1 #!/usr/bin/env python
2
3 import os
4 import re
5 import string
6 import threading
7 import arvados
8 import arvados_gatk2
9 import arvados_picard
10 from arvados_ipc import *
11
12 class InvalidArgumentError(Exception):
13     pass
14
15 this_job = arvados.current_job()
16 this_task = arvados.current_task()
17 tmpdir = arvados.current_task().tmpdir
18 arvados.util.clear_tmpdir()
19
20 bundle_dir = arvados.util.collection_extract(
21     collection = this_job['script_parameters']['gatk_bundle'],
22     files = [
23         'human_g1k_v37.dict',
24         'human_g1k_v37.fasta',
25         'human_g1k_v37.fasta.fai',
26         'dbsnp_137.b37.vcf',
27         'dbsnp_137.b37.vcf.idx',
28         ],
29     path = 'gatk_bundle')
30 ref_fasta_files = [os.path.join(bundle_dir, f)
31                    for f in os.listdir(bundle_dir)
32                    if re.search(r'\.fasta(\.gz)?$', f)]
33 regions_args = []
34 if 'regions' in this_job['script_parameters']:
35     regions_dir = arvados.util.collection_extract(
36         collection = this_job['script_parameters']['regions'],
37         path = 'regions')
38     region_padding = int(this_job['script_parameters']['region_padding'])
39     for f in os.listdir(regions_dir):
40         if re.search(r'\.bed$', f):
41             regions_args += [
42                 '--intervals', os.path.join(regions_dir, f),
43                 '--interval_padding', str(region_padding)
44                 ]
45
46
47 # Start a child process for each input file, feeding data to picard.
48
49 input_child_names = []
50 children = {}
51 pipes = {}
52
53 input_collection = this_job['script_parameters']['input']
54 input_index = 0
55 for s in arvados.CollectionReader(input_collection).all_streams():
56     for f in s.all_files():
57         if not re.search(r'\.bam$', f.name()):
58             continue
59         input_index += 1
60         childname = 'input-' + str(input_index)
61         input_child_names += [childname]
62         pipe_setup(pipes, childname)
63         childpid = named_fork(children, childname)
64         if childpid == 0:
65             pipe_closeallbut(pipes, (childname, 'w'))
66             for s in f.readall():
67                 os.write(pipes[childname, 'w'], s)
68             os.close(pipes[childname, 'w'])
69             os._exit(0)
70         sys.stderr.write("pid %d writing %s to fd %d->%d\n" %
71                          (childpid,
72                           s.name()+'/'+f.name(),
73                           pipes[childname, 'w'],
74                           pipes[childname, 'r']))
75         pipe_closeallbut(pipes, *[(childname, 'r')
76                                   for childname in input_child_names])
77
78
79 # Merge-sort the input files to merge.bam
80
81 arvados_picard.run(
82     'MergeSamFiles',
83     args=[
84         'I=/dev/fd/' + str(pipes[childname, 'r'])
85         for childname in input_child_names
86         ],
87     params={
88         'o': 'merge.bam',
89         'quiet': 'true',
90         'so': 'coordinate',
91         'use_threading': 'true',
92         'create_index': 'true',
93         'validation_stringency': 'LENIENT',
94         },
95     close_fds=False,
96     )
97 pipe_closeallbut(pipes)
98
99
100 # Run CoverageBySample on merge.bam
101
102 pipe_setup(pipes, 'stats_log')
103 pipe_setup(pipes, 'stats_out')
104 if 0 == named_fork(children, 'GATK'):
105     pipe_closeallbut(pipes,
106                      ('stats_log', 'w'),
107                      ('stats_out', 'w'))
108     arvados_gatk2.run(
109         args=[
110             '-T', 'CoverageBySample',
111             '-R', ref_fasta_files[0],
112             '-I', 'merge.bam',
113             '-o', '/dev/fd/' + str(pipes['stats_out', 'w']),
114             '--log_to_file', '/dev/fd/' + str(pipes['stats_log', 'w']),
115             ]
116         + regions_args,
117         close_fds=False)
118     pipe_closeallbut(pipes)
119     os._exit(0)
120 pipe_closeallbut(pipes, ('stats_log', 'r'), ('stats_out', 'r'))
121
122
123 # Start two threads to read from CoverageBySample pipes
124
125 class ExceptionPropagatingThread(threading.Thread):
126     """
127     If a subclassed thread calls _raise(e) in run(), running join() on
128     the thread will raise e in the thread that calls join().
129     """
130     def __init__(self, *args, **kwargs):
131         super(ExceptionPropagatingThread, self).__init__(*args, **kwargs)
132         self.__exception = None
133     def join(self, *args, **kwargs):
134         ret = super(ExceptionPropagatingThread, self).join(*args, **kwargs)
135         if self.__exception:
136             raise self.__exception
137         return ret
138     def _raise(self, exception):
139         self.__exception = exception
140
141 class StatsLogReader(ExceptionPropagatingThread):
142     def __init__(self, **kwargs):
143         super(StatsLogReader, self).__init__()
144         self.args = kwargs
145     def run(self):
146         try:
147             for logline in self.args['infile']:
148                 x = re.search('Processing (\d+) bp from intervals', logline)
149                 if x:
150                     self._total_bp = int(x.group(1))
151         except Exception as e:
152             self._raise(e)
153     def total_bp(self):
154         self.join()
155         return self._total_bp
156 stats_log_thr = StatsLogReader(infile=os.fdopen(pipes.pop(('stats_log', 'r'))))
157 stats_log_thr.start()
158
159 class StatsOutReader(ExceptionPropagatingThread):
160     """
161     Read output of CoverageBySample and collect a histogram of
162     coverage (last column) -> number of loci (number of rows).
163     """
164     def __init__(self, **kwargs):
165         super(StatsOutReader, self).__init__()
166         self.args = kwargs
167     def run(self):
168         try:
169             hist = [0]
170             histtot = 0
171             for line in self.args['infile']:
172                 try:
173                     i = int(string.split(line)[-1])
174                 except ValueError:
175                     continue
176                 if i >= 1:
177                     if len(hist) <= i:
178                         hist.extend([0 for x in range(1+i-len(hist))])
179                     hist[i] += 1
180                     histtot += 1
181             hist[0] = stats_log_thr.total_bp() - histtot
182             self._histogram = hist
183         except Exception as e:
184             self._raise(e)
185     def histogram(self):
186         self.join()
187         return self._histogram
188 stats_out_thr = StatsOutReader(infile=os.fdopen(pipes.pop(('stats_out', 'r'))))
189 stats_out_thr.start()
190
191
192 # Run UnifiedGenotyper on merge.bam
193
194 arvados_gatk2.run(
195     args=[
196         '-nt', arvados_gatk2.cpus_on_this_node(),
197         '-T', 'UnifiedGenotyper',
198         '-R', ref_fasta_files[0],
199         '-I', 'merge.bam',
200         '-o', os.path.join(tmpdir, 'out.vcf'),
201         '--dbsnp', os.path.join(bundle_dir, 'dbsnp_137.b37.vcf'),
202         '-metrics', 'UniGenMetrics',
203         '-A', 'DepthOfCoverage',
204         '-A', 'AlleleBalance',
205         '-A', 'QualByDepth',
206         '-A', 'HaplotypeScore',
207         '-A', 'MappingQualityRankSumTest',
208         '-A', 'ReadPosRankSumTest',
209         '-A', 'FisherStrand',
210         '-glm', 'both',
211         ]
212     + regions_args
213     + arvados.getjobparam('GATK2_UnifiedGenotyper_args',[]))
214
215 # Copy the output VCF file to Keep
216
217 out = arvados.CollectionWriter()
218 out.start_new_stream()
219 out.start_new_file('out.vcf')
220 out.write(open(os.path.join(tmpdir, 'out.vcf'), 'rb'))
221
222
223 # Write statistics to Keep
224
225 out.start_new_file('mincoverage_nlocus.csv')
226 sofar = 0
227 hist = stats_out_thr.histogram()
228 total_bp = stats_log_thr.total_bp()
229 for i in range(len(hist)):
230     out.write("%d,%d,%f\n" %
231               (i,
232                total_bp - sofar,
233                100.0 * (total_bp - sofar) / total_bp))
234     sofar += hist[i]
235
236 if waitpid_and_check_children(children):
237     this_task.set_output(out.finish())
238 else:
239     sys.exit(1)