8 import arvados_samtools
9 from arvados_ipc import *
11 class InvalidArgumentError(Exception):
14 arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
16 this_job = arvados.current_job()
17 this_task = arvados.current_task()
18 tmpdir = arvados.current_task().tmpdir
19 arvados.util.clear_tmpdir()
21 known_sites_files = arvados.getjobparam(
24 'Mills_and_1000G_gold_standard.indels.b37.vcf',
26 bundle_dir = arvados.util.collection_extract(
27 collection = this_job['script_parameters']['gatk_bundle'],
30 'human_g1k_v37.fasta',
31 'human_g1k_v37.fasta.fai'
32 ] + known_sites_files + [v + '.idx' for v in known_sites_files],
34 ref_fasta_files = [os.path.join(bundle_dir, f)
35 for f in os.listdir(bundle_dir)
36 if re.search(r'\.fasta(\.gz)?$', f)]
38 input_collection = this_task['parameters']['input']
39 input_dir = arvados.util.collection_extract(
40 collection = input_collection,
41 path = os.path.join(this_task.tmpdir, 'input'))
43 for f in arvados.util.listdir_recursive(input_dir):
44 if re.search(r'\.bam$', f):
45 input_stream_name, input_file_name = os.path.split(f)
46 input_bam_files += [os.path.join(input_dir, f)]
47 if len(input_bam_files) != 1:
48 raise InvalidArgumentError("Expected exactly one bam file per task.")
51 for f in known_sites_files:
52 known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)]
54 recal_file = os.path.join(tmpdir, 'recal.csv')
61 '-nct', arvados_gatk2.cpus_on_this_node(),
62 '-T', 'BaseRecalibrator',
63 '-R', ref_fasta_files[0],
64 '-I', input_bam_files[0],
68 pipe_setup(pipes, 'BQSR')
69 if 0 == named_fork(children, 'BQSR'):
70 pipe_closeallbut(pipes, ('BQSR', 'w'))
74 '-R', ref_fasta_files[0],
75 '-I', input_bam_files[0],
76 '-o', '/dev/fd/' + str(pipes['BQSR','w']),
78 '--disable_bam_indexing',
82 os.close(pipes.pop(('BQSR','w'), None))
84 out = arvados.CollectionWriter()
85 out.start_new_stream(input_stream_name)
87 out.start_new_file(input_file_name + '.recal.csv')
88 out.write(open(recal_file, 'rb'))
90 out.start_new_file(input_file_name)
92 buf = os.read(pipes['BQSR','r'], 2**20)
96 pipe_closeallbut(pipes)
98 if waitpid_and_check_children(children):
99 this_task.set_output(out.finish())