10 from arvados_ipc import *
12 class InvalidArgumentError(Exception):
15 this_job = arvados.current_job()
16 this_task = arvados.current_task()
17 tmpdir = arvados.current_task().tmpdir
18 arvados.util.clear_tmpdir()
20 bundle_dir = arvados.util.collection_extract(
21 collection = this_job['script_parameters']['gatk_bundle'],
24 'human_g1k_v37.fasta',
25 'human_g1k_v37.fasta.fai',
27 'dbsnp_137.b37.vcf.idx',
30 ref_fasta_files = [os.path.join(bundle_dir, f)
31 for f in os.listdir(bundle_dir)
32 if re.search(r'\.fasta(\.gz)?$', f)]
34 if 'regions' in this_job['script_parameters']:
35 regions_dir = arvados.util.collection_extract(
36 collection = this_job['script_parameters']['regions'],
38 region_padding = int(this_job['script_parameters']['region_padding'])
39 for f in os.listdir(regions_dir):
40 if re.search(r'\.bed$', f):
42 '--intervals', os.path.join(regions_dir, f),
43 '--interval_padding', str(region_padding)
47 # Start a child process for each input file, feeding data to picard.
49 input_child_names = []
53 input_collection = this_job['script_parameters']['input']
55 for s in arvados.CollectionReader(input_collection).all_streams():
56 for f in s.all_files():
57 if not re.search(r'\.bam$', f.name()):
60 childname = 'input-' + str(input_index)
61 input_child_names += [childname]
62 pipe_setup(pipes, childname)
63 childpid = named_fork(children, childname)
65 pipe_closeallbut(pipes, (childname, 'w'))
67 os.write(pipes[childname, 'w'], s)
68 os.close(pipes[childname, 'w'])
70 sys.stderr.write("pid %d writing %s to fd %d->%d\n" %
72 s.name()+'/'+f.name(),
73 pipes[childname, 'w'],
74 pipes[childname, 'r']))
75 pipe_closeallbut(pipes, *[(childname, 'r')
76 for childname in input_child_names])
79 # Merge-sort the input files to merge.bam
84 'I=/dev/fd/' + str(pipes[childname, 'r'])
85 for childname in input_child_names
91 'use_threading': 'true',
92 'create_index': 'true',
93 'validation_stringency': 'LENIENT',
97 pipe_closeallbut(pipes)
100 # Run CoverageBySample on merge.bam
102 pipe_setup(pipes, 'stats_log')
103 pipe_setup(pipes, 'stats_out')
104 if 0 == named_fork(children, 'GATK'):
105 pipe_closeallbut(pipes,
110 '-T', 'CoverageBySample',
111 '-R', ref_fasta_files[0],
113 '-o', '/dev/fd/' + str(pipes['stats_out', 'w']),
114 '--log_to_file', '/dev/fd/' + str(pipes['stats_log', 'w']),
118 pipe_closeallbut(pipes)
120 pipe_closeallbut(pipes, ('stats_log', 'r'), ('stats_out', 'r'))
123 # Start two threads to read from CoverageBySample pipes
125 class ExceptionPropagatingThread(threading.Thread):
127 If a subclassed thread calls _raise(e) in run(), running join() on
128 the thread will raise e in the thread that calls join().
130 def __init__(self, *args, **kwargs):
131 super(ExceptionPropagatingThread, self).__init__(*args, **kwargs)
132 self.__exception = None
133 def join(self, *args, **kwargs):
134 ret = super(ExceptionPropagatingThread, self).join(*args, **kwargs)
136 raise self.__exception
138 def _raise(self, exception):
139 self.__exception = exception
141 class StatsLogReader(ExceptionPropagatingThread):
142 def __init__(self, **kwargs):
143 super(StatsLogReader, self).__init__()
147 for logline in self.args['infile']:
148 x = re.search('Processing (\d+) bp from intervals', logline)
150 self._total_bp = int(x.group(1))
151 except Exception as e:
155 return self._total_bp
156 stats_log_thr = StatsLogReader(infile=os.fdopen(pipes.pop(('stats_log', 'r'))))
157 stats_log_thr.start()
159 class StatsOutReader(ExceptionPropagatingThread):
161 Read output of CoverageBySample and collect a histogram of
162 coverage (last column) -> number of loci (number of rows).
164 def __init__(self, **kwargs):
165 super(StatsOutReader, self).__init__()
171 for line in self.args['infile']:
173 i = int(string.split(line)[-1])
178 hist.extend([0 for x in range(1+i-len(hist))])
181 hist[0] = stats_log_thr.total_bp() - histtot
182 self._histogram = hist
183 except Exception as e:
187 return self._histogram
188 stats_out_thr = StatsOutReader(infile=os.fdopen(pipes.pop(('stats_out', 'r'))))
189 stats_out_thr.start()
192 # Run UnifiedGenotyper on merge.bam
196 '-nt', arvados_gatk2.cpus_on_this_node(),
197 '-T', 'UnifiedGenotyper',
198 '-R', ref_fasta_files[0],
200 '-o', os.path.join(tmpdir, 'out.vcf'),
201 '--dbsnp', os.path.join(bundle_dir, 'dbsnp_137.b37.vcf'),
202 '-metrics', 'UniGenMetrics',
203 '-A', 'DepthOfCoverage',
204 '-A', 'AlleleBalance',
206 '-A', 'HaplotypeScore',
207 '-A', 'MappingQualityRankSumTest',
208 '-A', 'ReadPosRankSumTest',
209 '-A', 'FisherStrand',
213 + arvados.getjobparam('GATK2_UnifiedGenotyper_args',[]))
215 # Copy the output VCF file to Keep
217 out = arvados.CollectionWriter()
218 out.start_new_stream()
219 out.start_new_file('out.vcf')
220 out.write(open(os.path.join(tmpdir, 'out.vcf'), 'rb'))
223 # Write statistics to Keep
225 out.start_new_file('mincoverage_nlocus.csv')
227 hist = stats_out_thr.histogram()
228 total_bp = stats_log_thr.total_bp()
229 for i in range(len(hist)):
230 out.write("%d,%d,%f\n" %
233 100.0 * (total_bp - sofar) / total_bp))
236 if waitpid_and_check_children(children):
237 this_task.set_output(out.finish())