Merge branch '14087-federated-collection-by-pdh' refs #14087
[arvados.git] / crunch_scripts / GATK2-realign
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 import os
7 import re
8 import arvados
9 import arvados_gatk2
10 import arvados_picard
11 import arvados_samtools
12 from arvados_ipc import *
13
14 class InvalidArgumentError(Exception):
15     pass
16
17 arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
18
19 this_job = arvados.current_job()
20 this_task = arvados.current_task()
21 tmpdir = arvados.current_task().tmpdir
22 arvados.util.clear_tmpdir()
23
24 known_sites_files = arvados.getjobparam(
25     'known_sites',
26     ['dbsnp_137.b37.vcf',
27      'Mills_and_1000G_gold_standard.indels.b37.vcf',
28      ])
29 bundle_dir = arvados.util.collection_extract(
30     collection = this_job['script_parameters']['gatk_bundle'],
31     files = [
32         'human_g1k_v37.dict',
33         'human_g1k_v37.fasta',
34         'human_g1k_v37.fasta.fai'
35         ] + known_sites_files + [v + '.idx' for v in known_sites_files],
36     path = 'gatk_bundle')
37 ref_fasta_files = [os.path.join(bundle_dir, f)
38                    for f in os.listdir(bundle_dir)
39                    if re.search(r'\.fasta(\.gz)?$', f)]
40 regions_args = []
41 if 'regions' in this_job['script_parameters']:
42     regions_dir = arvados.util.collection_extract(
43         collection = this_job['script_parameters']['regions'],
44         path = 'regions')
45     region_padding = int(this_job['script_parameters']['region_padding'])
46     for f in os.listdir(regions_dir):
47         if re.search(r'\.bed$', f):
48             regions_args += [
49                 '--intervals', os.path.join(regions_dir, f),
50                 '--interval_padding', str(region_padding)
51                 ]
52
53 input_collection = this_task['parameters']['input']
54 input_dir = arvados.util.collection_extract(
55     collection = input_collection,
56     path = os.path.join(this_task.tmpdir, 'input'))
57 input_bam_files = []
58 for f in arvados.util.listdir_recursive(input_dir):
59     if re.search(r'\.bam$', f):
60         input_stream_name, input_file_name = os.path.split(f)
61         input_bam_files += [os.path.join(input_dir, f)]
62 if len(input_bam_files) != 1:
63     raise InvalidArgumentError("Expected exactly one bam file per task.")
64
65 known_sites_args = []
66 for f in known_sites_files:
67     known_sites_args += ['-known', os.path.join(bundle_dir, f)]
68
69 children = {}
70 pipes = {}
71
72 arvados_gatk2.run(
73     args=[
74         '-nt', arvados_gatk2.cpus_per_task(),
75         '-T', 'RealignerTargetCreator',
76         '-R', ref_fasta_files[0],
77         '-I', input_bam_files[0],
78         '-o', os.path.join(tmpdir, 'intervals.list')
79         ] + known_sites_args + regions_args)
80
81 pipe_setup(pipes, 'IndelRealigner')
82 if 0 == named_fork(children, 'IndelRealigner'):
83     pipe_closeallbut(pipes, ('IndelRealigner', 'w'))
84     arvados_gatk2.run(
85         args=[
86         '-T', 'IndelRealigner',
87         '-R', ref_fasta_files[0],
88         '-targetIntervals', os.path.join(tmpdir, 'intervals.list'),
89         '-I', input_bam_files[0],
90         '-o', '/dev/fd/' + str(pipes['IndelRealigner','w']),
91         '--disable_bam_indexing',
92         ] + known_sites_args + regions_args,
93         close_fds=False)
94     os._exit(0)
95 os.close(pipes.pop(('IndelRealigner','w'), None))
96
97 pipe_setup(pipes, 'bammanifest')
98 pipe_setup(pipes, 'bam')
99 if 0==named_fork(children, 'bammanifest'):
100     pipe_closeallbut(pipes,
101                      ('IndelRealigner', 'r'),
102                      ('bammanifest', 'w'),
103                      ('bam', 'w'))
104     out = arvados.CollectionWriter()
105     out.start_new_stream(input_stream_name)
106     out.start_new_file(input_file_name)
107     while True:
108         buf = os.read(pipes['IndelRealigner','r'], 2**20)
109         if len(buf) == 0:
110             break
111         os.write(pipes['bam','w'], buf)
112         out.write(buf)
113     os.write(pipes['bammanifest','w'], out.manifest_text())
114     os.close(pipes['bammanifest','w'])
115     os._exit(0)
116
117 pipe_setup(pipes, 'index')
118 if 0==named_fork(children, 'index'):
119     pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
120     arvados_picard.run(
121         'BuildBamIndex',
122         params={
123             'i': '/dev/fd/' + str(pipes['bam','r']),
124             'o': '/dev/fd/' + str(pipes['index','w']),
125             'quiet': 'true',
126             'validation_stringency': 'LENIENT'
127             },
128         close_fds=False)
129     os._exit(0)
130
131 pipe_setup(pipes, 'indexmanifest')
132 if 0==named_fork(children, 'indexmanifest'):
133     pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
134     out = arvados.CollectionWriter()
135     out.start_new_stream(input_stream_name)
136     out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
137     while True:
138         buf = os.read(pipes['index','r'], 2**20)
139         if len(buf) == 0:
140             break
141         out.write(buf)
142     os.write(pipes['indexmanifest','w'], out.manifest_text())
143     os.close(pipes['indexmanifest','w'])
144     os._exit(0)
145
146 pipe_closeallbut(pipes, ('bammanifest', 'r'), ('indexmanifest', 'r'))
147 outmanifest = ''
148 for which in ['bammanifest', 'indexmanifest']:
149     with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
150         while True:
151             buf = f.read()
152             if buf == '':
153                 break
154             outmanifest += buf
155
156 all_ok = True
157 for (childname, pid) in children.items():
158     all_ok = all_ok and waitpid_and_check_exit(pid, childname)
159
160 if all_ok:
161     this_task.set_output(outmanifest)
162 else:
163     sys.exit(1)