2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
8 import arvados_samtools
# Job setup: fan out work, locate the bwa reference index, pick a scratch dir.
# NOTE(review): judging by its name and arguments, this helper presumably queues
# one task per pair of input files and ends the current (parent) task when
# and_end_task=True. TODO confirm against arvados_bwa.
14 arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
16 this_job = arvados.current_job()
17 this_task = arvados.current_task()
# Extract the job's 'reference_index' collection into a local directory (ref_dir).
18 ref_dir = arvados.util.collection_extract(
19 collection = this_job['script_parameters']['reference_index'],
# Scan the extracted reference for a *.bwt file. re.sub strips a trailing
# '.bwt', and the resulting path is used as the bwa index prefix.
# NOTE(review): some lines of this section are not visible in this excerpt.
# Presumably ref_basename is initialised to None before the loop, and is only
# assigned when the '.bwt' suffix was actually stripped. Confirm.
24 for f in os.listdir(ref_dir):
25 basename = re.sub(r'\.bwt$', '', f)
27 ref_basename = os.path.join(ref_dir, basename)
# Fail fast if no index prefix was found.
# NOTE(review): prefer `ref_basename is None` over `== None` (PEP 8).
28 if ref_basename == None:
29 raise Exception("Could not find *.bwt in reference collection.")
# Task scratch directory; the decompressed reads and .sai files are written here
# by Aligner.aln(). (this_task above refers to the same current task.)
31 tmp_dir = arvados.current_task().tmpdir
34 def input_filename(self):
# Return the decompressed name of the first file in the first stream of
# self.collection. Any further files are ignored.
# If the collection contains no files, the loops finish and the method
# implicitly returns None.
35 for s in arvados.CollectionReader(self.collection).all_streams():
36 for f in s.all_files():
37 return f.decompressed_name()
38 def generate_input(self):
# Walk every file in every stream of self.collection and hand out the chunks
# returned by f.readall_decompressed().
# NOTE(review): the body of the innermost loop is not visible in this excerpt.
# aln() iterates over this method's result and os.write()s each item, so the
# body presumably yields each chunk. Confirm.
39 for s in arvados.CollectionReader(self.collection).all_streams():
40 for f in s.all_files():
# NOTE(review): this loop variable shadows the outer stream variable `s`. That is
# harmless here (the outer loop keeps its own iterator) but confusing to read.
41 for s in f.readall_decompressed():
43 def aln(self, input_param):
# Start `bwa aln` on the reads collection named by this task's parameter
# `input_param`.
# - Stores the collection locator on self.collection.
# - Derives <tmp_dir>/<first file name> as the reads path and that path
#   plus '.sai' as the alignment output path.
# - Streams the decompressed reads to the bwa subprocess through an OS pipe.
# Returns (reads_filename, aln_filename).
# NOTE(review): no wait on bwa_proc is visible in this excerpt; the top-level
# script reaps children with os.wait() instead.
44 self.collection = this_task['parameters'][input_param]
45 reads_filename = os.path.join(tmp_dir, self.input_filename())
# NOTE(review): input_filename() re-reads the collection and is called twice;
# computing it once would avoid the duplicate work.
46 aln_filename = os.path.join(tmp_dir, self.input_filename() + '.sai')
47 reads_pipe_r, reads_pipe_w = os.pipe()
# NOTE(review): lines are missing from this excerpt here. The statements below
# look like the body of a forked child that feeds the reads into the pipe; the
# branch/fork lines themselves are not visible. Confirm.
49 os.close(reads_pipe_r)
# NOTE(review): reads_file is opened, but no write or close on it is visible in
# this excerpt. Presumably each chunk is also saved here so reads_filename exists
# on disk for `bwa sampe`. Confirm.
50 reads_file = open(reads_filename, 'wb')
51 for s in self.generate_input():
# A short write on the pipe is treated as fatal rather than retried.
52 if len(s) != os.write(reads_pipe_w, s):
53 raise Exception("short write")
# NOTE(review): reads_pipe_w is closed twice below. Presumably the two calls sit
# on different branches (the writer child after writing, the parent after the
# fork); those branch lines are not visible here.
56 os.close(reads_pipe_w)
58 os.close(reads_pipe_w)
# Destination for bwa's .sai output.
60 aln_file = open(aln_filename, 'wb')
61 bwa_proc = subprocess.Popen(
62 [arvados_bwa.bwa_binary(),
# bwa reads the reads from the read end of the pipe, with a 2**20-byte buffer.
66 stdin=os.fdopen(reads_pipe_r, 'rb', 2**20),
69 return reads_filename, aln_filename
# Launch `bwa aln` for both mates of the read pair (task parameters
# 'input_1' and 'input_2').
71 reads_1, alignments_1 = Aligner().aln('input_1')
72 reads_2, alignments_2 = Aligner().aln('input_2')
# Reap two child processes and abort if either one did not exit cleanly.
# os.wait() returns the raw wait status: the exit code is in the high byte and
# any terminating signal is in the low byte. Hence the hex formatting.
# NOTE(review): os.wait() reaps *any* child, not specifically the two bwa
# processes. If aln() also forks a writer child per input (its fork lines are not
# visible here), two waits may return before both bwa processes have finished.
# Verify the number of children; bwa_proc.wait() would target bwa directly.
73 pid1, exit1 = os.wait()
74 pid2, exit2 = os.wait()
75 if exit1 != 0 or exit2 != 0:
76 raise Exception("bwa aln exited non-zero (0x%x, 0x%x)" % (exit1, exit2))
78 # output alignments in sam format to pipe
79 sam_pipe_r, sam_pipe_w = os.pipe()
# NOTE(review): the lines that fork and record sam_pid are not visible in this
# excerpt. Presumably the following call runs in that child and writes SAM to
# sam_pipe_w. Confirm.
87 arvados_bwa.run('sampe',
89 alignments_1, alignments_2,
91 stdout=os.fdopen(sam_pipe_w, 'wb', 2**20))
94 # convert sam (sam_pipe_r) to bam (bam_pipe_w)
95 bam_pipe_r, bam_pipe_w = os.pipe()
# NOTE(review): likewise, the fork that sets bam_pid is not visible here.
# Presumably `samtools view` runs in that child, reading SAM from sam_pipe_r and
# writing BAM to bam_pipe_w.
104 arvados_samtools.run('view',
107 stdin=os.fdopen(sam_pipe_r, 'rb', 2**20),
108 stdout=os.fdopen(bam_pipe_w, 'wb', 2**20))
111 # copy bam (bam_pipe_r) to Keep
# The output file is named after the first mate's reads file, plus '.bam'.
112 out_bam_filename = os.path.split(reads_1)[-1] + '.bam'
113 out = arvados.CollectionWriter()
114 out.start_new_stream()
115 out.start_new_file(out_bam_filename)
116 out.write(os.fdopen(bam_pipe_r, 'rb', 2**20))
118 # make sure everyone exited nicely
# NOTE(review): the `if exit3 != 0:` / `if exit4 != 0:` guards in front of the
# raises are not visible in this excerpt; presumably they are present. The exit
# values are raw wait statuses from os.waitpid().
119 pid3, exit3 = os.waitpid(sam_pid, 0)
121 raise Exception("bwa sampe exited non-zero (0x%x)" % exit3)
122 pid4, exit4 = os.waitpid(bam_pid, 0)
124 raise Exception("samtools view exited non-zero (0x%x)" % exit4)
# Publish the finished collection as this task's output.
127 this_task.set_output(out.finish())