10211: Clarify expected result of double-close.
[arvados.git] / crunch_scripts / GATK2-realign
1 #!/usr/bin/env python
2
3 import os
4 import re
5 import arvados
6 import arvados_gatk2
7 import arvados_picard
8 import arvados_samtools
9 from arvados_ipc import *
10
11 class InvalidArgumentError(Exception):
12     pass
13
14 arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
15
16 this_job = arvados.current_job()
17 this_task = arvados.current_task()
18 tmpdir = arvados.current_task().tmpdir
19 arvados.util.clear_tmpdir()
20
21 known_sites_files = arvados.getjobparam(
22     'known_sites',
23     ['dbsnp_137.b37.vcf',
24      'Mills_and_1000G_gold_standard.indels.b37.vcf',
25      ])
26 bundle_dir = arvados.util.collection_extract(
27     collection = this_job['script_parameters']['gatk_bundle'],
28     files = [
29         'human_g1k_v37.dict',
30         'human_g1k_v37.fasta',
31         'human_g1k_v37.fasta.fai'
32         ] + known_sites_files + [v + '.idx' for v in known_sites_files],
33     path = 'gatk_bundle')
34 ref_fasta_files = [os.path.join(bundle_dir, f)
35                    for f in os.listdir(bundle_dir)
36                    if re.search(r'\.fasta(\.gz)?$', f)]
37 regions_args = []
38 if 'regions' in this_job['script_parameters']:
39     regions_dir = arvados.util.collection_extract(
40         collection = this_job['script_parameters']['regions'],
41         path = 'regions')
42     region_padding = int(this_job['script_parameters']['region_padding'])
43     for f in os.listdir(regions_dir):
44         if re.search(r'\.bed$', f):
45             regions_args += [
46                 '--intervals', os.path.join(regions_dir, f),
47                 '--interval_padding', str(region_padding)
48                 ]
49
50 input_collection = this_task['parameters']['input']
51 input_dir = arvados.util.collection_extract(
52     collection = input_collection,
53     path = os.path.join(this_task.tmpdir, 'input'))
54 input_bam_files = []
55 for f in arvados.util.listdir_recursive(input_dir):
56     if re.search(r'\.bam$', f):
57         input_stream_name, input_file_name = os.path.split(f)
58         input_bam_files += [os.path.join(input_dir, f)]
59 if len(input_bam_files) != 1:
60     raise InvalidArgumentError("Expected exactly one bam file per task.")
61
62 known_sites_args = []
63 for f in known_sites_files:
64     known_sites_args += ['-known', os.path.join(bundle_dir, f)]
65
66 children = {}
67 pipes = {}
68
69 arvados_gatk2.run(
70     args=[
71         '-nt', arvados_gatk2.cpus_per_task(),
72         '-T', 'RealignerTargetCreator',
73         '-R', ref_fasta_files[0],
74         '-I', input_bam_files[0],
75         '-o', os.path.join(tmpdir, 'intervals.list')
76         ] + known_sites_args + regions_args)
77
78 pipe_setup(pipes, 'IndelRealigner')
79 if 0 == named_fork(children, 'IndelRealigner'):
80     pipe_closeallbut(pipes, ('IndelRealigner', 'w'))
81     arvados_gatk2.run(
82         args=[
83         '-T', 'IndelRealigner',
84         '-R', ref_fasta_files[0],
85         '-targetIntervals', os.path.join(tmpdir, 'intervals.list'),
86         '-I', input_bam_files[0],
87         '-o', '/dev/fd/' + str(pipes['IndelRealigner','w']),
88         '--disable_bam_indexing',
89         ] + known_sites_args + regions_args,
90         close_fds=False)
91     os._exit(0)
92 os.close(pipes.pop(('IndelRealigner','w'), None))
93
94 pipe_setup(pipes, 'bammanifest')
95 pipe_setup(pipes, 'bam')
96 if 0==named_fork(children, 'bammanifest'):
97     pipe_closeallbut(pipes,
98                      ('IndelRealigner', 'r'),
99                      ('bammanifest', 'w'),
100                      ('bam', 'w'))
101     out = arvados.CollectionWriter()
102     out.start_new_stream(input_stream_name)
103     out.start_new_file(input_file_name)
104     while True:
105         buf = os.read(pipes['IndelRealigner','r'], 2**20)
106         if len(buf) == 0:
107             break
108         os.write(pipes['bam','w'], buf)
109         out.write(buf)
110     os.write(pipes['bammanifest','w'], out.manifest_text())
111     os.close(pipes['bammanifest','w'])
112     os._exit(0)
113
114 pipe_setup(pipes, 'index')
115 if 0==named_fork(children, 'index'):
116     pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
117     arvados_picard.run(
118         'BuildBamIndex',
119         params={
120             'i': '/dev/fd/' + str(pipes['bam','r']),
121             'o': '/dev/fd/' + str(pipes['index','w']),
122             'quiet': 'true',
123             'validation_stringency': 'LENIENT'
124             },
125         close_fds=False)
126     os._exit(0)
127
128 pipe_setup(pipes, 'indexmanifest')
129 if 0==named_fork(children, 'indexmanifest'):
130     pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
131     out = arvados.CollectionWriter()
132     out.start_new_stream(input_stream_name)
133     out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
134     while True:
135         buf = os.read(pipes['index','r'], 2**20)
136         if len(buf) == 0:
137             break
138         out.write(buf)
139     os.write(pipes['indexmanifest','w'], out.manifest_text())
140     os.close(pipes['indexmanifest','w'])
141     os._exit(0)
142
143 pipe_closeallbut(pipes, ('bammanifest', 'r'), ('indexmanifest', 'r'))
144 outmanifest = ''
145 for which in ['bammanifest', 'indexmanifest']:
146     with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
147         while True:
148             buf = f.read()
149             if buf == '':
150                 break
151             outmanifest += buf
152
153 all_ok = True
154 for (childname, pid) in children.items():
155     all_ok = all_ok and waitpid_and_check_exit(pid, childname)
156
157 if all_ok:
158     this_task.set_output(outmanifest)
159 else:
160     sys.exit(1)