--- /dev/null
+#!/usr/bin/env python
+
+import os
+import re
+import sys
+import arvados
+import arvados_gatk2
+import arvados_picard
+import arvados_samtools
+from arvados_ipc import *
+
+class InvalidArgumentError(Exception):
+ pass
+
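+# At sequence 0, queue one new task per input .bam file and end this task
+# successfully; the per-file tasks re-run this script at sequence 1 and
+# continue below with exactly one BAM each.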
+arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+tmpdir = arvados.current_task().tmpdir
+arvados.util.clear_tmpdir()
+
+known_sites_files = arvados.getjobparam(
+ 'known_sites',
+ ['dbsnp_137.b37.vcf',
+ 'Mills_and_1000G_gold_standard.indels.b37.vcf',
+ ])
+bundle_dir = arvados.util.collection_extract(
+ collection = this_job['script_parameters']['gatk_bundle'],
+ files = [
+ 'human_g1k_v37.dict',
+ 'human_g1k_v37.fasta',
+ 'human_g1k_v37.fasta.fai'
+ ] + known_sites_files + [v + '.idx' for v in known_sites_files],
+ path = 'gatk_bundle')
+ref_fasta_files = [os.path.join(bundle_dir, f)
+ for f in os.listdir(bundle_dir)
+ if re.search(r'\.fasta(\.gz)?$', f)]
+
+input_collection = this_task['parameters']['input']
+input_dir = arvados.util.collection_extract(
+ collection = input_collection,
+ path = os.path.join(this_task.tmpdir, 'input'))
+input_bam_files = []
+for f in arvados.util.listdir_recursive(input_dir):
+ if re.search(r'\.bam$', f):
+ input_stream_name, input_file_name = os.path.split(f)
+ input_bam_files += [os.path.join(input_dir, f)]
+if len(input_bam_files) != 1:
+ raise InvalidArgumentError("Expected exactly one bam file per task.")
+
+known_sites_args = []
+for f in known_sites_files:
+ known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)]
+
+recal_file = os.path.join(tmpdir, 'recal.csv')
+
+children = {}
+pipes = {}
+
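+# Build the base quality score recalibration table for this task's BAM.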
+arvados_gatk2.run(
+ args=[
+ '-nct', arvados_gatk2.cpus_on_this_node(),
+ '-T', 'BaseRecalibrator',
+ '-R', ref_fasta_files[0],
+ '-I', input_bam_files[0],
+ '-o', recal_file,
+ ] + known_sites_args)
+
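+# Fork a child that runs PrintReads with the recalibration table and writes
+# the recalibrated BAM to a pipe; the parent streams that pipe into Keep.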
+pipe_setup(pipes, 'BQSR')
+if 0 == named_fork(children, 'BQSR'):
+ pipe_closeallbut(pipes, ('BQSR', 'w'))
+ arvados_gatk2.run(
+ args=[
+ '-T', 'PrintReads',
+ '-R', ref_fasta_files[0],
+ '-I', input_bam_files[0],
+ '-o', '/dev/fd/' + str(pipes['BQSR','w']),
+ '-BQSR', recal_file,
+ '--disable_bam_indexing',
+ ],
+ close_fds=False)
+ os._exit(0)
+os.close(pipes.pop(('BQSR','w')))
+
+out = arvados.CollectionWriter()
+out.start_new_stream(input_stream_name)
+
+out.start_new_file(input_file_name + '.recal.csv')
+out.write(open(recal_file, 'rb'))
+
+out.start_new_file(input_file_name)
+while True:
+ buf = os.read(pipes['BQSR','r'], 2**20)
+ if len(buf) == 0:
+ break
+ out.write(buf)
+pipe_closeallbut(pipes)
+
+if waitpid_and_check_children(children):
+ this_task.set_output(out.finish())
+else:
+ sys.exit(1)
--- /dev/null
+#!/usr/bin/env python
+
+import os
+import re
+import sys
+import threading
+import arvados
+import arvados_gatk2
+import arvados_picard
+from arvados_ipc import *
+
+class InvalidArgumentError(Exception):
+ pass
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+tmpdir = arvados.current_task().tmpdir
+arvados.util.clear_tmpdir()
+
+bundle_dir = arvados.util.collection_extract(
+ collection = this_job['script_parameters']['gatk_bundle'],
+ files = [
+ 'human_g1k_v37.dict',
+ 'human_g1k_v37.fasta',
+ 'human_g1k_v37.fasta.fai',
+ 'dbsnp_137.b37.vcf',
+ 'dbsnp_137.b37.vcf.idx',
+ ],
+ path = 'gatk_bundle')
+ref_fasta_files = [os.path.join(bundle_dir, f)
+ for f in os.listdir(bundle_dir)
+ if re.search(r'\.fasta(\.gz)?$', f)]
+regions_args = []
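+# If a 'regions' collection is supplied, restrict GATK to its BED intervals,
+# padded by 'region_padding' bases.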
+if 'regions' in this_job['script_parameters']:
+ regions_dir = arvados.util.collection_extract(
+ collection = this_job['script_parameters']['regions'],
+ path = 'regions')
+ region_padding = int(this_job['script_parameters']['region_padding'])
+ for f in os.listdir(regions_dir):
+ if re.search(r'\.bed$', f):
+ regions_args += [
+ '--intervals', os.path.join(regions_dir, f),
+ '--interval_padding', str(region_padding)
+ ]
+
+
+# Start a child process for each input file, feeding data to picard.
+
+input_child_names = []
+children = {}
+pipes = {}
+
+input_collection = this_job['script_parameters']['input']
+input_index = 0
+for s in arvados.CollectionReader(input_collection).all_streams():
+ for f in s.all_files():
+ if not re.search(r'\.bam$', f.name()):
+ continue
+ input_index += 1
+ childname = 'input-' + str(input_index)
+ input_child_names += [childname]
+ pipe_setup(pipes, childname)
+ childpid = named_fork(children, childname)
+ if childpid == 0:
+ pipe_closeallbut(pipes, (childname, 'w'))
+            for buf in f.readall():
+                os.write(pipes[childname, 'w'], buf)
+ os.close(pipes[childname, 'w'])
+ os._exit(0)
+ sys.stderr.write("pid %d writing %s to fd %d->%d\n" %
+ (childpid,
+ s.name()+'/'+f.name(),
+ pipes[childname, 'w'],
+ pipes[childname, 'r']))
+ pipe_closeallbut(pipes, *[(childname, 'r')
+ for childname in input_child_names])
+
+
+# Merge-sort the input files to merge.bam
+
+arvados_picard.run(
+ 'MergeSamFiles',
+ args=[
+ 'I=/dev/fd/' + str(pipes[childname, 'r'])
+ for childname in input_child_names
+ ],
+ params={
+ 'o': 'merge.bam',
+ 'quiet': 'true',
+ 'so': 'coordinate',
+ 'use_threading': 'true',
+ 'create_index': 'true',
+ 'validation_stringency': 'LENIENT',
+ },
+ close_fds=False,
+ )
+pipe_closeallbut(pipes)
+
+
+# Run CoverageBySample on merge.bam
+
+pipe_setup(pipes, 'stats_log')
+pipe_setup(pipes, 'stats_out')
+if 0 == named_fork(children, 'GATK'):
+ pipe_closeallbut(pipes,
+ ('stats_log', 'w'),
+ ('stats_out', 'w'))
+ arvados_gatk2.run(
+ args=[
+ '-T', 'CoverageBySample',
+ '-R', ref_fasta_files[0],
+ '-I', 'merge.bam',
+ '-o', '/dev/fd/' + str(pipes['stats_out', 'w']),
+ '--log_to_file', '/dev/fd/' + str(pipes['stats_log', 'w']),
+ ]
+ + regions_args,
+ close_fds=False)
+ pipe_closeallbut(pipes)
+ os._exit(0)
+pipe_closeallbut(pipes, ('stats_log', 'r'), ('stats_out', 'r'))
+
+
+# Start two threads to read from CoverageBySample pipes
+
+class ExceptionPropagatingThread(threading.Thread):
+ """
+ If a subclassed thread calls _raise(e) in run(), running join() on
+ the thread will raise e in the thread that calls join().
+ """
+ def __init__(self, *args, **kwargs):
+ super(ExceptionPropagatingThread, self).__init__(*args, **kwargs)
+ self.__exception = None
+ def join(self, *args, **kwargs):
+ ret = super(ExceptionPropagatingThread, self).join(*args, **kwargs)
+ if self.__exception:
+ raise self.__exception
+ return ret
+ def _raise(self, exception):
+ self.__exception = exception
+
+class StatsLogReader(ExceptionPropagatingThread):
+ def __init__(self, **kwargs):
+ super(StatsLogReader, self).__init__()
+ self.args = kwargs
+ def run(self):
+ try:
+ for logline in self.args['infile']:
+                x = re.search(r'Processing (\d+) bp from intervals', logline)
+ if x:
+ self._total_bp = int(x.group(1))
+ except Exception as e:
+ self._raise(e)
+ def total_bp(self):
+ self.join()
+ return self._total_bp
+stats_log_thr = StatsLogReader(infile=os.fdopen(pipes.pop(('stats_log', 'r'))))
+stats_log_thr.start()
+
+class StatsOutReader(ExceptionPropagatingThread):
+ """
+ Read output of CoverageBySample and collect a histogram of
+ coverage (last column) -> number of loci (number of rows).
+ """
+ def __init__(self, **kwargs):
+ super(StatsOutReader, self).__init__()
+ self.args = kwargs
+ def run(self):
+ try:
+ hist = [0]
+ histtot = 0
+ for line in self.args['infile']:
+ try:
+                    i = int(line.split()[-1])
+ except ValueError:
+ continue
+ if i >= 1:
+ if len(hist) <= i:
+ hist.extend([0 for x in range(1+i-len(hist))])
+ hist[i] += 1
+ histtot += 1
+ hist[0] = stats_log_thr.total_bp() - histtot
+ self._histogram = hist
+ except Exception as e:
+ self._raise(e)
+ def histogram(self):
+ self.join()
+ return self._histogram
+stats_out_thr = StatsOutReader(infile=os.fdopen(pipes.pop(('stats_out', 'r'))))
+stats_out_thr.start()
+
+
+# Run UnifiedGenotyper on merge.bam
+
+arvados_gatk2.run(
+ args=[
+ '-nt', arvados_gatk2.cpus_on_this_node(),
+ '-T', 'UnifiedGenotyper',
+ '-R', ref_fasta_files[0],
+ '-I', 'merge.bam',
+ '-o', os.path.join(tmpdir, 'out.vcf'),
+ '--dbsnp', os.path.join(bundle_dir, 'dbsnp_137.b37.vcf'),
+ '-metrics', 'UniGenMetrics',
+ '-A', 'DepthOfCoverage',
+ '-A', 'AlleleBalance',
+ '-A', 'QualByDepth',
+ '-A', 'HaplotypeScore',
+ '-A', 'MappingQualityRankSumTest',
+ '-A', 'ReadPosRankSumTest',
+ '-A', 'FisherStrand',
+ '-glm', 'both',
+ ]
+ + regions_args
+ + arvados.getjobparam('GATK2_UnifiedGenotyper_args',[]))
+
+# Copy the output VCF file to Keep
+
+out = arvados.CollectionWriter()
+out.start_new_stream()
+out.start_new_file('out.vcf')
+out.write(open(os.path.join(tmpdir, 'out.vcf'), 'rb'))
+
+
+# Write statistics to Keep
+
+out.start_new_file('mincoverage_nlocus.csv')
+sofar = 0
+hist = stats_out_thr.histogram()
+total_bp = stats_log_thr.total_bp()
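+# Each CSV row: minimum coverage i, number of loci covered at least i times,
+# and that count as a percentage of all bases processed by CoverageBySample.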
+for i in range(len(hist)):
+ out.write("%d,%d,%f\n" %
+ (i,
+ total_bp - sofar,
+ 100.0 * (total_bp - sofar) / total_bp))
+ sofar += hist[i]
+
+if waitpid_and_check_children(children):
+ this_task.set_output(out.finish())
+else:
+ sys.exit(1)
--- /dev/null
+#!/usr/bin/env python
+
+import os
+import re
+import sys
+import arvados
+import arvados_gatk2
+import arvados_picard
+import arvados_samtools
+from arvados_ipc import *
+
+class InvalidArgumentError(Exception):
+ pass
+
+arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+tmpdir = arvados.current_task().tmpdir
+arvados.util.clear_tmpdir()
+
+known_sites_files = arvados.getjobparam(
+ 'known_sites',
+ ['dbsnp_137.b37.vcf',
+ 'Mills_and_1000G_gold_standard.indels.b37.vcf',
+ ])
+bundle_dir = arvados.util.collection_extract(
+ collection = this_job['script_parameters']['gatk_bundle'],
+ files = [
+ 'human_g1k_v37.dict',
+ 'human_g1k_v37.fasta',
+ 'human_g1k_v37.fasta.fai'
+ ] + known_sites_files + [v + '.idx' for v in known_sites_files],
+ path = 'gatk_bundle')
+ref_fasta_files = [os.path.join(bundle_dir, f)
+ for f in os.listdir(bundle_dir)
+ if re.search(r'\.fasta(\.gz)?$', f)]
+regions_args = []
+if 'regions' in this_job['script_parameters']:
+ regions_dir = arvados.util.collection_extract(
+ collection = this_job['script_parameters']['regions'],
+ path = 'regions')
+ region_padding = int(this_job['script_parameters']['region_padding'])
+ for f in os.listdir(regions_dir):
+ if re.search(r'\.bed$', f):
+ regions_args += [
+ '--intervals', os.path.join(regions_dir, f),
+ '--interval_padding', str(region_padding)
+ ]
+
+input_collection = this_task['parameters']['input']
+input_dir = arvados.util.collection_extract(
+ collection = input_collection,
+ path = os.path.join(this_task.tmpdir, 'input'))
+input_bam_files = []
+for f in arvados.util.listdir_recursive(input_dir):
+ if re.search(r'\.bam$', f):
+ input_stream_name, input_file_name = os.path.split(f)
+ input_bam_files += [os.path.join(input_dir, f)]
+if len(input_bam_files) != 1:
+ raise InvalidArgumentError("Expected exactly one bam file per task.")
+
+known_sites_args = []
+for f in known_sites_files:
+ known_sites_args += ['-known', os.path.join(bundle_dir, f)]
+
+children = {}
+pipes = {}
+
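+# RealignerTargetCreator writes the list of intervals to realign around known
+# indel sites; a forked IndelRealigner then streams the realigned BAM to a pipe.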
+arvados_gatk2.run(
+ args=[
+ '-nt', arvados_gatk2.cpus_per_task(),
+ '-T', 'RealignerTargetCreator',
+ '-R', ref_fasta_files[0],
+ '-I', input_bam_files[0],
+ '-o', os.path.join(tmpdir, 'intervals.list')
+ ] + known_sites_args + regions_args)
+
+pipe_setup(pipes, 'IndelRealigner')
+if 0 == named_fork(children, 'IndelRealigner'):
+ pipe_closeallbut(pipes, ('IndelRealigner', 'w'))
+ arvados_gatk2.run(
+ args=[
+ '-T', 'IndelRealigner',
+ '-R', ref_fasta_files[0],
+ '-targetIntervals', os.path.join(tmpdir, 'intervals.list'),
+ '-I', input_bam_files[0],
+ '-o', '/dev/fd/' + str(pipes['IndelRealigner','w']),
+ '--disable_bam_indexing',
+ ] + known_sites_args + regions_args,
+ close_fds=False)
+ os._exit(0)
+os.close(pipes.pop(('IndelRealigner','w')))
+
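+# Fan out the realigned BAM: the 'bammanifest' child tees it into Keep and
+# into the 'bam' pipe, 'index' runs Picard BuildBamIndex on that pipe, and
+# 'indexmanifest' writes the resulting .bai to Keep. The parent then collects
+# both children's manifest texts as the task output.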
+pipe_setup(pipes, 'bammanifest')
+pipe_setup(pipes, 'bam')
+if 0==named_fork(children, 'bammanifest'):
+ pipe_closeallbut(pipes,
+ ('IndelRealigner', 'r'),
+ ('bammanifest', 'w'),
+ ('bam', 'w'))
+ out = arvados.CollectionWriter()
+ out.start_new_stream(input_stream_name)
+ out.start_new_file(input_file_name)
+ while True:
+ buf = os.read(pipes['IndelRealigner','r'], 2**20)
+ if len(buf) == 0:
+ break
+ os.write(pipes['bam','w'], buf)
+ out.write(buf)
+ os.write(pipes['bammanifest','w'], out.manifest_text())
+ os.close(pipes['bammanifest','w'])
+ os._exit(0)
+
+pipe_setup(pipes, 'index')
+if 0==named_fork(children, 'index'):
+ pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
+ arvados_picard.run(
+ 'BuildBamIndex',
+ params={
+ 'i': '/dev/fd/' + str(pipes['bam','r']),
+ 'o': '/dev/fd/' + str(pipes['index','w']),
+ 'quiet': 'true',
+ 'validation_stringency': 'LENIENT'
+ },
+ close_fds=False)
+ os._exit(0)
+
+pipe_setup(pipes, 'indexmanifest')
+if 0==named_fork(children, 'indexmanifest'):
+ pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
+ out = arvados.CollectionWriter()
+ out.start_new_stream(input_stream_name)
+    out.start_new_file(re.sub(r'\.bam$', '.bai', input_file_name))
+ while True:
+ buf = os.read(pipes['index','r'], 2**20)
+ if len(buf) == 0:
+ break
+ out.write(buf)
+ os.write(pipes['indexmanifest','w'], out.manifest_text())
+ os.close(pipes['indexmanifest','w'])
+ os._exit(0)
+
+pipe_closeallbut(pipes, ('bammanifest', 'r'), ('indexmanifest', 'r'))
+outmanifest = ''
+for which in ['bammanifest', 'indexmanifest']:
+ with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
+ while True:
+ buf = f.read()
+ if buf == '':
+ break
+ outmanifest += buf
+
+all_ok = True
+for (childname, pid) in children.items():
+    # waitpid_and_check_exit must be on the left so it runs for every child,
+    # even after all_ok has already become False.
+    all_ok = waitpid_and_check_exit(pid, childname) and all_ok
+
+if all_ok:
+ this_task.set_output(outmanifest)
+else:
+ sys.exit(1)
--- /dev/null
+import arvados
+import re
+import os
+import sys
+import fcntl
+import subprocess
+
+bwa_install_path = None
+
+def setup():
+ global bwa_install_path
+ if bwa_install_path:
+ return bwa_install_path
+ bwa_path = arvados.util.tarball_extract(
+ tarball = arvados.current_job()['script_parameters']['bwa_tbz'],
+ path = 'bwa')
+
+ # build "bwa" binary
+ lockfile = open(os.path.split(bwa_path)[0] + '.bwa-make.lock',
+ 'w')
+ fcntl.flock(lockfile, fcntl.LOCK_EX)
+ arvados.util.run_command(['make', '-j16'], cwd=bwa_path)
+ lockfile.close()
+
+ bwa_install_path = bwa_path
+ return bwa_path
+
+def bwa_binary():
+ global bwa_install_path
+ return os.path.join(bwa_install_path, 'bwa')
+
+def run(command, command_args, **kwargs):
+ global bwa_install_path
+ execargs = [bwa_binary(),
+ command]
+ execargs += command_args
+ sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
+ arvados.util.run_command(
+ execargs,
+ cwd=arvados.current_task().tmpdir,
+ stderr=sys.stderr,
+ stdin=kwargs.get('stdin', subprocess.PIPE),
+ stdout=kwargs.get('stdout', sys.stderr))
+
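+# For each input file named like *_1.*, find the matching *_2.* file in the
+# same stream and queue one new task carrying both halves of the read pair.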
+def one_task_per_pair_input_file(if_sequence=0, and_end_task=True):
+ if if_sequence != arvados.current_task()['sequence']:
+ return
+ job_input = arvados.current_job()['script_parameters']['input']
+ cr = arvados.CollectionReader(job_input)
+ all_files = []
+ for s in cr.all_streams():
+ all_files += list(s.all_files())
+ for s in cr.all_streams():
+ for left_file in s.all_files():
+ left_name = left_file.name()
+ right_file = None
+            right_name = re.sub(r'(.*_)1\.', r'\g<1>2.', left_name)
+ if right_name == left_name:
+ continue
+ for f2 in s.all_files():
+ if right_name == f2.name():
+ right_file = f2
+ if right_file != None:
+ new_task_attrs = {
+ 'job_uuid': arvados.current_job()['uuid'],
+ 'created_by_job_task_uuid': arvados.current_task()['uuid'],
+ 'sequence': if_sequence + 1,
+ 'parameters': {
+ 'input_1':left_file.as_manifest(),
+ 'input_2':right_file.as_manifest()
+ }
+ }
+ arvados.api().job_tasks().create(body=new_task_attrs).execute()
+ if and_end_task:
+ arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
+ body={'success':True}
+ ).execute()
+ exit(0)
+
+setup()
--- /dev/null
+import arvados
+import re
+import os
+import sys
+import fcntl
+import subprocess
+
+gatk2_install_path = None
+
+def install_path():
+ global gatk2_install_path
+ if gatk2_install_path:
+ return gatk2_install_path
+ gatk2_install_path = arvados.util.tarball_extract(
+ tarball = arvados.current_job()['script_parameters']['gatk_tbz'],
+ path = 'gatk2')
+ return gatk2_install_path
+
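+# Java heap size in MiB: total RAM divided by the number of concurrent tasks
+# on this node, minus ~700 MiB headroom, but never less than 500 MiB.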
+def memory_limit():
+ taskspernode = int(os.environ.get('CRUNCH_NODE_SLOTS', '1'))
+ with open('/proc/meminfo', 'r') as f:
+ ram = int(re.search(r'MemTotal:\s*(\d+)', f.read()).group(1)) / 1024
+ if taskspernode > 1:
+ ram = ram / taskspernode
+ return max(ram-700, 500)
+
+def cpus_on_this_node():
+ with open('/proc/cpuinfo', 'r') as cpuinfo:
+ return max(int(os.environ.get('SLURM_CPUS_ON_NODE', 1)),
+ len(re.findall(r'^processor\s*:\s*\d',
+ cpuinfo.read(),
+ re.MULTILINE)))
+
+def cpus_per_task():
+    return max(1, (cpus_on_this_node()
+                   // int(os.environ.get('CRUNCH_NODE_SLOTS', 1))))
+
+def run(**kwargs):
+ kwargs.setdefault('cwd', arvados.current_task().tmpdir)
+ kwargs.setdefault('stdout', sys.stderr)
+ execargs = ['java',
+ '-Xmx%dm' % memory_limit(),
+ '-Djava.io.tmpdir=' + arvados.current_task().tmpdir,
+ '-jar', os.path.join(install_path(), 'GenomeAnalysisTK.jar')]
+ execargs += [str(arg) for arg in kwargs.pop('args', [])]
+ sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
+ return arvados.util.run_command(execargs, **kwargs)
+
--- /dev/null
+import os
+import re
+import sys
+import subprocess
+
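+# Helpers for plumbing forked children together with pipes. Pipes are keyed
+# by (name, 'r'/'w'); each child should call pipe_closeallbut() to drop every
+# descriptor it does not use, so readers see EOF when the writer finishes.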
+def pipe_setup(pipes, name):
+ pipes[name,'r'], pipes[name,'w'] = os.pipe()
+
+def pipe_closeallbut(pipes, *keepus):
+    for n,m in list(pipes.keys()):
+        if (n,m) not in keepus:
+            os.close(pipes.pop((n,m)))
+
+def named_fork(children, name):
+ children[name] = os.fork()
+ return children[name]
+
+def waitpid_and_check_children(children):
+ """
+ Given a dict of childname->pid, wait for each child process to
+ finish, and report non-zero exit status on stderr. Return True if
+ all children exited 0.
+ """
+ all_ok = True
+ for (childname, pid) in children.items():
+ # all_ok must be on RHS here -- we need to call waitpid() on
+ # every child, even if all_ok is already False.
+ all_ok = waitpid_and_check_exit(pid, childname) and all_ok
+ return all_ok
+
+def waitpid_and_check_exit(pid, childname=''):
+ """
+ Wait for a child process to finish. If it exits non-zero, report
+ exit status on stderr (mentioning the given childname) and return
+ False. If it exits zero, return True.
+ """
+ _, childstatus = os.waitpid(pid, 0)
+ exitvalue = childstatus >> 8
+ signal = childstatus & 127
+ dumpedcore = childstatus & 128
+ if childstatus != 0:
+ sys.stderr.write("%s child %d failed: exit %d signal %d core %s\n"
+ % (childname, pid, exitvalue, signal,
+ ('y' if dumpedcore else 'n')))
+ return False
+ return True
+
--- /dev/null
+import arvados
+import re
+import os
+import sys
+import fcntl
+import subprocess
+
+picard_install_path = None
+
+def install_path():
+ global picard_install_path
+ if picard_install_path:
+ return picard_install_path
+ zipball = arvados.current_job()['script_parameters']['picard_zip']
+ extracted = arvados.util.zipball_extract(
+ zipball = zipball,
+ path = 'picard')
+ for f in os.listdir(extracted):
+ if (re.search(r'^picard-tools-[\d\.]+$', f) and
+ os.path.exists(os.path.join(extracted, f, '.'))):
+ picard_install_path = os.path.join(extracted, f)
+ break
+ if not picard_install_path:
+ raise Exception("picard-tools-{version} directory not found in %s" %
+ zipball)
+ return picard_install_path
+
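+# Entries in 'params' become KEY=value arguments on the picard command line
+# (keys are upcased); entries in 'args' are passed through unchanged.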
+def run(module, **kwargs):
+ kwargs.setdefault('cwd', arvados.current_task().tmpdir)
+ execargs = ['java',
+ '-Xmx1500m',
+ '-Djava.io.tmpdir=' + arvados.current_task().tmpdir,
+ '-jar', os.path.join(install_path(), module + '.jar')]
+ execargs += [str(arg) for arg in kwargs.pop('args', [])]
+ for key, value in kwargs.pop('params', {}).items():
+ execargs += [key.upper() + '=' + str(value)]
+ sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
+ return arvados.util.run_command(execargs, **kwargs)
--- /dev/null
+import arvados
+import re
+import os
+import sys
+import fcntl
+import subprocess
+
+samtools_path = None
+
+def samtools_install_path():
+ global samtools_path
+ if samtools_path:
+ return samtools_path
+ samtools_path = arvados.util.tarball_extract(
+ tarball = arvados.current_job()['script_parameters']['samtools_tgz'],
+ path = 'samtools')
+
+ # build "samtools" binary
+ lockfile = open(os.path.split(samtools_path)[0] + '.samtools-make.lock',
+ 'w')
+ fcntl.flock(lockfile, fcntl.LOCK_EX)
+ arvados.util.run_command(['make', '-j16'], cwd=samtools_path)
+ lockfile.close()
+
+ return samtools_path
+
+def samtools_binary():
+ return os.path.join(samtools_install_path(), 'samtools')
+
+def run(command, command_args, **kwargs):
+ execargs = [samtools_binary(),
+ command]
+ execargs += command_args
+ sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
+ arvados.util.run_command(
+ execargs,
+ cwd=arvados.current_task().tmpdir,
+ stdin=kwargs.get('stdin', subprocess.PIPE),
+ stderr=kwargs.get('stderr', sys.stderr),
+ stdout=kwargs.get('stdout', sys.stderr))
+
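+# Queue one new task per .bam file in the job input, bundling the matching
+# .bai index (when present in the same stream) into the task's input manifest.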
+def one_task_per_bam_file(if_sequence=0, and_end_task=True):
+ if if_sequence != arvados.current_task()['sequence']:
+ return
+ job_input = arvados.current_job()['script_parameters']['input']
+ cr = arvados.CollectionReader(job_input)
+ bam = {}
+ bai = {}
+ for s in cr.all_streams():
+ for f in s.all_files():
+ if re.search(r'\.bam$', f.name()):
+ bam[s.name(), f.name()] = f
+ elif re.search(r'\.bai$', f.name()):
+ bai[s.name(), f.name()] = f
+ for ((s_name, f_name), bam_f) in bam.items():
+ bai_f = bai.get((s_name, re.sub(r'bam$', 'bai', f_name)), None)
+ task_input = bam_f.as_manifest()
+ if bai_f:
+ task_input += bai_f.as_manifest()
+ new_task_attrs = {
+ 'job_uuid': arvados.current_job()['uuid'],
+ 'created_by_job_task_uuid': arvados.current_task()['uuid'],
+ 'sequence': if_sequence + 1,
+ 'parameters': {
+ 'input': task_input
+ }
+ }
+ arvados.api().job_tasks().create(body=new_task_attrs).execute()
+ if and_end_task:
+ arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
+ body={'success':True}
+ ).execute()
+ exit(0)
--- /dev/null
+#!/usr/bin/env python
+
+import arvados
+import arvados_bwa
+import arvados_samtools
+import os
+import re
+import sys
+import subprocess
+
+arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+ref_dir = arvados.util.collection_extract(
+ collection = this_job['script_parameters']['reference_index'],
+ path = 'reference',
+ decompress = False)
+
+ref_basename = None
+for f in os.listdir(ref_dir):
+ basename = re.sub(r'\.bwt$', '', f)
+ if basename != f:
+ ref_basename = os.path.join(ref_dir, basename)
+if ref_basename == None:
+ raise Exception("Could not find *.bwt in reference collection.")
+
+tmp_dir = arvados.current_task().tmpdir
+
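+# Aligner.aln forks a child that copies one (decompressed) reads file both to
+# a pipe feeding bwa aln's stdin and to a temporary file reused later by
+# bwa sampe, then starts bwa aln writing the .sai alignment file.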
+class Aligner:
+ def input_filename(self):
+ for s in arvados.CollectionReader(self.collection).all_streams():
+ for f in s.all_files():
+ return f.decompressed_name()
+ def generate_input(self):
+ for s in arvados.CollectionReader(self.collection).all_streams():
+ for f in s.all_files():
+                for chunk in f.readall_decompressed():
+                    yield chunk
+ def aln(self, input_param):
+ self.collection = this_task['parameters'][input_param]
+ reads_filename = os.path.join(tmp_dir, self.input_filename())
+ aln_filename = os.path.join(tmp_dir, self.input_filename() + '.sai')
+ reads_pipe_r, reads_pipe_w = os.pipe()
+ if os.fork() == 0:
+ os.close(reads_pipe_r)
+ reads_file = open(reads_filename, 'wb')
+ for s in self.generate_input():
+ if len(s) != os.write(reads_pipe_w, s):
+ raise Exception("short write")
+ reads_file.write(s)
+ reads_file.close()
+ os.close(reads_pipe_w)
+ sys.exit(0)
+ os.close(reads_pipe_w)
+
+ aln_file = open(aln_filename, 'wb')
+ bwa_proc = subprocess.Popen(
+ [arvados_bwa.bwa_binary(),
+ 'aln', '-t', '16',
+ ref_basename,
+ '-'],
+ stdin=os.fdopen(reads_pipe_r, 'rb', 2**20),
+ stdout=aln_file)
+ aln_file.close()
+ return reads_filename, aln_filename
+
+reads_1, alignments_1 = Aligner().aln('input_1')
+reads_2, alignments_2 = Aligner().aln('input_2')
+pid1, exit1 = os.wait()
+pid2, exit2 = os.wait()
+if exit1 != 0 or exit2 != 0:
+ raise Exception("bwa aln exited non-zero (0x%x, 0x%x)" % (exit1, exit2))
+
+# output alignments in sam format to pipe
+sam_pipe_r, sam_pipe_w = os.pipe()
+sam_pid = os.fork()
+if sam_pid != 0:
+ # parent
+ os.close(sam_pipe_w)
+else:
+ # child
+ os.close(sam_pipe_r)
+ arvados_bwa.run('sampe',
+ [ref_basename,
+ alignments_1, alignments_2,
+ reads_1, reads_2],
+ stdout=os.fdopen(sam_pipe_w, 'wb', 2**20))
+ sys.exit(0)
+
+# convert sam (sam_pipe_r) to bam (bam_pipe_w)
+bam_pipe_r, bam_pipe_w = os.pipe()
+bam_pid = os.fork()
+if bam_pid != 0:
+ # parent
+ os.close(bam_pipe_w)
+ os.close(sam_pipe_r)
+else:
+ # child
+ os.close(bam_pipe_r)
+ arvados_samtools.run('view',
+ ['-S', '-b',
+ '-'],
+ stdin=os.fdopen(sam_pipe_r, 'rb', 2**20),
+ stdout=os.fdopen(bam_pipe_w, 'wb', 2**20))
+ sys.exit(0)
+
+# copy bam (bam_pipe_r) to Keep
+out_bam_filename = os.path.split(reads_1)[-1] + '.bam'
+out = arvados.CollectionWriter()
+out.start_new_stream()
+out.start_new_file(out_bam_filename)
+out.write(os.fdopen(bam_pipe_r, 'rb', 2**20))
+
+# make sure everyone exited nicely
+pid3, exit3 = os.waitpid(sam_pid, 0)
+if exit3 != 0:
+ raise Exception("bwa sampe exited non-zero (0x%x)" % exit3)
+pid4, exit4 = os.waitpid(bam_pid, 0)
+if exit4 != 0:
+ raise Exception("samtools view exited non-zero (0x%x)" % exit4)
+
+# proclaim success
+this_task.set_output(out.finish())
--- /dev/null
+#!/usr/bin/env python
+
+import arvados
+import arvados_bwa
+import os
+import re
+import sys
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+ref_dir = arvados.util.collection_extract(
+ collection = this_job['script_parameters']['input'],
+ path = 'reference',
+ decompress = False)
+
+ref_fasta_files = (os.path.join(ref_dir, f)
+ for f in os.listdir(ref_dir)
+ if re.search(r'\.fasta(\.gz)?$', f))
+
+# build reference index
+arvados_bwa.run('index',
+ ['-a', 'bwtsw'] + list(ref_fasta_files))
+
+# move output files to new empty directory
+out_dir = os.path.join(arvados.current_task().tmpdir, 'out')
+arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
+os.mkdir(out_dir)
+for f in os.listdir(ref_dir):
+ if re.search(r'\.(amb|ann|bwt|pac|rbwt|rpac|rsa|sa)$', f):
+ sys.stderr.write("bwa output: %s (%d)\n" %
+ (f, os.stat(os.path.join(ref_dir, f)).st_size))
+ os.rename(os.path.join(ref_dir, f),
+ os.path.join(out_dir, f))
+
+# store output
+out = arvados.CollectionWriter()
+out.write_directory_tree(out_dir, max_manifest_depth=0)
+this_task.set_output(out.finish())
--- /dev/null
+#!/usr/bin/env python
+
+import arvados
+import os
+import re
+import sys
+import subprocess
+import arvados_picard
+from arvados_ipc import *
+
+arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+ref_dir = arvados.util.collection_extract(
+ collection = this_job['script_parameters']['reference'],
+ path = 'reference',
+ decompress = True)
+ref_fasta_files = [os.path.join(ref_dir, f)
+ for f in os.listdir(ref_dir)
+ if re.search(r'\.fasta(\.gz)?$', f)]
+input_collection = this_task['parameters']['input']
+
+for s in arvados.CollectionReader(input_collection).all_streams():
+ for f in s.all_files():
+ input_stream_name = s.name()
+ input_file_name = f.name()
+ break
+
+# Unfortunately, picard FixMateInformation cannot read from a pipe. We
+# must copy the input to a temporary file before running picard.
+input_bam_path = os.path.join(this_task.tmpdir, input_file_name)
+with open(input_bam_path, 'wb') as bam:
+ for s in arvados.CollectionReader(input_collection).all_streams():
+ for f in s.all_files():
+            for buf in f.readall():
+                bam.write(buf)
+
+children = {}
+pipes = {}
+
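+# Run the Picard stages as forked children connected by pipes:
+# FixMateInformation -> SortSam -> ReorderSam -> AddOrReplaceReadGroups, then
+# tee the result to Keep, BuildBamIndex, and CollectAlignmentSummaryMetrics.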
+pipe_setup(pipes, 'fixmate')
+if 0==named_fork(children, 'fixmate'):
+ pipe_closeallbut(pipes, ('fixmate', 'w'))
+ arvados_picard.run(
+ 'FixMateInformation',
+ params={
+ 'i': input_bam_path,
+ 'o': '/dev/stdout',
+ 'quiet': 'true',
+ 'so': 'coordinate',
+ 'validation_stringency': 'LENIENT',
+ 'compression_level': 0
+ },
+ stdout=os.fdopen(pipes['fixmate','w'], 'wb', 2**20))
+ os._exit(0)
+os.close(pipes.pop(('fixmate','w')))
+
+pipe_setup(pipes, 'sortsam')
+if 0==named_fork(children, 'sortsam'):
+ pipe_closeallbut(pipes, ('fixmate', 'r'), ('sortsam', 'w'))
+ arvados_picard.run(
+ 'SortSam',
+ params={
+ 'i': '/dev/stdin',
+ 'o': '/dev/stdout',
+ 'quiet': 'true',
+ 'so': 'coordinate',
+ 'validation_stringency': 'LENIENT',
+ 'compression_level': 0
+ },
+ stdin=os.fdopen(pipes['fixmate','r'], 'rb', 2**20),
+ stdout=os.fdopen(pipes['sortsam','w'], 'wb', 2**20))
+ os._exit(0)
+
+pipe_setup(pipes, 'reordersam')
+if 0==named_fork(children, 'reordersam'):
+ pipe_closeallbut(pipes, ('sortsam', 'r'), ('reordersam', 'w'))
+ arvados_picard.run(
+ 'ReorderSam',
+ params={
+ 'i': '/dev/stdin',
+ 'o': '/dev/stdout',
+ 'reference': ref_fasta_files[0],
+ 'quiet': 'true',
+ 'validation_stringency': 'LENIENT',
+ 'compression_level': 0
+ },
+ stdin=os.fdopen(pipes['sortsam','r'], 'rb', 2**20),
+ stdout=os.fdopen(pipes['reordersam','w'], 'wb', 2**20))
+ os._exit(0)
+
+pipe_setup(pipes, 'addrg')
+if 0==named_fork(children, 'addrg'):
+ pipe_closeallbut(pipes, ('reordersam', 'r'), ('addrg', 'w'))
+ arvados_picard.run(
+ 'AddOrReplaceReadGroups',
+ params={
+ 'i': '/dev/stdin',
+ 'o': '/dev/stdout',
+ 'quiet': 'true',
+ 'rglb': this_job['script_parameters'].get('rglb', 0),
+ 'rgpl': this_job['script_parameters'].get('rgpl', 'illumina'),
+ 'rgpu': this_job['script_parameters'].get('rgpu', 0),
+ 'rgsm': this_job['script_parameters'].get('rgsm', 0),
+ 'validation_stringency': 'LENIENT'
+ },
+ stdin=os.fdopen(pipes['reordersam','r'], 'rb', 2**20),
+ stdout=os.fdopen(pipes['addrg','w'], 'wb', 2**20))
+ os._exit(0)
+
+pipe_setup(pipes, 'bammanifest')
+pipe_setup(pipes, 'bam')
+pipe_setup(pipes, 'casm_in')
+if 0==named_fork(children, 'bammanifest'):
+ pipe_closeallbut(pipes,
+ ('addrg', 'r'),
+ ('bammanifest', 'w'),
+ ('bam', 'w'),
+ ('casm_in', 'w'))
+ out = arvados.CollectionWriter()
+ out.start_new_stream(input_stream_name)
+ out.start_new_file(input_file_name)
+ while True:
+ buf = os.read(pipes['addrg','r'], 2**20)
+ if len(buf) == 0:
+ break
+ os.write(pipes['bam','w'], buf)
+ os.write(pipes['casm_in','w'], buf)
+ out.write(buf)
+ os.write(pipes['bammanifest','w'], out.manifest_text())
+ os.close(pipes['bammanifest','w'])
+ os._exit(0)
+
+pipe_setup(pipes, 'casm')
+if 0 == named_fork(children, 'casm'):
+ pipe_closeallbut(pipes, ('casm_in', 'r'), ('casm', 'w'))
+ arvados_picard.run(
+ 'CollectAlignmentSummaryMetrics',
+ params={
+ 'input': '/dev/fd/' + str(pipes['casm_in','r']),
+ 'output': '/dev/fd/' + str(pipes['casm','w']),
+ 'reference_sequence': ref_fasta_files[0],
+ 'validation_stringency': 'LENIENT',
+ },
+ close_fds=False)
+ os._exit(0)
+
+pipe_setup(pipes, 'index')
+if 0==named_fork(children, 'index'):
+ pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
+ arvados_picard.run(
+ 'BuildBamIndex',
+ params={
+ 'i': '/dev/stdin',
+ 'o': '/dev/stdout',
+ 'quiet': 'true',
+ 'validation_stringency': 'LENIENT'
+ },
+ stdin=os.fdopen(pipes['bam','r'], 'rb', 2**20),
+ stdout=os.fdopen(pipes['index','w'], 'wb', 2**20))
+ os._exit(0)
+
+pipe_setup(pipes, 'indexmanifest')
+if 0==named_fork(children, 'indexmanifest'):
+ pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
+ out = arvados.CollectionWriter()
+ out.start_new_stream(input_stream_name)
+    out.start_new_file(re.sub(r'\.bam$', '.bai', input_file_name))
+ while True:
+ buf = os.read(pipes['index','r'], 2**20)
+ if len(buf) == 0:
+ break
+ out.write(buf)
+ os.write(pipes['indexmanifest','w'], out.manifest_text())
+ os.close(pipes['indexmanifest','w'])
+ os._exit(0)
+
+pipe_closeallbut(pipes,
+ ('bammanifest', 'r'),
+ ('indexmanifest', 'r'),
+ ('casm', 'r'))
+
+outmanifest = ''
+
+for which in ['bammanifest', 'indexmanifest']:
+ with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
+ while True:
+ buf = f.read()
+ if buf == '':
+ break
+ outmanifest += buf
+
+casm_out = arvados.CollectionWriter()
+casm_out.start_new_stream(input_stream_name)
+casm_out.start_new_file(input_file_name + '.casm.tsv')
+casm_out.write(os.fdopen(pipes.pop(('casm','r'))))
+
+outmanifest += casm_out.manifest_text()
+
+all_ok = True
+for (childname, pid) in children.items():
+    # waitpid_and_check_exit must be on the left so it runs for every child,
+    # even after all_ok has already become False.
+    all_ok = waitpid_and_check_exit(pid, childname) and all_ok
+
+if all_ok:
+ this_task.set_output(outmanifest)
+else:
+ sys.exit(1)