10172: crunch_script only updates task record on error, will normally be
[arvados.git] / crunch_scripts / bwa-aln
1 #!/usr/bin/env python
2
3 import arvados
4 import arvados_bwa
5 import arvados_samtools
6 import os
7 import re
8 import sys
9 import subprocess
10
11 arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
12
13 this_job = arvados.current_job()
14 this_task = arvados.current_task()
15 ref_dir = arvados.util.collection_extract(
16     collection = this_job['script_parameters']['reference_index'],
17     path = 'reference',
18     decompress = False)
19
20 ref_basename = None
21 for f in os.listdir(ref_dir):
22     basename = re.sub(r'\.bwt$', '', f)
23     if basename != f:
24         ref_basename = os.path.join(ref_dir, basename)
25 if ref_basename == None:
26     raise Exception("Could not find *.bwt in reference collection.")
27
28 tmp_dir = arvados.current_task().tmpdir
29
30 class Aligner:
31     def input_filename(self):
32         for s in arvados.CollectionReader(self.collection).all_streams():
33             for f in s.all_files():
34                 return f.decompressed_name()
35     def generate_input(self):
36         for s in arvados.CollectionReader(self.collection).all_streams():
37             for f in s.all_files():
38                 for s in f.readall_decompressed():
39                     yield s
40     def aln(self, input_param):
41         self.collection = this_task['parameters'][input_param]
42         reads_filename = os.path.join(tmp_dir, self.input_filename())
43         aln_filename = os.path.join(tmp_dir, self.input_filename() + '.sai')
44         reads_pipe_r, reads_pipe_w = os.pipe()
45         if os.fork() == 0:
46             os.close(reads_pipe_r)
47             reads_file = open(reads_filename, 'wb')
48             for s in self.generate_input():
49                 if len(s) != os.write(reads_pipe_w, s):
50                     raise Exception("short write")
51                 reads_file.write(s)
52             reads_file.close()
53             os.close(reads_pipe_w)
54             sys.exit(0)
55         os.close(reads_pipe_w)
56
57         aln_file = open(aln_filename, 'wb')
58         bwa_proc = subprocess.Popen(
59             [arvados_bwa.bwa_binary(),
60              'aln', '-t', '16',
61              ref_basename,
62              '-'],
63             stdin=os.fdopen(reads_pipe_r, 'rb', 2**20),
64             stdout=aln_file)
65         aln_file.close()
66         return reads_filename, aln_filename
67
68 reads_1, alignments_1 = Aligner().aln('input_1')
69 reads_2, alignments_2 = Aligner().aln('input_2')
70 pid1, exit1 = os.wait()
71 pid2, exit2 = os.wait()
72 if exit1 != 0 or exit2 != 0:
73     raise Exception("bwa aln exited non-zero (0x%x, 0x%x)" % (exit1, exit2))
74
75 # output alignments in sam format to pipe
76 sam_pipe_r, sam_pipe_w = os.pipe()
77 sam_pid = os.fork()
78 if sam_pid != 0:
79     # parent
80     os.close(sam_pipe_w)
81 else:
82     # child
83     os.close(sam_pipe_r)
84     arvados_bwa.run('sampe',
85                     [ref_basename,
86                      alignments_1, alignments_2,
87                      reads_1, reads_2],
88                     stdout=os.fdopen(sam_pipe_w, 'wb', 2**20))
89     sys.exit(0)
90
91 # convert sam (sam_pipe_r) to bam (bam_pipe_w)
92 bam_pipe_r, bam_pipe_w = os.pipe()
93 bam_pid = os.fork()
94 if bam_pid != 0:
95     # parent
96     os.close(bam_pipe_w)
97     os.close(sam_pipe_r)
98 else:
99     # child
100     os.close(bam_pipe_r)
101     arvados_samtools.run('view',
102                          ['-S', '-b',
103                           '-'],
104                          stdin=os.fdopen(sam_pipe_r, 'rb', 2**20),
105                          stdout=os.fdopen(bam_pipe_w, 'wb', 2**20))
106     sys.exit(0)
107
108 # copy bam (bam_pipe_r) to Keep
109 out_bam_filename = os.path.split(reads_1)[-1] + '.bam'
110 out = arvados.CollectionWriter()
111 out.start_new_stream()
112 out.start_new_file(out_bam_filename)
113 out.write(os.fdopen(bam_pipe_r, 'rb', 2**20))
114
115 # make sure everyone exited nicely
116 pid3, exit3 = os.waitpid(sam_pid, 0)
117 if exit3 != 0:
118     raise Exception("bwa sampe exited non-zero (0x%x)" % exit3)
119 pid4, exit4 = os.waitpid(bam_pid, 0)
120 if exit4 != 0:
121     raise Exception("samtools view exited non-zero (0x%x)" % exit4)
122
123 # proclaim success
124 this_task.set_output(out.finish())