11158: Support incremental update of project contents based on websocket events.
[arvados.git] / crunch_scripts / bwa-aln
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 import arvados
7 import arvados_bwa
8 import arvados_samtools
9 import os
10 import re
11 import sys
12 import subprocess
13
14 arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
15
16 this_job = arvados.current_job()
17 this_task = arvados.current_task()
18 ref_dir = arvados.util.collection_extract(
19     collection = this_job['script_parameters']['reference_index'],
20     path = 'reference',
21     decompress = False)
22
23 ref_basename = None
24 for f in os.listdir(ref_dir):
25     basename = re.sub(r'\.bwt$', '', f)
26     if basename != f:
27         ref_basename = os.path.join(ref_dir, basename)
28 if ref_basename == None:
29     raise Exception("Could not find *.bwt in reference collection.")
30
31 tmp_dir = arvados.current_task().tmpdir
32
33 class Aligner:
34     def input_filename(self):
35         for s in arvados.CollectionReader(self.collection).all_streams():
36             for f in s.all_files():
37                 return f.decompressed_name()
38     def generate_input(self):
39         for s in arvados.CollectionReader(self.collection).all_streams():
40             for f in s.all_files():
41                 for s in f.readall_decompressed():
42                     yield s
43     def aln(self, input_param):
44         self.collection = this_task['parameters'][input_param]
45         reads_filename = os.path.join(tmp_dir, self.input_filename())
46         aln_filename = os.path.join(tmp_dir, self.input_filename() + '.sai')
47         reads_pipe_r, reads_pipe_w = os.pipe()
48         if os.fork() == 0:
49             os.close(reads_pipe_r)
50             reads_file = open(reads_filename, 'wb')
51             for s in self.generate_input():
52                 if len(s) != os.write(reads_pipe_w, s):
53                     raise Exception("short write")
54                 reads_file.write(s)
55             reads_file.close()
56             os.close(reads_pipe_w)
57             sys.exit(0)
58         os.close(reads_pipe_w)
59
60         aln_file = open(aln_filename, 'wb')
61         bwa_proc = subprocess.Popen(
62             [arvados_bwa.bwa_binary(),
63              'aln', '-t', '16',
64              ref_basename,
65              '-'],
66             stdin=os.fdopen(reads_pipe_r, 'rb', 2**20),
67             stdout=aln_file)
68         aln_file.close()
69         return reads_filename, aln_filename
70
71 reads_1, alignments_1 = Aligner().aln('input_1')
72 reads_2, alignments_2 = Aligner().aln('input_2')
73 pid1, exit1 = os.wait()
74 pid2, exit2 = os.wait()
75 if exit1 != 0 or exit2 != 0:
76     raise Exception("bwa aln exited non-zero (0x%x, 0x%x)" % (exit1, exit2))
77
78 # output alignments in sam format to pipe
79 sam_pipe_r, sam_pipe_w = os.pipe()
80 sam_pid = os.fork()
81 if sam_pid != 0:
82     # parent
83     os.close(sam_pipe_w)
84 else:
85     # child
86     os.close(sam_pipe_r)
87     arvados_bwa.run('sampe',
88                     [ref_basename,
89                      alignments_1, alignments_2,
90                      reads_1, reads_2],
91                     stdout=os.fdopen(sam_pipe_w, 'wb', 2**20))
92     sys.exit(0)
93
94 # convert sam (sam_pipe_r) to bam (bam_pipe_w)
95 bam_pipe_r, bam_pipe_w = os.pipe()
96 bam_pid = os.fork()
97 if bam_pid != 0:
98     # parent
99     os.close(bam_pipe_w)
100     os.close(sam_pipe_r)
101 else:
102     # child
103     os.close(bam_pipe_r)
104     arvados_samtools.run('view',
105                          ['-S', '-b',
106                           '-'],
107                          stdin=os.fdopen(sam_pipe_r, 'rb', 2**20),
108                          stdout=os.fdopen(bam_pipe_w, 'wb', 2**20))
109     sys.exit(0)
110
111 # copy bam (bam_pipe_r) to Keep
112 out_bam_filename = os.path.split(reads_1)[-1] + '.bam'
113 out = arvados.CollectionWriter()
114 out.start_new_stream()
115 out.start_new_file(out_bam_filename)
116 out.write(os.fdopen(bam_pipe_r, 'rb', 2**20))
117
118 # make sure everyone exited nicely
119 pid3, exit3 = os.waitpid(sam_pid, 0)
120 if exit3 != 0:
121     raise Exception("bwa sampe exited non-zero (0x%x)" % exit3)
122 pid4, exit4 = os.waitpid(bam_pid, 0)
123 if exit4 != 0:
124     raise Exception("samtools view exited non-zero (0x%x)" % exit4)
125
126 # proclaim success
127 this_task.set_output(out.finish())