5 import arvados_samtools
11 arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
13 this_job = arvados.current_job()
14 this_task = arvados.current_task()
15 ref_dir = arvados.util.collection_extract(
16 collection = this_job['script_parameters']['reference_index'],
21 for f in os.listdir(ref_dir):
22 basename = re.sub(r'\.bwt$', '', f)
24 ref_basename = os.path.join(ref_dir, basename)
25 if ref_basename == None:
26 raise Exception("Could not find *.bwt in reference collection.")
28 tmp_dir = arvados.current_task().tmpdir
31 def input_filename(self):
32 for s in arvados.CollectionReader(self.collection).all_streams():
33 for f in s.all_files():
34 return f.decompressed_name()
35 def generate_input(self):
36 for s in arvados.CollectionReader(self.collection).all_streams():
37 for f in s.all_files():
38 for s in f.readall_decompressed():
40 def aln(self, input_param):
41 self.collection = this_task['parameters'][input_param]
42 reads_filename = os.path.join(tmp_dir, self.input_filename())
43 aln_filename = os.path.join(tmp_dir, self.input_filename() + '.sai')
44 reads_pipe_r, reads_pipe_w = os.pipe()
46 os.close(reads_pipe_r)
47 reads_file = open(reads_filename, 'wb')
48 for s in self.generate_input():
49 if len(s) != os.write(reads_pipe_w, s):
50 raise Exception("short write")
53 os.close(reads_pipe_w)
55 os.close(reads_pipe_w)
57 aln_file = open(aln_filename, 'wb')
58 bwa_proc = subprocess.Popen(
59 [arvados_bwa.bwa_binary(),
63 stdin=os.fdopen(reads_pipe_r, 'rb', 2**20),
66 return reads_filename, aln_filename
68 reads_1, alignments_1 = Aligner().aln('input_1')
69 reads_2, alignments_2 = Aligner().aln('input_2')
70 pid1, exit1 = os.wait()
71 pid2, exit2 = os.wait()
72 if exit1 != 0 or exit2 != 0:
73 raise Exception("bwa aln exited non-zero (0x%x, 0x%x)" % (exit1, exit2))
75 # output alignments in sam format to pipe
76 sam_pipe_r, sam_pipe_w = os.pipe()
84 arvados_bwa.run('sampe',
86 alignments_1, alignments_2,
88 stdout=os.fdopen(sam_pipe_w, 'wb', 2**20))
91 # convert sam (sam_pipe_r) to bam (bam_pipe_w)
92 bam_pipe_r, bam_pipe_w = os.pipe()
101 arvados_samtools.run('view',
104 stdin=os.fdopen(sam_pipe_r, 'rb', 2**20),
105 stdout=os.fdopen(bam_pipe_w, 'wb', 2**20))
108 # copy bam (bam_pipe_r) to Keep
109 out_bam_filename = os.path.split(reads_1)[-1] + '.bam'
110 out = arvados.CollectionWriter()
111 out.start_new_stream()
112 out.start_new_file(out_bam_filename)
113 out.write(os.fdopen(bam_pipe_r, 'rb', 2**20))
115 # make sure everyone exited nicely
116 pid3, exit3 = os.waitpid(sam_pid, 0)
118 raise Exception("bwa sampe exited non-zero (0x%x)" % exit3)
119 pid4, exit4 = os.waitpid(bam_pid, 0)
121 raise Exception("samtools view exited non-zero (0x%x)" % exit4)
124 this_task.set_output(out.finish())