Add example scripts and libraries for bwa, samtools, picard, gatk2
author     Tom Clegg <tom@clinicalfuture.com>
           Mon, 9 Dec 2013 18:25:48 +0000 (10:25 -0800)
committer  Tom Clegg <tom@clinicalfuture.com>
           Mon, 9 Dec 2013 18:25:48 +0000 (10:25 -0800)
closes #1598

crunch_scripts/GATK2-bqsr [new file with mode: 0755]
crunch_scripts/GATK2-merge-call [new file with mode: 0755]
crunch_scripts/GATK2-realign [new file with mode: 0755]
crunch_scripts/arvados_bwa.py [new file with mode: 0644]
crunch_scripts/arvados_gatk2.py [new file with mode: 0644]
crunch_scripts/arvados_ipc.py [new file with mode: 0644]
crunch_scripts/arvados_picard.py [new file with mode: 0644]
crunch_scripts/arvados_samtools.py [new file with mode: 0644]
crunch_scripts/bwa-aln [new file with mode: 0755]
crunch_scripts/bwa-index [new file with mode: 0755]
crunch_scripts/picard-gatk2-prep [new file with mode: 0755]

diff --git a/crunch_scripts/GATK2-bqsr b/crunch_scripts/GATK2-bqsr
new file mode 100755 (executable)
index 0000000..cdf86b9
--- /dev/null
@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+
+import os
+import re
+import sys
+import arvados
+import arvados_gatk2
+import arvados_picard
+import arvados_samtools
+from arvados_ipc import *
+
+class InvalidArgumentError(Exception):
+    pass
+
+arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+tmpdir = arvados.current_task().tmpdir
+arvados.util.clear_tmpdir()
+
+known_sites_files = arvados.getjobparam(
+    'known_sites',
+    ['dbsnp_137.b37.vcf',
+     'Mills_and_1000G_gold_standard.indels.b37.vcf',
+     ])
+bundle_dir = arvados.util.collection_extract(
+    collection = this_job['script_parameters']['gatk_bundle'],
+    files = [
+        'human_g1k_v37.dict',
+        'human_g1k_v37.fasta',
+        'human_g1k_v37.fasta.fai'
+        ] + known_sites_files + [v + '.idx' for v in known_sites_files],
+    path = 'gatk_bundle')
+ref_fasta_files = [os.path.join(bundle_dir, f)
+                   for f in os.listdir(bundle_dir)
+                   if re.search(r'\.fasta(\.gz)?$', f)]
+
+input_collection = this_task['parameters']['input']
+input_dir = arvados.util.collection_extract(
+    collection = input_collection,
+    path = os.path.join(this_task.tmpdir, 'input'))
+input_bam_files = []
+for f in arvados.util.listdir_recursive(input_dir):
+    if re.search(r'\.bam$', f):
+        input_stream_name, input_file_name = os.path.split(f)
+        input_bam_files += [os.path.join(input_dir, f)]
+if len(input_bam_files) != 1:
+    raise InvalidArgumentError("Expected exactly one bam file per task.")
+
+known_sites_args = []
+for f in known_sites_files:
+    known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)]
+
+recal_file = os.path.join(tmpdir, 'recal.csv')
+
+children = {}
+pipes = {}
+
+arvados_gatk2.run(
+    args=[
+        '-nct', arvados_gatk2.cpus_on_this_node(),
+        '-T', 'BaseRecalibrator',
+        '-R', ref_fasta_files[0],
+        '-I', input_bam_files[0],
+        '-o', recal_file,
+        ] + known_sites_args)
+
+pipe_setup(pipes, 'BQSR')
+if 0 == named_fork(children, 'BQSR'):
+    pipe_closeallbut(pipes, ('BQSR', 'w'))
+    arvados_gatk2.run(
+        args=[
+        '-T', 'PrintReads',
+        '-R', ref_fasta_files[0],
+        '-I', input_bam_files[0],
+        '-o', '/dev/fd/' + str(pipes['BQSR','w']),
+        '-BQSR', recal_file,
+        '--disable_bam_indexing',
+        ],
+        close_fds=False)
+    os._exit(0)
+os.close(pipes.pop(('BQSR','w')))
+
+out = arvados.CollectionWriter()
+out.start_new_stream(input_stream_name)
+
+out.start_new_file(input_file_name + '.recal.csv')
+out.write(open(recal_file, 'rb'))
+
+out.start_new_file(input_file_name)
+while True:
+    buf = os.read(pipes['BQSR','r'], 2**20)
+    if len(buf) == 0:
+        break
+    out.write(buf)
+pipe_closeallbut(pipes)
+
+if waitpid_and_check_children(children):
+    this_task.set_output(out.finish())
+else:
+    sys.exit(1)
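
The script above pulls everything it needs from the job record. A minimal sketch of the script_parameters it expects (collection identifiers are placeholders; gatk_tbz is consumed by arvados_gatk2.install_path() below):

    script_parameters = {
        'input': '...',         # collection with one .bam per task
        'gatk_bundle': '...',   # human_g1k_v37.dict/.fasta/.fasta.fai plus
                                # the known-sites VCFs and their .idx files
        'gatk_tbz': '...',      # GATK2 tarball, used by arvados_gatk2
        # optional; these are the defaults used above:
        'known_sites': ['dbsnp_137.b37.vcf',
                        'Mills_and_1000G_gold_standard.indels.b37.vcf'],
    }
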
diff --git a/crunch_scripts/GATK2-merge-call b/crunch_scripts/GATK2-merge-call
new file mode 100755 (executable)
index 0000000..c584449
--- /dev/null
@@ -0,0 +1,239 @@
+#!/usr/bin/env python
+
+import os
+import re
+import sys
+import threading
+import arvados
+import arvados_gatk2
+import arvados_picard
+from arvados_ipc import *
+
+class InvalidArgumentError(Exception):
+    pass
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+tmpdir = arvados.current_task().tmpdir
+arvados.util.clear_tmpdir()
+
+bundle_dir = arvados.util.collection_extract(
+    collection = this_job['script_parameters']['gatk_bundle'],
+    files = [
+        'human_g1k_v37.dict',
+        'human_g1k_v37.fasta',
+        'human_g1k_v37.fasta.fai',
+        'dbsnp_137.b37.vcf',
+        'dbsnp_137.b37.vcf.idx',
+        ],
+    path = 'gatk_bundle')
+ref_fasta_files = [os.path.join(bundle_dir, f)
+                   for f in os.listdir(bundle_dir)
+                   if re.search(r'\.fasta(\.gz)?$', f)]
+regions_args = []
+if 'regions' in this_job['script_parameters']:
+    regions_dir = arvados.util.collection_extract(
+        collection = this_job['script_parameters']['regions'],
+        path = 'regions')
+    region_padding = int(this_job['script_parameters']['region_padding'])
+    for f in os.listdir(regions_dir):
+        if re.search(r'\.bed$', f):
+            regions_args += [
+                '--intervals', os.path.join(regions_dir, f),
+                '--interval_padding', str(region_padding)
+                ]
+
+
+# Start a child process for each input file, feeding data to picard.
+
+input_child_names = []
+children = {}
+pipes = {}
+
+input_collection = this_job['script_parameters']['input']
+input_index = 0
+for s in arvados.CollectionReader(input_collection).all_streams():
+    for f in s.all_files():
+        if not re.search(r'\.bam$', f.name()):
+            continue
+        input_index += 1
+        childname = 'input-' + str(input_index)
+        input_child_names += [childname]
+        pipe_setup(pipes, childname)
+        childpid = named_fork(children, childname)
+        if childpid == 0:
+            pipe_closeallbut(pipes, (childname, 'w'))
+            for buf in f.readall():
+                os.write(pipes[childname, 'w'], buf)
+            os.close(pipes[childname, 'w'])
+            os._exit(0)
+        sys.stderr.write("pid %d writing %s to fd %d->%d\n" %
+                         (childpid,
+                          s.name()+'/'+f.name(),
+                          pipes[childname, 'w'],
+                          pipes[childname, 'r']))
+        pipe_closeallbut(pipes, *[(childname, 'r')
+                                  for childname in input_child_names])
+
+
+# Merge-sort the input files to merge.bam
+
+arvados_picard.run(
+    'MergeSamFiles',
+    args=[
+        'I=/dev/fd/' + str(pipes[childname, 'r'])
+        for childname in input_child_names
+        ],
+    params={
+        'o': 'merge.bam',
+        'quiet': 'true',
+        'so': 'coordinate',
+        'use_threading': 'true',
+        'create_index': 'true',
+        'validation_stringency': 'LENIENT',
+        },
+    close_fds=False,
+    )
+pipe_closeallbut(pipes)
+
+
+# Run CoverageBySample on merge.bam
+
+pipe_setup(pipes, 'stats_log')
+pipe_setup(pipes, 'stats_out')
+if 0 == named_fork(children, 'GATK'):
+    pipe_closeallbut(pipes,
+                     ('stats_log', 'w'),
+                     ('stats_out', 'w'))
+    arvados_gatk2.run(
+        args=[
+            '-T', 'CoverageBySample',
+            '-R', ref_fasta_files[0],
+            '-I', 'merge.bam',
+            '-o', '/dev/fd/' + str(pipes['stats_out', 'w']),
+            '--log_to_file', '/dev/fd/' + str(pipes['stats_log', 'w']),
+            ]
+        + regions_args,
+        close_fds=False)
+    pipe_closeallbut(pipes)
+    os._exit(0)
+pipe_closeallbut(pipes, ('stats_log', 'r'), ('stats_out', 'r'))
+
+
+# Start two threads to read from CoverageBySample pipes
+
+class ExceptionPropagatingThread(threading.Thread):
+    """
+    If a subclassed thread calls _raise(e) in run(), running join() on
+    the thread will raise e in the thread that calls join().
+    """
+    def __init__(self, *args, **kwargs):
+        super(ExceptionPropagatingThread, self).__init__(*args, **kwargs)
+        self.__exception = None
+    def join(self, *args, **kwargs):
+        ret = super(ExceptionPropagatingThread, self).join(*args, **kwargs)
+        if self.__exception:
+            raise self.__exception
+        return ret
+    def _raise(self, exception):
+        self.__exception = exception
+
+class StatsLogReader(ExceptionPropagatingThread):
+    def __init__(self, **kwargs):
+        super(StatsLogReader, self).__init__()
+        self.args = kwargs
+    def run(self):
+        try:
+            for logline in self.args['infile']:
+                x = re.search(r'Processing (\d+) bp from intervals', logline)
+                if x:
+                    self._total_bp = int(x.group(1))
+        except Exception as e:
+            self._raise(e)
+    def total_bp(self):
+        self.join()
+        return self._total_bp
+stats_log_thr = StatsLogReader(infile=os.fdopen(pipes.pop(('stats_log', 'r'))))
+stats_log_thr.start()
+
+class StatsOutReader(ExceptionPropagatingThread):
+    """
+    Read output of CoverageBySample and collect a histogram of
+    coverage (last column) -> number of loci (number of rows).
+    """
+    def __init__(self, **kwargs):
+        super(StatsOutReader, self).__init__()
+        self.args = kwargs
+    def run(self):
+        try:
+            hist = [0]
+            histtot = 0
+            for line in self.args['infile']:
+                try:
+                    i = int(line.split()[-1])
+                except ValueError:
+                    continue
+                if i >= 1:
+                    if len(hist) <= i:
+                        hist.extend([0 for x in range(1+i-len(hist))])
+                    hist[i] += 1
+                    histtot += 1
+            hist[0] = stats_log_thr.total_bp() - histtot
+            self._histogram = hist
+        except Exception as e:
+            self._raise(e)
+    def histogram(self):
+        self.join()
+        return self._histogram
+stats_out_thr = StatsOutReader(infile=os.fdopen(pipes.pop(('stats_out', 'r'))))
+stats_out_thr.start()
+
+
+# Run UnifiedGenotyper on merge.bam
+
+arvados_gatk2.run(
+    args=[
+        '-nt', arvados_gatk2.cpus_on_this_node(),
+        '-T', 'UnifiedGenotyper',
+        '-R', ref_fasta_files[0],
+        '-I', 'merge.bam',
+        '-o', os.path.join(tmpdir, 'out.vcf'),
+        '--dbsnp', os.path.join(bundle_dir, 'dbsnp_137.b37.vcf'),
+        '-metrics', 'UniGenMetrics',
+        '-A', 'DepthOfCoverage',
+        '-A', 'AlleleBalance',
+        '-A', 'QualByDepth',
+        '-A', 'HaplotypeScore',
+        '-A', 'MappingQualityRankSumTest',
+        '-A', 'ReadPosRankSumTest',
+        '-A', 'FisherStrand',
+        '-glm', 'both',
+        ]
+    + regions_args
+    + arvados.getjobparam('GATK2_UnifiedGenotyper_args',[]))
+
+# Copy the output VCF file to Keep
+
+out = arvados.CollectionWriter()
+out.start_new_stream()
+out.start_new_file('out.vcf')
+out.write(open(os.path.join(tmpdir, 'out.vcf'), 'rb'))
+
+
+# Write statistics to Keep
+
+out.start_new_file('mincoverage_nlocus.csv')
+sofar = 0
+hist = stats_out_thr.histogram()
+total_bp = stats_log_thr.total_bp()
+for i in range(len(hist)):
+    out.write("%d,%d,%f\n" %
+              (i,
+               total_bp - sofar,
+               100.0 * (total_bp - sofar) / total_bp))
+    sofar += hist[i]
+
+if waitpid_and_check_children(children):
+    this_task.set_output(out.finish())
+else:
+    sys.exit(1)
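
As above, a sketch of the script_parameters GATK2-merge-call reads (picard_zip is needed by arvados_picard for MergeSamFiles; regions and region_padding are optional):

    script_parameters = {
        'input': '...',         # collection of .bam files to merge and call
        'gatk_bundle': '...',   # reference plus dbsnp_137.b37.vcf and .idx
        'gatk_tbz': '...',
        'picard_zip': '...',
        # optional:
        'regions': '...',       # collection of .bed files
        'region_padding': 10,   # required if 'regions' is given
        'GATK2_UnifiedGenotyper_args': ['-stand_call_conf', '30.0'],
    }

The extra UnifiedGenotyper arguments, if given, are appended verbatim to the command line, so anything GATK2 accepts can go there.
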
diff --git a/crunch_scripts/GATK2-realign b/crunch_scripts/GATK2-realign
new file mode 100755 (executable)
index 0000000..be45828
--- /dev/null
@@ -0,0 +1,161 @@
+#!/usr/bin/env python
+
+import os
+import re
+import sys
+import arvados
+import arvados_gatk2
+import arvados_picard
+import arvados_samtools
+from arvados_ipc import *
+
+class InvalidArgumentError(Exception):
+    pass
+
+arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+tmpdir = arvados.current_task().tmpdir
+arvados.util.clear_tmpdir()
+
+known_sites_files = arvados.getjobparam(
+    'known_sites',
+    ['dbsnp_137.b37.vcf',
+     'Mills_and_1000G_gold_standard.indels.b37.vcf',
+     ])
+bundle_dir = arvados.util.collection_extract(
+    collection = this_job['script_parameters']['gatk_bundle'],
+    files = [
+        'human_g1k_v37.dict',
+        'human_g1k_v37.fasta',
+        'human_g1k_v37.fasta.fai'
+        ] + known_sites_files + [v + '.idx' for v in known_sites_files],
+    path = 'gatk_bundle')
+ref_fasta_files = [os.path.join(bundle_dir, f)
+                   for f in os.listdir(bundle_dir)
+                   if re.search(r'\.fasta(\.gz)?$', f)]
+regions_args = []
+if 'regions' in this_job['script_parameters']:
+    regions_dir = arvados.util.collection_extract(
+        collection = this_job['script_parameters']['regions'],
+        path = 'regions')
+    region_padding = int(this_job['script_parameters']['region_padding'])
+    for f in os.listdir(regions_dir):
+        if re.search(r'\.bed$', f):
+            regions_args += [
+                '--intervals', os.path.join(regions_dir, f),
+                '--interval_padding', str(region_padding)
+                ]
+
+input_collection = this_task['parameters']['input']
+input_dir = arvados.util.collection_extract(
+    collection = input_collection,
+    path = os.path.join(this_task.tmpdir, 'input'))
+input_bam_files = []
+for f in arvados.util.listdir_recursive(input_dir):
+    if re.search(r'\.bam$', f):
+        input_stream_name, input_file_name = os.path.split(f)
+        input_bam_files += [os.path.join(input_dir, f)]
+if len(input_bam_files) != 1:
+    raise InvalidArgumentError("Expected exactly one bam file per task.")
+
+known_sites_args = []
+for f in known_sites_files:
+    known_sites_args += ['-known', os.path.join(bundle_dir, f)]
+
+children = {}
+pipes = {}
+
+arvados_gatk2.run(
+    args=[
+        '-nt', arvados_gatk2.cpus_per_task(),
+        '-T', 'RealignerTargetCreator',
+        '-R', ref_fasta_files[0],
+        '-I', input_bam_files[0],
+        '-o', os.path.join(tmpdir, 'intervals.list')
+        ] + known_sites_args + regions_args)
+
+pipe_setup(pipes, 'IndelRealigner')
+if 0 == named_fork(children, 'IndelRealigner'):
+    pipe_closeallbut(pipes, ('IndelRealigner', 'w'))
+    arvados_gatk2.run(
+        args=[
+        '-T', 'IndelRealigner',
+        '-R', ref_fasta_files[0],
+        '-targetIntervals', os.path.join(tmpdir, 'intervals.list'),
+        '-I', input_bam_files[0],
+        '-o', '/dev/fd/' + str(pipes['IndelRealigner','w']),
+        '--disable_bam_indexing',
+        ] + known_sites_args + regions_args,
+        close_fds=False)
+    os._exit(0)
+os.close(pipes.pop(('IndelRealigner','w')))
+
+pipe_setup(pipes, 'bammanifest')
+pipe_setup(pipes, 'bam')
+if 0==named_fork(children, 'bammanifest'):
+    pipe_closeallbut(pipes,
+                     ('IndelRealigner', 'r'),
+                     ('bammanifest', 'w'),
+                     ('bam', 'w'))
+    out = arvados.CollectionWriter()
+    out.start_new_stream(input_stream_name)
+    out.start_new_file(input_file_name)
+    while True:
+        buf = os.read(pipes['IndelRealigner','r'], 2**20)
+        if len(buf) == 0:
+            break
+        os.write(pipes['bam','w'], buf)
+        out.write(buf)
+    os.write(pipes['bammanifest','w'], out.manifest_text())
+    os.close(pipes['bammanifest','w'])
+    os._exit(0)
+
+pipe_setup(pipes, 'index')
+if 0==named_fork(children, 'index'):
+    pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
+    arvados_picard.run(
+        'BuildBamIndex',
+        params={
+            'i': '/dev/fd/' + str(pipes['bam','r']),
+            'o': '/dev/fd/' + str(pipes['index','w']),
+            'quiet': 'true',
+            'validation_stringency': 'LENIENT'
+            },
+        close_fds=False)
+    os._exit(0)
+
+pipe_setup(pipes, 'indexmanifest')
+if 0==named_fork(children, 'indexmanifest'):
+    pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
+    out = arvados.CollectionWriter()
+    out.start_new_stream(input_stream_name)
+    out.start_new_file(re.sub(r'\.bam$', '.bai', input_file_name))
+    while True:
+        buf = os.read(pipes['index','r'], 2**20)
+        if len(buf) == 0:
+            break
+        out.write(buf)
+    os.write(pipes['indexmanifest','w'], out.manifest_text())
+    os.close(pipes['indexmanifest','w'])
+    os._exit(0)
+
+pipe_closeallbut(pipes, ('bammanifest', 'r'), ('indexmanifest', 'r'))
+outmanifest = ''
+for which in ['bammanifest', 'indexmanifest']:
+    with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
+        while True:
+            buf = f.read()
+            if buf == '':
+                break
+            outmanifest += buf
+
+all_ok = True
+for (childname, pid) in children.items():
+    all_ok = all_ok and waitpid_and_check_exit(pid, childname)
+
+if all_ok:
+    this_task.set_output(outmanifest)
+else:
+    sys.exit(1)
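
GATK2-realign's expected script_parameters, sketched the same way (picard_zip is needed for BuildBamIndex; known_sites, regions and region_padding are optional):

    script_parameters = {
        'input': '...',         # collection with one .bam per task
        'gatk_bundle': '...',
        'gatk_tbz': '...',
        'picard_zip': '...',
        # optional:
        'known_sites': ['dbsnp_137.b37.vcf',
                        'Mills_and_1000G_gold_standard.indels.b37.vcf'],
        'regions': '...',
        'region_padding': 10,
    }
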
diff --git a/crunch_scripts/arvados_bwa.py b/crunch_scripts/arvados_bwa.py
new file mode 100644 (file)
index 0000000..b34d72f
--- /dev/null
@@ -0,0 +1,77 @@
+import arvados
+import re
+import os
+import sys
+import fcntl
+import subprocess
+
+bwa_install_path = None
+
+def setup():
+    global bwa_install_path
+    if bwa_install_path:
+        return bwa_install_path
+    bwa_path = arvados.util.tarball_extract(
+        tarball = arvados.current_job()['script_parameters']['bwa_tbz'],
+        path = 'bwa')
+
+    # build "bwa" binary
+    lockfile = open(os.path.split(bwa_path)[0] + '.bwa-make.lock',
+                    'w')
+    fcntl.flock(lockfile, fcntl.LOCK_EX)
+    arvados.util.run_command(['make', '-j16'], cwd=bwa_path)
+    lockfile.close()
+
+    bwa_install_path = bwa_path
+    return bwa_path
+
+def bwa_binary():
+    global bwa_install_path
+    return os.path.join(bwa_install_path, 'bwa')
+
+def run(command, command_args, **kwargs):
+    global bwa_install_path
+    execargs = [bwa_binary(),
+                command]
+    execargs += command_args
+    sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
+    arvados.util.run_command(
+        execargs,
+        cwd=arvados.current_task().tmpdir,
+        stderr=kwargs.get('stderr', sys.stderr),
+        stdin=kwargs.get('stdin', subprocess.PIPE),
+        stdout=kwargs.get('stdout', sys.stderr))
+
+def one_task_per_pair_input_file(if_sequence=0, and_end_task=True):
+    if if_sequence != arvados.current_task()['sequence']:
+        return
+    job_input = arvados.current_job()['script_parameters']['input']
+    cr = arvados.CollectionReader(job_input)
+    for s in cr.all_streams():
+        for left_file in s.all_files():
+            left_name = left_file.name()
+            right_file = None
+            right_name = re.sub(r'(.*_)1\.', r'\g<1>2.', left_name)
+            if right_name == left_name:
+                continue
+            for f2 in s.all_files():
+                if right_name == f2.name():
+                    right_file = f2
+            if right_file is not None:
+                new_task_attrs = {
+                    'job_uuid': arvados.current_job()['uuid'],
+                    'created_by_job_task_uuid': arvados.current_task()['uuid'],
+                    'sequence': if_sequence + 1,
+                    'parameters': {
+                        'input_1':left_file.as_manifest(),
+                        'input_2':right_file.as_manifest()
+                        }
+                    }
+                arvados.api().job_tasks().create(body=new_task_attrs).execute()
+    if and_end_task:
+        arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                   body={'success':True}
+                                   ).execute()
+        exit(0)
+
+setup()
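
Note that setup() runs at import time, so merely importing this module builds bwa from the 'bwa_tbz' tarball named in the job's script_parameters. A hedged sketch of typical use from a crunch script (file names are placeholders):

    import arvados_bwa

    # queue one new task per foo_1.* / foo_2.* pair of read files, then,
    # within each task:
    arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
    arvados_bwa.run('aln', ['-t', '4', 'ref_prefix', 'reads_1.fastq'],
                    stdout=open('reads_1.sai', 'wb'))
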
diff --git a/crunch_scripts/arvados_gatk2.py b/crunch_scripts/arvados_gatk2.py
new file mode 100644 (file)
index 0000000..47951ec
--- /dev/null
@@ -0,0 +1,48 @@
+import arvados
+import re
+import os
+import sys
+import fcntl
+import subprocess
+
+gatk2_install_path = None
+
+def install_path():
+    global gatk2_install_path
+    if gatk2_install_path:
+        return gatk2_install_path
+    gatk2_install_path = arvados.util.tarball_extract(
+        tarball = arvados.current_job()['script_parameters']['gatk_tbz'],
+        path = 'gatk2')
+    return gatk2_install_path
+
+def memory_limit():
+    taskspernode = int(os.environ.get('CRUNCH_NODE_SLOTS', '1'))
+    with open('/proc/meminfo', 'r') as f:
+        ram = int(re.search(r'MemTotal:\s*(\d+)', f.read()).group(1)) / 1024
+    if taskspernode > 1:
+        ram = ram / taskspernode
+    return max(ram-700, 500)
+
+def cpus_on_this_node():
+    with open('/proc/cpuinfo', 'r') as cpuinfo:
+        return max(int(os.environ.get('SLURM_CPUS_ON_NODE', 1)),
+                   len(re.findall(r'^processor\s*:\s*\d',
+                                  cpuinfo.read(),
+                                  re.MULTILINE)))
+
+def cpus_per_task():
+    return max(1, (cpus_on_this_node()
+                   / int(os.environ.get('CRUNCH_NODE_SLOTS', 1))))
+
+def run(**kwargs):
+    kwargs.setdefault('cwd', arvados.current_task().tmpdir)
+    kwargs.setdefault('stdout', sys.stderr)
+    execargs = ['java',
+                '-Xmx%dm' % memory_limit(),
+                '-Djava.io.tmpdir=' + arvados.current_task().tmpdir,
+                '-jar', os.path.join(install_path(), 'GenomeAnalysisTK.jar')]
+    execargs += [str(arg) for arg in kwargs.pop('args', [])]
+    sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
+    return arvados.util.run_command(execargs, **kwargs)
+
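
run() wraps the jar with java, an -Xmx derived from memory_limit(), and a tmpdir, so callers supply only GATK arguments. A sketch mirroring the calls in the scripts above (file names are placeholders):

    import arvados_gatk2

    arvados_gatk2.run(
        args=['-nt', arvados_gatk2.cpus_per_task(),
              '-T', 'RealignerTargetCreator',
              '-R', 'ref.fasta',
              '-I', 'in.bam',
              '-o', 'intervals.list'])
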
diff --git a/crunch_scripts/arvados_ipc.py b/crunch_scripts/arvados_ipc.py
new file mode 100644 (file)
index 0000000..7197e97
--- /dev/null
@@ -0,0 +1,47 @@
+import os
+import re
+import sys
+import subprocess
+
+def pipe_setup(pipes, name):
+    pipes[name,'r'], pipes[name,'w'] = os.pipe()
+
+def pipe_closeallbut(pipes, *keepus):
+    for (n, m) in list(pipes.keys()):
+        if (n, m) not in keepus:
+            os.close(pipes.pop((n, m)))
+
+def named_fork(children, name):
+    children[name] = os.fork()
+    return children[name]
+
+def waitpid_and_check_children(children):
+    """
+    Given a dict of childname->pid, wait for each child process to
+    finish, and report non-zero exit status on stderr. Return True if
+    all children exited 0.
+    """
+    all_ok = True
+    for (childname, pid) in children.items():
+        # all_ok must be on RHS here -- we need to call waitpid() on
+        # every child, even if all_ok is already False.
+        all_ok = waitpid_and_check_exit(pid, childname) and all_ok
+    return all_ok
+
+def waitpid_and_check_exit(pid, childname=''):
+    """
+    Wait for a child process to finish. If it exits non-zero, report
+    exit status on stderr (mentioning the given childname) and return
+    False. If it exits zero, return True.
+    """
+    _, childstatus = os.waitpid(pid, 0)
+    exitvalue = childstatus >> 8
+    signal = childstatus & 127
+    dumpedcore = childstatus & 128
+    if childstatus != 0:
+        sys.stderr.write("%s child %d failed: exit %d signal %d core %s\n"
+                         % (childname, pid, exitvalue, signal,
+                            ('y' if dumpedcore else 'n')))
+        return False
+    return True
+
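
A self-contained sketch of the fork/pipe pattern these helpers support, using only the functions defined above (Python 2, like the rest of these scripts):

    import os
    import sys
    from arvados_ipc import *

    children = {}
    pipes = {}
    pipe_setup(pipes, 'greeting')
    if 0 == named_fork(children, 'greeting'):
        # child: keep only our write end, send data, and exit
        pipe_closeallbut(pipes, ('greeting', 'w'))
        os.write(pipes['greeting', 'w'], 'hello\n')
        os._exit(0)
    # parent: close the write end so read() sees EOF when the child exits
    os.close(pipes.pop(('greeting', 'w')))
    sys.stdout.write(os.read(pipes['greeting', 'r'], 64))
    pipe_closeallbut(pipes)
    if not waitpid_and_check_children(children):
        sys.exit(1)
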
diff --git a/crunch_scripts/arvados_picard.py b/crunch_scripts/arvados_picard.py
new file mode 100644 (file)
index 0000000..de9adeb
--- /dev/null
@@ -0,0 +1,38 @@
+import arvados
+import re
+import os
+import sys
+import fcntl
+import subprocess
+
+picard_install_path = None
+
+def install_path():
+    global picard_install_path
+    if picard_install_path:
+        return picard_install_path
+    zipball = arvados.current_job()['script_parameters']['picard_zip']
+    extracted = arvados.util.zipball_extract(
+        zipball = zipball,
+        path = 'picard')
+    for f in os.listdir(extracted):
+        if (re.search(r'^picard-tools-[\d\.]+$', f) and
+            os.path.exists(os.path.join(extracted, f, '.'))):
+            picard_install_path = os.path.join(extracted, f)
+            break
+    if not picard_install_path:
+        raise Exception("picard-tools-{version} directory not found in %s" %
+                        zipball)
+    return picard_install_path
+
+def run(module, **kwargs):
+    kwargs.setdefault('cwd', arvados.current_task().tmpdir)
+    execargs = ['java',
+                '-Xmx1500m',
+                '-Djava.io.tmpdir=' + arvados.current_task().tmpdir,
+                '-jar', os.path.join(install_path(), module + '.jar')]
+    execargs += [str(arg) for arg in kwargs.pop('args', [])]
+    for key, value in kwargs.pop('params', {}).items():
+        execargs += [key.upper() + '=' + str(value)]
+    sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
+    return arvados.util.run_command(execargs, **kwargs)
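
params keys are upcased into picard's KEY=value arguments, and args are passed through as-is. A sketch (file names are placeholders):

    import arvados_picard

    # runs: java -Xmx1500m ... -jar .../SortSam.jar
    #       I=in.bam O=sorted.bam SO=coordinate QUIET=true
    arvados_picard.run(
        'SortSam',
        params={'i': 'in.bam', 'o': 'sorted.bam',
                'so': 'coordinate', 'quiet': 'true'})
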
diff --git a/crunch_scripts/arvados_samtools.py b/crunch_scripts/arvados_samtools.py
new file mode 100644 (file)
index 0000000..dd7a428
--- /dev/null
@@ -0,0 +1,73 @@
+import arvados
+import re
+import os
+import sys
+import fcntl
+import subprocess
+
+samtools_path = None
+
+def samtools_install_path():
+    global samtools_path
+    if samtools_path:
+        return samtools_path
+    samtools_path = arvados.util.tarball_extract(
+        tarball = arvados.current_job()['script_parameters']['samtools_tgz'],
+        path = 'samtools')
+
+    # build "samtools" binary
+    lockfile = open(os.path.split(samtools_path)[0] + '.samtools-make.lock',
+                    'w')
+    fcntl.flock(lockfile, fcntl.LOCK_EX)
+    arvados.util.run_command(['make', '-j16'], cwd=samtools_path)
+    lockfile.close()
+
+    return samtools_path
+
+def samtools_binary():
+    return os.path.join(samtools_install_path(), 'samtools')
+
+def run(command, command_args, **kwargs):
+    execargs = [samtools_binary(),
+                command]
+    execargs += command_args
+    sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
+    arvados.util.run_command(
+        execargs,
+        cwd=arvados.current_task().tmpdir,
+        stdin=kwargs.get('stdin', subprocess.PIPE),
+        stderr=kwargs.get('stderr', sys.stderr),
+        stdout=kwargs.get('stdout', sys.stderr))
+
+def one_task_per_bam_file(if_sequence=0, and_end_task=True):
+    if if_sequence != arvados.current_task()['sequence']:
+        return
+    job_input = arvados.current_job()['script_parameters']['input']
+    cr = arvados.CollectionReader(job_input)
+    bam = {}
+    bai = {}
+    for s in cr.all_streams():
+        for f in s.all_files():
+            if re.search(r'\.bam$', f.name()):
+                bam[s.name(), f.name()] = f
+            elif re.search(r'\.bai$', f.name()):
+                bai[s.name(), f.name()] = f
+    for ((s_name, f_name), bam_f) in bam.items():
+        bai_f = bai.get((s_name, re.sub(r'bam$', 'bai', f_name)), None)
+        task_input = bam_f.as_manifest()
+        if bai_f:
+            task_input += bai_f.as_manifest()
+        new_task_attrs = {
+            'job_uuid': arvados.current_job()['uuid'],
+            'created_by_job_task_uuid': arvados.current_task()['uuid'],
+            'sequence': if_sequence + 1,
+            'parameters': {
+                'input': task_input
+                }
+            }
+        arvados.api().job_tasks().create(body=new_task_attrs).execute()
+    if and_end_task:
+        arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                         body={'success':True}
+                                         ).execute()
+        exit(0)
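
Like arvados_bwa, the binary is built from a tarball named in script_parameters ('samtools_tgz'), though here on first use rather than at import. A sketch of typical use (file names are placeholders):

    import arvados_samtools

    # queue one task per .bam (pairing any matching .bai), then, per task:
    arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
    arvados_samtools.run('view', ['-S', '-b', '-'],
                         stdin=open('in.sam', 'rb'),
                         stdout=open('out.bam', 'wb'))
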
diff --git a/crunch_scripts/bwa-aln b/crunch_scripts/bwa-aln
new file mode 100755 (executable)
index 0000000..89e8b3a
--- /dev/null
@@ -0,0 +1,124 @@
+#!/usr/bin/env python
+
+import arvados
+import arvados_bwa
+import arvados_samtools
+import os
+import re
+import sys
+import subprocess
+
+arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+ref_dir = arvados.util.collection_extract(
+    collection = this_job['script_parameters']['reference_index'],
+    path = 'reference',
+    decompress = False)
+
+ref_basename = None
+for f in os.listdir(ref_dir):
+    basename = re.sub(r'\.bwt$', '', f)
+    if basename != f:
+        ref_basename = os.path.join(ref_dir, basename)
+if ref_basename == None:
+    raise Exception("Could not find *.bwt in reference collection.")
+
+tmp_dir = arvados.current_task().tmpdir
+
+class Aligner:
+    def input_filename(self):
+        for s in arvados.CollectionReader(self.collection).all_streams():
+            for f in s.all_files():
+                return f.decompressed_name()
+    def generate_input(self):
+        for s in arvados.CollectionReader(self.collection).all_streams():
+            for f in s.all_files():
+                for s in f.readall_decompressed():
+                    yield s
+    def aln(self, input_param):
+        self.collection = this_task['parameters'][input_param]
+        reads_filename = os.path.join(tmp_dir, self.input_filename())
+        aln_filename = os.path.join(tmp_dir, self.input_filename() + '.sai')
+        reads_pipe_r, reads_pipe_w = os.pipe()
+        if os.fork() == 0:
+            os.close(reads_pipe_r)
+            reads_file = open(reads_filename, 'wb')
+            for s in self.generate_input():
+                if len(s) != os.write(reads_pipe_w, s):
+                    raise Exception("short write")
+                reads_file.write(s)
+            reads_file.close()
+            os.close(reads_pipe_w)
+            os._exit(0)
+        os.close(reads_pipe_w)
+
+        aln_file = open(aln_filename, 'wb')
+        bwa_proc = subprocess.Popen(
+            [arvados_bwa.bwa_binary(),
+             'aln', '-t', '16',
+             ref_basename,
+             '-'],
+            stdin=os.fdopen(reads_pipe_r, 'rb', 2**20),
+            stdout=aln_file)
+        aln_file.close()
+        return reads_filename, aln_filename
+
+reads_1, alignments_1 = Aligner().aln('input_1')
+reads_2, alignments_2 = Aligner().aln('input_2')
+pid1, exit1 = os.wait()
+pid2, exit2 = os.wait()
+if exit1 != 0 or exit2 != 0:
+    raise Exception("bwa aln exited non-zero (0x%x, 0x%x)" % (exit1, exit2))
+
+# output alignments in sam format to pipe
+sam_pipe_r, sam_pipe_w = os.pipe()
+sam_pid = os.fork()
+if sam_pid != 0:
+    # parent
+    os.close(sam_pipe_w)
+else:
+    # child
+    os.close(sam_pipe_r)
+    arvados_bwa.run('sampe',
+                    [ref_basename,
+                     alignments_1, alignments_2,
+                     reads_1, reads_2],
+                    stdout=os.fdopen(sam_pipe_w, 'wb', 2**20))
+    os._exit(0)
+
+# convert sam (sam_pipe_r) to bam (bam_pipe_w)
+bam_pipe_r, bam_pipe_w = os.pipe()
+bam_pid = os.fork()
+if bam_pid != 0:
+    # parent
+    os.close(bam_pipe_w)
+    os.close(sam_pipe_r)
+else:
+    # child
+    os.close(bam_pipe_r)
+    arvados_samtools.run('view',
+                         ['-S', '-b',
+                          '-'],
+                         stdin=os.fdopen(sam_pipe_r, 'rb', 2**20),
+                         stdout=os.fdopen(bam_pipe_w, 'wb', 2**20))
+    os._exit(0)
+
+# copy bam (bam_pipe_r) to Keep
+out_bam_filename = os.path.split(reads_1)[-1] + '.bam'
+out = arvados.CollectionWriter()
+out.start_new_stream()
+out.start_new_file(out_bam_filename)
+out.write(os.fdopen(bam_pipe_r, 'rb', 2**20))
+
+# make sure everyone exited nicely
+pid3, exit3 = os.waitpid(sam_pid, 0)
+if exit3 != 0:
+    raise Exception("bwa sampe exited non-zero (0x%x)" % exit3)
+pid4, exit4 = os.waitpid(bam_pid, 0)
+if exit4 != 0:
+    raise Exception("samtools view exited non-zero (0x%x)" % exit4)
+
+# proclaim success
+this_task.set_output(out.finish())
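
A sketch of the script_parameters bwa-aln expects (reference_index should be the output of bwa-index below; samtools_tgz is needed because the SAM output is piped through samtools view):

    script_parameters = {
        'input': '...',            # paired reads named like foo_1.* / foo_2.*
        'reference_index': '...',  # collection containing ref.bwt etc.
        'bwa_tbz': '...',
        'samtools_tgz': '...',
    }
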
diff --git a/crunch_scripts/bwa-index b/crunch_scripts/bwa-index
new file mode 100755 (executable)
index 0000000..3b21549
--- /dev/null
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+
+import arvados
+import arvados_bwa
+import os
+import re
+import sys
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+ref_dir = arvados.util.collection_extract(
+    collection = this_job['script_parameters']['input'],
+    path = 'reference',
+    decompress = False)
+
+ref_fasta_files = (os.path.join(ref_dir, f)
+                   for f in os.listdir(ref_dir)
+                   if re.search(r'\.fasta(\.gz)?$', f))
+
+# build reference index
+arvados_bwa.run('index',
+                ['-a', 'bwtsw'] + list(ref_fasta_files))
+
+# move output files to new empty directory
+out_dir = os.path.join(arvados.current_task().tmpdir, 'out')
+arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
+os.mkdir(out_dir)
+for f in os.listdir(ref_dir):
+    if re.search(r'\.(amb|ann|bwt|pac|rbwt|rpac|rsa|sa)$', f):
+        sys.stderr.write("bwa output: %s (%d)\n" %
+                         (f, os.stat(os.path.join(ref_dir, f)).st_size))
+        os.rename(os.path.join(ref_dir, f),
+                  os.path.join(out_dir, f))
+
+# store output
+out = arvados.CollectionWriter()
+out.write_directory_tree(out_dir, max_manifest_depth=0)
+this_task.set_output(out.finish())
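
bwa-index needs only the FASTA collection and the bwa tarball; its output collection (the .amb/.ann/.bwt/.pac/.sa files, plus .rbwt/.rpac/.rsa from older bwa versions) is what bwa-aln takes as reference_index:

    script_parameters = {
        'input': '...',     # collection containing a .fasta or .fasta.gz
        'bwa_tbz': '...',
    }
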
diff --git a/crunch_scripts/picard-gatk2-prep b/crunch_scripts/picard-gatk2-prep
new file mode 100755 (executable)
index 0000000..73b143a
--- /dev/null
@@ -0,0 +1,211 @@
+#!/usr/bin/env python
+
+import arvados
+import os
+import re
+import sys
+import subprocess
+import arvados_picard
+from arvados_ipc import *
+
+arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
+
+this_job = arvados.current_job()
+this_task = arvados.current_task()
+ref_dir = arvados.util.collection_extract(
+    collection = this_job['script_parameters']['reference'],
+    path = 'reference',
+    decompress = True)
+ref_fasta_files = [os.path.join(ref_dir, f)
+                   for f in os.listdir(ref_dir)
+                   if re.search(r'\.fasta(\.gz)?$', f)]
+input_collection = this_task['parameters']['input']
+
+input_stream_name = input_file_name = None
+for s in arvados.CollectionReader(input_collection).all_streams():
+    for f in s.all_files():
+        input_stream_name = s.name()
+        input_file_name = f.name()
+        break
+    if input_file_name:
+        break
+
+# Unfortunately, picard FixMateInformation cannot read from a pipe. We
+# must copy the input to a temporary file before running picard.
+input_bam_path = os.path.join(this_task.tmpdir, input_file_name)
+with open(input_bam_path, 'wb') as bam:
+    for s in arvados.CollectionReader(input_collection).all_streams():
+        for f in s.all_files():
+            for buf in f.readall():
+                bam.write(buf)
+
+children = {}
+pipes = {}
+
+pipe_setup(pipes, 'fixmate')
+if 0==named_fork(children, 'fixmate'):
+    pipe_closeallbut(pipes, ('fixmate', 'w'))
+    arvados_picard.run(
+        'FixMateInformation',
+        params={
+            'i': input_bam_path,
+            'o': '/dev/stdout',
+            'quiet': 'true',
+            'so': 'coordinate',
+            'validation_stringency': 'LENIENT',
+            'compression_level': 0
+            },
+        stdout=os.fdopen(pipes['fixmate','w'], 'wb', 2**20))
+    os._exit(0)
+os.close(pipes.pop(('fixmate','w')))
+
+pipe_setup(pipes, 'sortsam')
+if 0==named_fork(children, 'sortsam'):
+    pipe_closeallbut(pipes, ('fixmate', 'r'), ('sortsam', 'w'))
+    arvados_picard.run(
+        'SortSam',
+        params={
+            'i': '/dev/stdin',
+            'o': '/dev/stdout',
+            'quiet': 'true',
+            'so': 'coordinate',
+            'validation_stringency': 'LENIENT',
+            'compression_level': 0
+            },
+        stdin=os.fdopen(pipes['fixmate','r'], 'rb', 2**20),
+        stdout=os.fdopen(pipes['sortsam','w'], 'wb', 2**20))
+    os._exit(0)
+
+pipe_setup(pipes, 'reordersam')
+if 0==named_fork(children, 'reordersam'):
+    pipe_closeallbut(pipes, ('sortsam', 'r'), ('reordersam', 'w'))
+    arvados_picard.run(
+        'ReorderSam',
+        params={
+            'i': '/dev/stdin',
+            'o': '/dev/stdout',
+            'reference': ref_fasta_files[0],
+            'quiet': 'true',
+            'validation_stringency': 'LENIENT',
+            'compression_level': 0
+            },
+        stdin=os.fdopen(pipes['sortsam','r'], 'rb', 2**20),
+        stdout=os.fdopen(pipes['reordersam','w'], 'wb', 2**20))
+    os._exit(0)
+
+pipe_setup(pipes, 'addrg')
+if 0==named_fork(children, 'addrg'):
+    pipe_closeallbut(pipes, ('reordersam', 'r'), ('addrg', 'w'))
+    arvados_picard.run(
+        'AddOrReplaceReadGroups',
+        params={
+            'i': '/dev/stdin',
+            'o': '/dev/stdout',
+            'quiet': 'true',
+            'rglb': this_job['script_parameters'].get('rglb', 0),
+            'rgpl': this_job['script_parameters'].get('rgpl', 'illumina'),
+            'rgpu': this_job['script_parameters'].get('rgpu', 0),
+            'rgsm': this_job['script_parameters'].get('rgsm', 0),
+            'validation_stringency': 'LENIENT'
+            },
+        stdin=os.fdopen(pipes['reordersam','r'], 'rb', 2**20),
+        stdout=os.fdopen(pipes['addrg','w'], 'wb', 2**20))
+    os._exit(0)
+
+pipe_setup(pipes, 'bammanifest')
+pipe_setup(pipes, 'bam')
+pipe_setup(pipes, 'casm_in')
+if 0==named_fork(children, 'bammanifest'):
+    pipe_closeallbut(pipes,
+                     ('addrg', 'r'),
+                     ('bammanifest', 'w'),
+                     ('bam', 'w'),
+                     ('casm_in', 'w'))
+    out = arvados.CollectionWriter()
+    out.start_new_stream(input_stream_name)
+    out.start_new_file(input_file_name)
+    while True:
+        buf = os.read(pipes['addrg','r'], 2**20)
+        if len(buf) == 0:
+            break
+        os.write(pipes['bam','w'], buf)
+        os.write(pipes['casm_in','w'], buf)
+        out.write(buf)
+    os.write(pipes['bammanifest','w'], out.manifest_text())
+    os.close(pipes['bammanifest','w'])
+    os._exit(0)
+
+pipe_setup(pipes, 'casm')
+if 0 == named_fork(children, 'casm'):
+    pipe_closeallbut(pipes, ('casm_in', 'r'), ('casm', 'w'))
+    arvados_picard.run(
+        'CollectAlignmentSummaryMetrics',
+        params={
+            'input': '/dev/fd/' + str(pipes['casm_in','r']),
+            'output': '/dev/fd/' + str(pipes['casm','w']),
+            'reference_sequence': ref_fasta_files[0],
+            'validation_stringency': 'LENIENT',
+            },
+        close_fds=False)
+    os._exit(0)
+
+pipe_setup(pipes, 'index')
+if 0==named_fork(children, 'index'):
+    pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
+    arvados_picard.run(
+        'BuildBamIndex',
+        params={
+            'i': '/dev/stdin',
+            'o': '/dev/stdout',
+            'quiet': 'true',
+            'validation_stringency': 'LENIENT'
+            },
+        stdin=os.fdopen(pipes['bam','r'], 'rb', 2**20),
+        stdout=os.fdopen(pipes['index','w'], 'wb', 2**20))
+    os._exit(0)
+
+pipe_setup(pipes, 'indexmanifest')
+if 0==named_fork(children, 'indexmanifest'):
+    pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
+    out = arvados.CollectionWriter()
+    out.start_new_stream(input_stream_name)
+    out.start_new_file(re.sub(r'\.bam$', '.bai', input_file_name))
+    while True:
+        buf = os.read(pipes['index','r'], 2**20)
+        if len(buf) == 0:
+            break
+        out.write(buf)
+    os.write(pipes['indexmanifest','w'], out.manifest_text())
+    os.close(pipes['indexmanifest','w'])
+    os._exit(0)
+
+pipe_closeallbut(pipes,
+                 ('bammanifest', 'r'),
+                 ('indexmanifest', 'r'),
+                 ('casm', 'r'))
+
+outmanifest = ''
+
+for which in ['bammanifest', 'indexmanifest']:
+    with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
+        while True:
+            buf = f.read()
+            if buf == '':
+                break
+            outmanifest += buf
+
+casm_out = arvados.CollectionWriter()
+casm_out.start_new_stream(input_stream_name)
+casm_out.start_new_file(input_file_name + '.casm.tsv')
+casm_out.write(os.fdopen(pipes.pop(('casm','r'))))
+
+outmanifest += casm_out.manifest_text()
+
+all_ok = True
+for (childname, pid) in children.items():
+    all_ok = all_ok and waitpid_and_check_exit(pid, childname)
+
+if all_ok:
+    this_task.set_output(outmanifest)
+else:
+    sys.exit(1)
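
Finally, a sketch of the script_parameters picard-gatk2-prep reads (the rg* values feed AddOrReplaceReadGroups and fall back to the defaults shown above):

    script_parameters = {
        'input': '...',       # one .bam per task
        'reference': '...',   # compressed reference collection
        'picard_zip': '...',
        # optional read-group fields:
        'rglb': 'lib0',
        'rgpl': 'illumina',
        'rgpu': 'unit0',
        'rgsm': 'sample0',
    }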