2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
12 from arvados_ipc import *
# Per-task setup: load the job and task records, unpack the reference
# collection, and work out which input file this task handles.
# NOTE(review): one_task_per_input_file presumably queues one task per input
# file and only returns inside those tasks — confirm against the Arvados SDK.
14 arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
16 this_job = arvados.current_job()
17 this_task = arvados.current_task()
# The collection named by the job's 'reference' script parameter is extracted;
# ref_dir is used below as a local directory (os.listdir / os.path.join).
18 ref_dir = arvados.util.collection_extract(
19 collection = this_job['script_parameters']['reference'],
# Full paths of the entries in ref_dir whose names end in ".fasta" or
# ".fasta.gz".
22 ref_fasta_files = [os.path.join(ref_dir, f)
23 for f in os.listdir(ref_dir)
24 if re.search(r'\.fasta(\.gz)?$', f)]
# The input collection comes from this task's own parameters, not the job's.
25 input_collection = this_task['parameters']['input']
# Remember the stream name and file name of the input. As written here each
# iteration overwrites the previous values, so the last file visited wins.
# NOTE(review): presumably the task input holds exactly one file — confirm.
27 for s in arvados.CollectionReader(input_collection).all_streams():
28 for f in s.all_files():
29 input_stream_name = s.name()
30 input_file_name = f.name()
33 # Unfortunately, picard FixMateInformation cannot read from a pipe. We
34 # must copy the input to a temporary file before running picard.
# The temporary copy lives in the task's tmpdir and keeps the input file's
# name; it is opened for binary writing.
35 input_bam_path = os.path.join(this_task.tmpdir, input_file_name)
36 with open(input_bam_path, 'wb') as bam:
# The input collection is opened a second time and every file of every
# stream is visited again.
# NOTE(review): presumably the loop body copies each file's data into `bam`
# — confirm.
37 for s in arvados.CollectionReader(input_collection).all_streams():
38 for f in s.all_files():
# Stage "fixmate": a forked child whose output goes to the 'fixmate' pipe.
# NOTE(review): pipe_setup presumably registers the pipe's two ends in `pipes`
# under (name, 'r') and (name, 'w'), and named_fork presumably returns 0 in
# the child like os.fork — confirm in arvados_ipc.
45 pipe_setup(pipes, 'fixmate')
46 if 0==named_fork(children, 'fixmate'):
# This stage only needs the write end of its own pipe.
47 pipe_closeallbut(pipes, ('fixmate', 'w'))
55 'validation_stringency': 'LENIENT',
56 'compression_level': 0
# stdout is the pipe's write end wrapped in a binary file object with a
# 2**20-byte (1 MiB) buffer.
58 stdout=os.fdopen(pipes['fixmate','w'], 'wb', 2**20))
# Drop the write end from `pipes` and close it.
# NOTE(review): the None default only avoids a KeyError; if the key were
# missing, os.close(None) would still raise TypeError.
60 os.close(pipes.pop(('fixmate','w'), None))
# Stage "sortsam": a forked child that reads the 'fixmate' pipe and writes
# the 'sortsam' pipe.
# NOTE(review): named_fork presumably returns 0 in the child like os.fork —
# confirm in arvados_ipc.
62 pipe_setup(pipes, 'sortsam')
63 if 0==named_fork(children, 'sortsam'):
# Only the upstream read end and this stage's write end are kept.
64 pipe_closeallbut(pipes, ('fixmate', 'r'), ('sortsam', 'w'))
72 'validation_stringency': 'LENIENT',
73 'compression_level': 0
# Both ends are wrapped as binary file objects with 2**20-byte (1 MiB)
# buffers.
75 stdin=os.fdopen(pipes['fixmate','r'], 'rb', 2**20),
76 stdout=os.fdopen(pipes['sortsam','w'], 'wb', 2**20))
# Stage "reordersam": a forked child that reads the 'sortsam' pipe and writes
# the 'reordersam' pipe.
# NOTE(review): named_fork presumably returns 0 in the child like os.fork —
# confirm in arvados_ipc.
79 pipe_setup(pipes, 'reordersam')
80 if 0==named_fork(children, 'reordersam'):
# Only the upstream read end and this stage's write end are kept.
81 pipe_closeallbut(pipes, ('sortsam', 'r'), ('reordersam', 'w'))
# The reference is the first FASTA path found in the reference directory.
# NOTE(review): this raises IndexError if no *.fasta / *.fasta.gz file was
# found in ref_dir.
87 'reference': ref_fasta_files[0],
89 'validation_stringency': 'LENIENT',
90 'compression_level': 0
# Both ends are wrapped as binary file objects with 2**20-byte (1 MiB)
# buffers.
92 stdin=os.fdopen(pipes['sortsam','r'], 'rb', 2**20),
93 stdout=os.fdopen(pipes['reordersam','w'], 'wb', 2**20))
# Stage "addrg": a forked child that reads the 'reordersam' pipe and writes
# the 'addrg' pipe, using the AddOrReplaceReadGroups tool name.
# NOTE(review): named_fork presumably returns 0 in the child like os.fork —
# confirm in arvados_ipc.
96 pipe_setup(pipes, 'addrg')
97 if 0==named_fork(children, 'addrg'):
# Only the upstream read end and this stage's write end are kept.
98 pipe_closeallbut(pipes, ('reordersam', 'r'), ('addrg', 'w'))
100 'AddOrReplaceReadGroups',
# Read-group fields come from the job's script parameters; when a parameter
# is absent, rglb/rgpu/rgsm default to 0 and rgpl defaults to 'illumina'.
105 'rglb': this_job['script_parameters'].get('rglb', 0),
106 'rgpl': this_job['script_parameters'].get('rgpl', 'illumina'),
107 'rgpu': this_job['script_parameters'].get('rgpu', 0),
108 'rgsm': this_job['script_parameters'].get('rgsm', 0),
109 'validation_stringency': 'LENIENT'
# Both ends are wrapped as binary file objects with 2**20-byte (1 MiB)
# buffers.
111 stdin=os.fdopen(pipes['reordersam','r'], 'rb', 2**20),
112 stdout=os.fdopen(pipes['addrg','w'], 'wb', 2**20))
# Stage "bammanifest": a forked child that fans the 'addrg' output out to two
# downstream pipes ('bam' and 'casm_in') and reports a manifest on the
# 'bammanifest' pipe.
# NOTE(review): named_fork presumably returns 0 in the child like os.fork —
# confirm in arvados_ipc.
115 pipe_setup(pipes, 'bammanifest')
116 pipe_setup(pipes, 'bam')
117 pipe_setup(pipes, 'casm_in')
118 if 0==named_fork(children, 'bammanifest'):
119 pipe_closeallbut(pipes,
121 ('bammanifest', 'w'),
# The output collection reuses the input's stream name and file name.
124 out = arvados.CollectionWriter()
125 out.start_new_stream(input_stream_name)
126 out.start_new_file(input_file_name)
# Read up to 2**20 bytes (1 MiB) at a time from the 'addrg' pipe.
128 buf = os.read(pipes['addrg','r'], 2**20)
# The same buffer is written to both downstream pipes.
# NOTE(review): the manifest emitted below only describes this data if buf is
# also written to `out` — confirm.
131 os.write(pipes['bam','w'], buf)
132 os.write(pipes['casm_in','w'], buf)
# Send the collection's manifest text over the pipe, then close the write end.
134 os.write(pipes['bammanifest','w'], out.manifest_text())
135 os.close(pipes['bammanifest','w'])
# Stage "casm": a forked child using the CollectAlignmentSummaryMetrics tool
# name, reading the 'casm_in' pipe and writing the 'casm' pipe.
# NOTE(review): named_fork presumably returns 0 in the child like os.fork —
# confirm in arvados_ipc.
138 pipe_setup(pipes, 'casm')
139 if 0 == named_fork(children, 'casm'):
# Only the 'casm_in' read end and this stage's write end are kept.
140 pipe_closeallbut(pipes, ('casm_in', 'r'), ('casm', 'w'))
142 'CollectAlignmentSummaryMetrics',
# Unlike the stdin/stdout stages, the pipe ends are handed over here as
# /dev/fd/<descriptor> paths in the 'input' and 'output' options.
144 'input': '/dev/fd/' + str(pipes['casm_in','r']),
145 'output': '/dev/fd/' + str(pipes['casm','w']),
# NOTE(review): ref_fasta_files[0] raises IndexError if no FASTA file was
# found in the reference directory.
146 'reference_sequence': ref_fasta_files[0],
147 'validation_stringency': 'LENIENT',
# Stage "index": a forked child that reads the 'bam' pipe and writes the
# 'index' pipe.
# NOTE(review): named_fork presumably returns 0 in the child like os.fork —
# confirm in arvados_ipc.
152 pipe_setup(pipes, 'index')
153 if 0==named_fork(children, 'index'):
# Only the 'bam' read end and this stage's write end are kept.
154 pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
161 'validation_stringency': 'LENIENT'
# Both ends are wrapped as binary file objects with 2**20-byte (1 MiB)
# buffers.
163 stdin=os.fdopen(pipes['bam','r'], 'rb', 2**20),
164 stdout=os.fdopen(pipes['index','w'], 'wb', 2**20))
# Stage "indexmanifest": a forked child that reads the 'index' pipe and
# reports a manifest on the 'indexmanifest' pipe.
# NOTE(review): named_fork presumably returns 0 in the child like os.fork —
# confirm in arvados_ipc.
167 pipe_setup(pipes, 'indexmanifest')
168 if 0==named_fork(children, 'indexmanifest'):
169 pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
170 out = arvados.CollectionWriter()
171 out.start_new_stream(input_stream_name)
# The output file name is the input name with a trailing ".bam" replaced by
# ".bai"; a name that does not end in ".bam" is used unchanged.
# NOTE(review): the pattern is not a raw string, so '\.' relies on Python
# passing the unknown escape through — consider r'\.bam$'.
172 out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
# Read up to 2**20 bytes (1 MiB) at a time from the 'index' pipe.
174 buf = os.read(pipes['index','r'], 2**20)
# Send the collection's manifest text over the pipe, then close the write end.
178 os.write(pipes['indexmanifest','w'], out.manifest_text())
179 os.close(pipes['indexmanifest','w'])
# Collect the results: keep the manifest read ends, gather the manifests and
# the casm output, wait for the children, and record the task output.
182 pipe_closeallbut(pipes,
183 ('bammanifest', 'r'),
184 ('indexmanifest', 'r'),
# Each manifest pipe is read through a binary file object with a 2**20-byte
# (1 MiB) buffer; the `with` block closes it afterwards.
189 for which in ['bammanifest', 'indexmanifest']:
190 with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
# The casm output is stored as a separate file named "<input name>.casm.tsv"
# in the same stream as the input.
197 casm_out = arvados.CollectionWriter()
198 casm_out.start_new_stream(input_stream_name)
199 casm_out.start_new_file(input_file_name + '.casm.tsv')
# The 'casm' read end is removed from `pipes` and wrapped with os.fdopen's
# default mode (text 'r', unlike the 'rb' used for the other pipes).
200 casm_out.write(os.fdopen(pipes.pop(('casm','r'))))
202 outmanifest += casm_out.manifest_text()
# NOTE(review): `all_ok and ...` short-circuits, so once all_ok is false the
# remaining children are never passed to waitpid_and_check_exit.
205 for (childname, pid) in children.items():
206 all_ok = all_ok and waitpid_and_check_exit(pid, childname)
# The accumulated manifest text becomes this task's output.
209 this_task.set_output(outmanifest)