4759: Update Node Manager to parse new Arvados API timestamps.
[arvados.git] / crunch_scripts / GATK2-bqsr
1 #!/usr/bin/env python
2
3 import os
4 import re
5 import arvados
6 import arvados_gatk2
7 import arvados_samtools
8 from arvados_ipc import *
9
10 class InvalidArgumentError(Exception):
11     pass
12
13 arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
14
15 this_job = arvados.current_job()
16 this_task = arvados.current_task()
17 tmpdir = arvados.current_task().tmpdir
18 arvados.util.clear_tmpdir()
19
20 known_sites_files = arvados.getjobparam(
21     'known_sites',
22     ['dbsnp_137.b37.vcf',
23      'Mills_and_1000G_gold_standard.indels.b37.vcf',
24      ])
25 bundle_dir = arvados.util.collection_extract(
26     collection = this_job['script_parameters']['gatk_bundle'],
27     files = [
28         'human_g1k_v37.dict',
29         'human_g1k_v37.fasta',
30         'human_g1k_v37.fasta.fai'
31         ] + known_sites_files + [v + '.idx' for v in known_sites_files],
32     path = 'gatk_bundle')
33 ref_fasta_files = [os.path.join(bundle_dir, f)
34                    for f in os.listdir(bundle_dir)
35                    if re.search(r'\.fasta(\.gz)?$', f)]
36
37 input_collection = this_task['parameters']['input']
38 input_dir = arvados.util.collection_extract(
39     collection = input_collection,
40     path = os.path.join(this_task.tmpdir, 'input'))
41 input_bam_files = []
42 for f in arvados.util.listdir_recursive(input_dir):
43     if re.search(r'\.bam$', f):
44         input_stream_name, input_file_name = os.path.split(f)
45         input_bam_files += [os.path.join(input_dir, f)]
46 if len(input_bam_files) != 1:
47     raise InvalidArgumentError("Expected exactly one bam file per task.")
48
49 known_sites_args = []
50 for f in known_sites_files:
51     known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)]
52
53 recal_file = os.path.join(tmpdir, 'recal.csv')
54
55 children = {}
56 pipes = {}
57
58 arvados_gatk2.run(
59     args=[
60         '-nct', arvados_gatk2.cpus_on_this_node(),
61         '-T', 'BaseRecalibrator',
62         '-R', ref_fasta_files[0],
63         '-I', input_bam_files[0],
64         '-o', recal_file,
65         ] + known_sites_args)
66
67 pipe_setup(pipes, 'BQSR')
68 if 0 == named_fork(children, 'BQSR'):
69     pipe_closeallbut(pipes, ('BQSR', 'w'))
70     arvados_gatk2.run(
71         args=[
72         '-T', 'PrintReads',
73         '-R', ref_fasta_files[0],
74         '-I', input_bam_files[0],
75         '-o', '/dev/fd/' + str(pipes['BQSR','w']),
76         '-BQSR', recal_file,
77         '--disable_bam_indexing',
78         ],
79         close_fds=False)
80     os._exit(0)
81 os.close(pipes.pop(('BQSR','w'), None))
82
83 out = arvados.CollectionWriter()
84 out.start_new_stream(input_stream_name)
85
86 out.start_new_file(input_file_name + '.recal.csv')
87 out.write(open(recal_file, 'rb'))
88
89 out.start_new_file(input_file_name)
90 while True:
91     buf = os.read(pipes['BQSR','r'], 2**20)
92     if len(buf) == 0:
93         break
94     out.write(buf)
95 pipe_closeallbut(pipes)
96
97 if waitpid_and_check_children(children):
98     this_task.set_output(out.finish())
99 else:
100     sys.exit(1)