9 from arvados_ipc import *
# Queue one task per input file (if_sequence / and_end_task are interpreted by
# arvados.job_setup; see the SDK documentation for their exact meaning).
11 arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
# Handles for the job and the task this script is currently running as.
13 this_job = arvados.current_job()
14 this_task = arvados.current_task()
# Extract the collection named by the 'reference' script parameter; the result
# is used below as a directory path.
15 ref_dir = arvados.util.collection_extract(
16 collection = this_job['script_parameters']['reference'],
# Full paths of every entry in ref_dir whose name ends in .fasta or .fasta.gz.
19 ref_fasta_files = [os.path.join(ref_dir, f)
20 for f in os.listdir(ref_dir)
21 if re.search(r'\.fasta(\.gz)?$', f)]
# The 'input' parameter of this task: the collection to process.
22 input_collection = this_task['parameters']['input']
# Walk every file of every stream in the input collection and remember the
# stream name and file name. These names are reused later for the temporary
# copy and for the files written to the output collections.
24 for s in arvados.CollectionReader(input_collection).all_streams():
25 for f in s.all_files():
26 input_stream_name = s.name()
27 input_file_name = f.name()
30 # Unfortunately, picard FixMateInformation cannot read from a pipe. We
31 # must copy the input to a temporary file before running picard.
# The temporary file lives in this_task.tmpdir and keeps the input file's name.
32 input_bam_path = os.path.join(this_task.tmpdir, input_file_name)
# Opened for binary writing; the collection is iterated a second time while
# the file is open.
33 with open(input_bam_path, 'wb') as bam:
34 for s in arvados.CollectionReader(input_collection).all_streams():
35 for f in s.all_files():
# Stage 'fixmate': create the stage's pipe and fork a child named 'fixmate'
# (named_fork returns 0 in the child).
42 pipe_setup(pipes, 'fixmate')
43 if 0==named_fork(children, 'fixmate'):
# The child keeps only the write end of its own pipe.
44 pipe_closeallbut(pipes, ('fixmate', 'w'))
52 'validation_stringency': 'LENIENT',
53 'compression_level': 0
# The child's stdout is the pipe's write end, wrapped with a 1 MiB buffer.
55 stdout=os.fdopen(pipes['fixmate','w'], 'wb', 2**20))
# Remove the 'fixmate' write end from the pipe table and close it
# (presumably in the parent, which does not write to this pipe — confirm).
57 os.close(pipes.pop(('fixmate','w'), None))
# Stage 'sortsam': fork a child that reads the 'fixmate' pipe and writes to
# the 'sortsam' pipe.
59 pipe_setup(pipes, 'sortsam')
60 if 0==named_fork(children, 'sortsam'):
# Keep only the upstream read end and this stage's write end.
61 pipe_closeallbut(pipes, ('fixmate', 'r'), ('sortsam', 'w'))
69 'validation_stringency': 'LENIENT',
70 'compression_level': 0
# stdin/stdout are the two kept pipe ends, each wrapped with a 1 MiB buffer.
72 stdin=os.fdopen(pipes['fixmate','r'], 'rb', 2**20),
73 stdout=os.fdopen(pipes['sortsam','w'], 'wb', 2**20))
# Stage 'reordersam': fork a child that reads the 'sortsam' pipe and writes to
# the 'reordersam' pipe.
76 pipe_setup(pipes, 'reordersam')
77 if 0==named_fork(children, 'reordersam'):
# Keep only the upstream read end and this stage's write end.
78 pipe_closeallbut(pipes, ('sortsam', 'r'), ('reordersam', 'w'))
# The first FASTA file found in the reference directory is the reference.
84 'reference': ref_fasta_files[0],
86 'validation_stringency': 'LENIENT',
87 'compression_level': 0
# stdin/stdout are the two kept pipe ends, each wrapped with a 1 MiB buffer.
89 stdin=os.fdopen(pipes['sortsam','r'], 'rb', 2**20),
90 stdout=os.fdopen(pipes['reordersam','w'], 'wb', 2**20))
# Stage 'addrg': fork a child for AddOrReplaceReadGroups that reads the
# 'reordersam' pipe and writes to the 'addrg' pipe.
93 pipe_setup(pipes, 'addrg')
94 if 0==named_fork(children, 'addrg'):
# Keep only the upstream read end and this stage's write end.
95 pipe_closeallbut(pipes, ('reordersam', 'r'), ('addrg', 'w'))
97 'AddOrReplaceReadGroups',
# Read-group fields come from the job's script parameters; when a parameter is
# absent the fallback is 0, except 'rgpl' which falls back to 'illumina'.
102 'rglb': this_job['script_parameters'].get('rglb', 0),
103 'rgpl': this_job['script_parameters'].get('rgpl', 'illumina'),
104 'rgpu': this_job['script_parameters'].get('rgpu', 0),
105 'rgsm': this_job['script_parameters'].get('rgsm', 0),
106 'validation_stringency': 'LENIENT'
# stdin/stdout are the two kept pipe ends, each wrapped with a 1 MiB buffer.
108 stdin=os.fdopen(pipes['reordersam','r'], 'rb', 2**20),
109 stdout=os.fdopen(pipes['addrg','w'], 'wb', 2**20))
# Stage 'bammanifest': consume the 'addrg' output. Three pipes are created: one
# carries the resulting manifest text, and 'bam' and 'casm_in' each receive a
# copy of the data for the 'index' and 'casm' stages forked further down.
112 pipe_setup(pipes, 'bammanifest')
113 pipe_setup(pipes, 'bam')
114 pipe_setup(pipes, 'casm_in')
115 if 0==named_fork(children, 'bammanifest'):
116 pipe_closeallbut(pipes,
118 ('bammanifest', 'w'),
# Collection writer whose stream and file names mirror the input's.
121 out = arvados.CollectionWriter()
122 out.start_new_stream(input_stream_name)
123 out.start_new_file(input_file_name)
# Read from the 'addrg' pipe in chunks of at most 1 MiB.
125 buf = os.read(pipes['addrg','r'], 2**20)
# Forward the same chunk to both downstream pipes.
128 os.write(pipes['bam','w'], buf)
129 os.write(pipes['casm_in','w'], buf)
# Send the writer's manifest text through the 'bammanifest' pipe, then close
# the write end.
131 os.write(pipes['bammanifest','w'], out.manifest_text())
132 os.close(pipes['bammanifest','w'])
# Stage 'casm': fork a child for CollectAlignmentSummaryMetrics.
135 pipe_setup(pipes, 'casm')
136 if 0 == named_fork(children, 'casm'):
# Keep only the 'casm_in' read end and this stage's write end.
137 pipe_closeallbut(pipes, ('casm_in', 'r'), ('casm', 'w'))
139 'CollectAlignmentSummaryMetrics',
# Input and output are given as /dev/fd/<n> paths naming the kept pipe
# descriptors.
141 'input': '/dev/fd/' + str(pipes['casm_in','r']),
142 'output': '/dev/fd/' + str(pipes['casm','w']),
# The first FASTA file found in the reference directory is the reference.
143 'reference_sequence': ref_fasta_files[0],
144 'validation_stringency': 'LENIENT',
# Stage 'index': fork a child that reads the 'bam' pipe and writes to the
# 'index' pipe.
149 pipe_setup(pipes, 'index')
150 if 0==named_fork(children, 'index'):
# Keep only the 'bam' read end and this stage's write end.
151 pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
158 'validation_stringency': 'LENIENT'
# stdin/stdout are the two kept pipe ends, each wrapped with a 1 MiB buffer.
160 stdin=os.fdopen(pipes['bam','r'], 'rb', 2**20),
161 stdout=os.fdopen(pipes['index','w'], 'wb', 2**20))
164 pipe_setup(pipes, 'indexmanifest')
165 if 0==named_fork(children, 'indexmanifest'):
166 pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
167 out = arvados.CollectionWriter()
168 out.start_new_stream(input_stream_name)
169 out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
171 buf = os.read(pipes['index','r'], 2**20)
175 os.write(pipes['indexmanifest','w'], out.manifest_text())
176 os.close(pipes['indexmanifest','w'])
# After all stages are forked: keep only the pipe ends listed in this call and
# close the rest.
179 pipe_closeallbut(pipes,
180 ('bammanifest', 'r'),
181 ('indexmanifest', 'r'),
# Read what each manifest-producing child wrote to its pipe.
186 for which in ['bammanifest', 'indexmanifest']:
187 with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
# Store the contents of the 'casm' pipe as "<input file name>.casm.tsv" under
# the input's stream name, and append that manifest to the output manifest.
194 casm_out = arvados.CollectionWriter()
195 casm_out.start_new_stream(input_stream_name)
196 casm_out.start_new_file(input_file_name + '.casm.tsv')
197 casm_out.write(os.fdopen(pipes.pop(('casm','r'))))
199 outmanifest += casm_out.manifest_text()
# Check the exit status of every forked child.
# NOTE(review): `and` short-circuits, so once all_ok is False the remaining
# children are no longer waited on — confirm this is intended.
202 for (childname, pid) in children.items():
203 all_ok = all_ok and waitpid_and_check_exit(pid, childname)
# Record the combined manifest as this task's output.
206 this_task.set_output(outmanifest)