#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import os
import re
import sys
import threading

import arvados
import arvados_gatk2
import arvados_picard
from arvados_ipc import *
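
# The helpers imported from arvados_ipc are used throughout. Their
# assumed semantics, inferred from how they are used below:
#
#   pipe_setup(pipes, name)         -- create an os.pipe() and register its
#                                      fds as pipes[name, 'r'] / pipes[name, 'w']
#   named_fork(children, name)      -- os.fork(), recording the child's pid
#                                      in children[name]; returns 0 in the child
#   pipe_closeallbut(pipes, *keep)  -- close every registered pipe fd except
#                                      the (name, mode) pairs listed in keep
#   waitpid_and_check_children(children)
#                                   -- reap all children; True iff all exited 0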

class InvalidArgumentError(Exception):
    pass

this_job = arvados.current_job()
this_task = arvados.current_task()
tmpdir = arvados.current_task().tmpdir
arvados.util.clear_tmpdir()

bundle_dir = arvados.util.collection_extract(
    collection = this_job['script_parameters']['gatk_bundle'],
    path = 'gatk-bundle',
    files = [
        'human_g1k_v37.dict',
        'human_g1k_v37.fasta',
        'human_g1k_v37.fasta.fai',
        'dbsnp_137.b37.vcf',
        'dbsnp_137.b37.vcf.idx',
        ],
    decompress = False)
ref_fasta_files = [os.path.join(bundle_dir, f)
                   for f in os.listdir(bundle_dir)
                   if re.search(r'\.fasta(\.gz)?$', f)]
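# GATK resolves the reference by file name: it expects the sequence
# dictionary (.dict) and FASTA index (.fai) extracted above to sit next
# to the .fasta file, which is why the whole bundle goes into one
# directory and only the FASTA path is passed on the command line.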

regions_args = []
if 'regions' in this_job['script_parameters']:
    regions_dir = arvados.util.collection_extract(
        collection = this_job['script_parameters']['regions'],
        path = 'regions')
    region_padding = int(this_job['script_parameters']['region_padding'])
    for f in os.listdir(regions_dir):
        if re.search(r'\.bed$', f):
            regions_args += [
                '--intervals', os.path.join(regions_dir, f),
                '--interval_padding', str(region_padding)
                ]
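
# Each .bed file becomes a GATK --intervals argument, with
# --interval_padding widening every interval by region_padding bp on
# each side, so the coverage and genotyping passes below are restricted
# to the (padded) target regions whenever 'regions' is supplied.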


# Start a child process for each input file, feeding data to picard.

input_child_names = []
pipes = {}
children = {}

input_index = 0
input_collection = this_job['script_parameters']['input']

for s in arvados.CollectionReader(input_collection).all_streams():
    for f in s.all_files():
        if not re.search(r'\.bam$', f.name()):
            continue
        input_index += 1
        childname = 'input-' + str(input_index)
        input_child_names += [childname]
        pipe_setup(pipes, childname)
        childpid = named_fork(children, childname)
        if childpid == 0:
            # Child: keep only the write end of our own pipe, stream
            # this BAM file's data into it, and exit without cleanup.
            pipe_closeallbut(pipes, (childname, 'w'))
            for buf in f.readall():
                os.write(pipes[childname, 'w'], buf)
            os.close(pipes[childname, 'w'])
            os._exit(0)
        sys.stderr.write("pid %d writing %s to fd %d->%d\n" %
                         (childpid,
                          s.name()+'/'+f.name(),
                          pipes[childname, 'w'],
                          pipes[childname, 'r']))
pipe_closeallbut(pipes, *[(childname, 'r')
                          for childname in input_child_names])
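
# The parent now holds only the read ends. Every write end must be
# closed here: each pipe delivers EOF once its child finishes writing,
# and a stray write fd in the parent would keep the merge step below
# waiting for data forever.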


# Merge-sort the input files to merge.bam

arvados_picard.run(
    'MergeSamFiles',
    args=[
        'I=/dev/fd/' + str(pipes[childname, 'r'])
        for childname in input_child_names
        ],
    params={
        'o': 'merge.bam',
        'quiet': 'true',
        'so': 'coordinate',
        'use_threading': 'true',
        'create_index': 'true',
        'validation_stringency': 'LENIENT',
        },
    close_fds=False,
    )
pipe_closeallbut(pipes)
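
# 'I=/dev/fd/N' works because the picard process inherits the read-end
# descriptors (hence close_fds=False above): /dev/fd/N names an
# already-open fd, so picard consumes the BAM streams without any
# temporary files on disk.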


# Run CoverageBySample on merge.bam

pipe_setup(pipes, 'stats_log')
pipe_setup(pipes, 'stats_out')
if 0 == named_fork(children, 'GATK'):
    # Child: run GATK with its output and its log pointed at the
    # write ends of the two pipes, then exit.
    pipe_closeallbut(pipes,
                     ('stats_log', 'w'),
                     ('stats_out', 'w'))
    arvados_gatk2.run(
        args=[
            '-T', 'CoverageBySample',
            '-R', ref_fasta_files[0],
            '-I', 'merge.bam',
            '-o', '/dev/fd/' + str(pipes['stats_out', 'w']),
            '--log_to_file', '/dev/fd/' + str(pipes['stats_log', 'w']),
            ]
        + regions_args,
        close_fds=False)
    pipe_closeallbut(pipes)
    os._exit(0)
pipe_closeallbut(pipes, ('stats_log', 'r'), ('stats_out', 'r'))
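
# Only the read ends stay open in the parent: the reader threads below
# rely on EOF to terminate, and EOF arrives once the GATK child (the
# sole holder of the write ends) exits.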


# Start two threads to read from CoverageBySample pipes

class ExceptionPropagatingThread(threading.Thread):
    """
    If a subclassed thread calls _raise(e) in run(), running join() on
    the thread will raise e in the thread that calls join().
    """
    def __init__(self, *args, **kwargs):
        super(ExceptionPropagatingThread, self).__init__(*args, **kwargs)
        self.__exception = None
    def join(self, *args, **kwargs):
        ret = super(ExceptionPropagatingThread, self).join(*args, **kwargs)
        if self.__exception:
            raise self.__exception
        return ret
    def _raise(self, exception):
        self.__exception = exception
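
# A minimal usage sketch (illustration only, not part of this script):
# a subclass catches exceptions in run() and hands them to _raise(), so
# they resurface in whichever thread calls join():
#
#     class Worker(ExceptionPropagatingThread):
#         def run(self):
#             try:
#                 do_work()        # hypothetical helper
#             except Exception as e:
#                 self._raise(e)
#
#     w = Worker()
#     w.start()
#     w.join()    # re-raises the exception from run(), if any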

class StatsLogReader(ExceptionPropagatingThread):
    def __init__(self, **kwargs):
        super(StatsLogReader, self).__init__()
        self.args = kwargs
    def run(self):
        try:
            for logline in self.args['infile']:
                x = re.search(r'Processing (\d+) bp from intervals', logline)
                if x:
                    self._total_bp = int(x.group(1))
        except Exception as e:
            self._raise(e)
    def total_bp(self):
        self.join()
        return self._total_bp
stats_log_thr = StatsLogReader(infile=os.fdopen(pipes.pop(('stats_log', 'r'))))
stats_log_thr.start()

class StatsOutReader(ExceptionPropagatingThread):
    """
    Read output of CoverageBySample and collect a histogram of
    coverage (last column) -> number of loci (number of rows).
    """
    def __init__(self, **kwargs):
        super(StatsOutReader, self).__init__()
        self.args = kwargs
    def run(self):
        try:
            hist = [0]
            histtot = 0
            for line in self.args['infile']:
                try:
                    i = int(line.split()[-1])
                except ValueError:
                    # Skip headers and other non-numeric lines.
                    continue
                if i >= 1:
                    if i >= len(hist):
                        hist.extend([0 for x in range(1+i-len(hist))])
                    hist[i] += 1
                    histtot += 1
            # Loci never reported by CoverageBySample have zero
            # coverage; infer their count from the total interval size.
            hist[0] = stats_log_thr.total_bp() - histtot
            self._histogram = hist
        except Exception as e:
            self._raise(e)
    def histogram(self):
        self.join()
        return self._histogram
stats_out_thr = StatsOutReader(infile=os.fdopen(pipes.pop(('stats_out', 'r'))))
stats_out_thr.start()
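
# Both readers must run concurrently with the GATK child: if either
# pipe's buffer filled up unread, GATK would block on write() and the
# pipeline would deadlock. Draining stats_log and stats_out in separate
# threads while the main thread moves on avoids that.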


# Run UnifiedGenotyper on merge.bam

arvados_gatk2.run(
    args=[
        '-nt', arvados_gatk2.cpus_on_this_node(),
        '-T', 'UnifiedGenotyper',
        '-R', ref_fasta_files[0],
        '-I', 'merge.bam',
        '-o', os.path.join(tmpdir, 'out.vcf'),
        '--dbsnp', os.path.join(bundle_dir, 'dbsnp_137.b37.vcf'),
        '-metrics', 'UniGenMetrics',
        '-A', 'DepthOfCoverage',
        '-A', 'AlleleBalance',
        '-A', 'QualByDepth',
        '-A', 'HaplotypeScore',
        '-A', 'MappingQualityRankSumTest',
        '-A', 'ReadPosRankSumTest',
        '-A', 'FisherStrand',
        ]
    + regions_args
    + arvados.getjobparam('GATK2_UnifiedGenotyper_args',[]))
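
# Extra UnifiedGenotyper flags can be supplied per job through the
# GATK2_UnifiedGenotyper_args script parameter (a list of strings,
# defaulting to [] via arvados.getjobparam above).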


# Copy the output VCF file to Keep

out = arvados.CollectionWriter()
out.start_new_stream()
out.start_new_file('out.vcf')
# CollectionWriter.write() accepts the open file object here and
# streams its contents into the collection.
out.write(open(os.path.join(tmpdir, 'out.vcf'), 'rb'))


# Write statistics to Keep

out.start_new_file('mincoverage_nlocus.csv')
sofar = 0
hist = stats_out_thr.histogram()
total_bp = stats_log_thr.total_bp()
for i in range(len(hist)):
    out.write("%d,%d,%f\n" %
              (i,
               hist[i],
               100.0 * (total_bp - sofar) / total_bp))
    sofar += hist[i]
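
# Each CSV row is (coverage level i, number of loci with coverage
# exactly i, percentage of target bp with coverage >= i): sofar has
# accumulated the loci below level i when the row is written, so
# (total_bp - sofar) counts the loci at or above it.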

if waitpid_and_check_children(children):
    this_task.set_output(out.finish())
else:
    this_task.set_output(None)