Merge branch '1637-improve-arv-tutorial'
[arvados.git] / crunch_scripts / GATK2-bqsr
1 #!/usr/bin/env python
2
3 import os
4 import re
5 import arvados
6 import arvados_gatk2
7 import arvados_picard
8 import arvados_samtools
9 from arvados_ipc import *
10
11 class InvalidArgumentError(Exception):
12     pass
13
14 arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
15
16 this_job = arvados.current_job()
17 this_task = arvados.current_task()
18 tmpdir = arvados.current_task().tmpdir
19 arvados.util.clear_tmpdir()
20
21 known_sites_files = arvados.getjobparam(
22     'known_sites',
23     ['dbsnp_137.b37.vcf',
24      'Mills_and_1000G_gold_standard.indels.b37.vcf',
25      ])
26 bundle_dir = arvados.util.collection_extract(
27     collection = this_job['script_parameters']['gatk_bundle'],
28     files = [
29         'human_g1k_v37.dict',
30         'human_g1k_v37.fasta',
31         'human_g1k_v37.fasta.fai'
32         ] + known_sites_files + [v + '.idx' for v in known_sites_files],
33     path = 'gatk_bundle')
34 ref_fasta_files = [os.path.join(bundle_dir, f)
35                    for f in os.listdir(bundle_dir)
36                    if re.search(r'\.fasta(\.gz)?$', f)]
37
38 input_collection = this_task['parameters']['input']
39 input_dir = arvados.util.collection_extract(
40     collection = input_collection,
41     path = os.path.join(this_task.tmpdir, 'input'))
42 input_bam_files = []
43 for f in arvados.util.listdir_recursive(input_dir):
44     if re.search(r'\.bam$', f):
45         input_stream_name, input_file_name = os.path.split(f)
46         input_bam_files += [os.path.join(input_dir, f)]
47 if len(input_bam_files) != 1:
48     raise InvalidArgumentError("Expected exactly one bam file per task.")
49
50 known_sites_args = []
51 for f in known_sites_files:
52     known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)]
53
54 recal_file = os.path.join(tmpdir, 'recal.csv')
55
56 children = {}
57 pipes = {}
58
59 arvados_gatk2.run(
60     args=[
61         '-nct', arvados_gatk2.cpus_on_this_node(),
62         '-T', 'BaseRecalibrator',
63         '-R', ref_fasta_files[0],
64         '-I', input_bam_files[0],
65         '-o', recal_file,
66         ] + known_sites_args)
67
68 pipe_setup(pipes, 'BQSR')
69 if 0 == named_fork(children, 'BQSR'):
70     pipe_closeallbut(pipes, ('BQSR', 'w'))
71     arvados_gatk2.run(
72         args=[
73         '-T', 'PrintReads',
74         '-R', ref_fasta_files[0],
75         '-I', input_bam_files[0],
76         '-o', '/dev/fd/' + str(pipes['BQSR','w']),
77         '-BQSR', recal_file,
78         '--disable_bam_indexing',
79         ],
80         close_fds=False)
81     os._exit(0)
82 os.close(pipes.pop(('BQSR','w'), None))
83
84 out = arvados.CollectionWriter()
85 out.start_new_stream(input_stream_name)
86
87 out.start_new_file(input_file_name + '.recal.csv')
88 out.write(open(recal_file, 'rb'))
89
90 out.start_new_file(input_file_name)
91 while True:
92     buf = os.read(pipes['BQSR','r'], 2**20)
93     if len(buf) == 0:
94         break
95     out.write(buf)
96 pipe_closeallbut(pipes)
97
98 if waitpid_and_check_children(children):
99     this_task.set_output(out.finish())
100 else:
101     sys.exit(1)