7 import arvados_samtools
8 from arvados_ipc import *
10 class InvalidArgumentError(Exception):
13 arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
15 this_job = arvados.current_job()
16 this_task = arvados.current_task()
17 tmpdir = arvados.current_task().tmpdir
18 arvados.util.clear_tmpdir()
20 known_sites_files = arvados.getjobparam(
23 'Mills_and_1000G_gold_standard.indels.b37.vcf',
25 bundle_dir = arvados.util.collection_extract(
26 collection = this_job['script_parameters']['gatk_bundle'],
29 'human_g1k_v37.fasta',
30 'human_g1k_v37.fasta.fai'
31 ] + known_sites_files + [v + '.idx' for v in known_sites_files],
33 ref_fasta_files = [os.path.join(bundle_dir, f)
34 for f in os.listdir(bundle_dir)
35 if re.search(r'\.fasta(\.gz)?$', f)]
37 input_collection = this_task['parameters']['input']
38 input_dir = arvados.util.collection_extract(
39 collection = input_collection,
40 path = os.path.join(this_task.tmpdir, 'input'))
42 for f in arvados.util.listdir_recursive(input_dir):
43 if re.search(r'\.bam$', f):
44 input_stream_name, input_file_name = os.path.split(f)
45 input_bam_files += [os.path.join(input_dir, f)]
46 if len(input_bam_files) != 1:
47 raise InvalidArgumentError("Expected exactly one bam file per task.")
50 for f in known_sites_files:
51 known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)]
53 recal_file = os.path.join(tmpdir, 'recal.csv')
60 '-nct', arvados_gatk2.cpus_on_this_node(),
61 '-T', 'BaseRecalibrator',
62 '-R', ref_fasta_files[0],
63 '-I', input_bam_files[0],
67 pipe_setup(pipes, 'BQSR')
68 if 0 == named_fork(children, 'BQSR'):
69 pipe_closeallbut(pipes, ('BQSR', 'w'))
73 '-R', ref_fasta_files[0],
74 '-I', input_bam_files[0],
75 '-o', '/dev/fd/' + str(pipes['BQSR','w']),
77 '--disable_bam_indexing',
81 os.close(pipes.pop(('BQSR','w'), None))
83 out = arvados.CollectionWriter()
84 out.start_new_stream(input_stream_name)
86 out.start_new_file(input_file_name + '.recal.csv')
87 out.write(open(recal_file, 'rb'))
89 out.start_new_file(input_file_name)
91 buf = os.read(pipes['BQSR','r'], 2**20)
95 pipe_closeallbut(pipes)
97 if waitpid_and_check_children(children):
98 this_task.set_output(out.finish())