15332: Updated federation demo to do something a little more interesting.
[arvados.git] / crunch_scripts / GATK2-bqsr
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 import os
7 import re
8 import arvados
9 import arvados_gatk2
10 import arvados_samtools
11 from arvados_ipc import *
12
13 class InvalidArgumentError(Exception):
14     pass
15
16 arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
17
18 this_job = arvados.current_job()
19 this_task = arvados.current_task()
20 tmpdir = arvados.current_task().tmpdir
21 arvados.util.clear_tmpdir()
22
23 known_sites_files = arvados.getjobparam(
24     'known_sites',
25     ['dbsnp_137.b37.vcf',
26      'Mills_and_1000G_gold_standard.indels.b37.vcf',
27      ])
28 bundle_dir = arvados.util.collection_extract(
29     collection = this_job['script_parameters']['gatk_bundle'],
30     files = [
31         'human_g1k_v37.dict',
32         'human_g1k_v37.fasta',
33         'human_g1k_v37.fasta.fai'
34         ] + known_sites_files + [v + '.idx' for v in known_sites_files],
35     path = 'gatk_bundle')
36 ref_fasta_files = [os.path.join(bundle_dir, f)
37                    for f in os.listdir(bundle_dir)
38                    if re.search(r'\.fasta(\.gz)?$', f)]
39
40 input_collection = this_task['parameters']['input']
41 input_dir = arvados.util.collection_extract(
42     collection = input_collection,
43     path = os.path.join(this_task.tmpdir, 'input'))
44 input_bam_files = []
45 for f in arvados.util.listdir_recursive(input_dir):
46     if re.search(r'\.bam$', f):
47         input_stream_name, input_file_name = os.path.split(f)
48         input_bam_files += [os.path.join(input_dir, f)]
49 if len(input_bam_files) != 1:
50     raise InvalidArgumentError("Expected exactly one bam file per task.")
51
52 known_sites_args = []
53 for f in known_sites_files:
54     known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)]
55
56 recal_file = os.path.join(tmpdir, 'recal.csv')
57
58 children = {}
59 pipes = {}
60
61 arvados_gatk2.run(
62     args=[
63         '-nct', arvados_gatk2.cpus_on_this_node(),
64         '-T', 'BaseRecalibrator',
65         '-R', ref_fasta_files[0],
66         '-I', input_bam_files[0],
67         '-o', recal_file,
68         ] + known_sites_args)
69
70 pipe_setup(pipes, 'BQSR')
71 if 0 == named_fork(children, 'BQSR'):
72     pipe_closeallbut(pipes, ('BQSR', 'w'))
73     arvados_gatk2.run(
74         args=[
75         '-T', 'PrintReads',
76         '-R', ref_fasta_files[0],
77         '-I', input_bam_files[0],
78         '-o', '/dev/fd/' + str(pipes['BQSR','w']),
79         '-BQSR', recal_file,
80         '--disable_bam_indexing',
81         ],
82         close_fds=False)
83     os._exit(0)
84 os.close(pipes.pop(('BQSR','w'), None))
85
86 out = arvados.CollectionWriter()
87 out.start_new_stream(input_stream_name)
88
89 out.start_new_file(input_file_name + '.recal.csv')
90 out.write(open(recal_file, 'rb'))
91
92 out.start_new_file(input_file_name)
93 while True:
94     buf = os.read(pipes['BQSR','r'], 2**20)
95     if len(buf) == 0:
96         break
97     out.write(buf)
98 pipe_closeallbut(pipes)
99
100 if waitpid_and_check_children(children):
101     this_task.set_output(out.finish())
102 else:
103     sys.exit(1)