15133: Delete crunch_scripts, start clearing out API server
author Peter Amstutz <pamstutz@veritasgenetics.com>
Tue, 6 Aug 2019 19:20:47 +0000 (15:20 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Wed, 7 Aug 2019 19:16:46 +0000 (15:16 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

42 files changed:
crunch_scripts/GATK2-VariantFiltration [deleted file]
crunch_scripts/GATK2-bqsr [deleted file]
crunch_scripts/GATK2-merge-call [deleted file]
crunch_scripts/GATK2-realign [deleted file]
crunch_scripts/arvados-bcbio-nextgen.py [deleted file]
crunch_scripts/arvados_bwa.py [deleted file]
crunch_scripts/arvados_gatk2.py [deleted file]
crunch_scripts/arvados_ipc.py [deleted file]
crunch_scripts/arvados_picard.py [deleted file]
crunch_scripts/arvados_samtools.py [deleted file]
crunch_scripts/bwa-aln [deleted file]
crunch_scripts/bwa-index [deleted file]
crunch_scripts/collection-merge [deleted file]
crunch_scripts/crunchrunner [deleted file]
crunch_scripts/crunchutil/__init__.py [deleted file]
crunch_scripts/crunchutil/robust_put.py [deleted file]
crunch_scripts/crunchutil/subst.py [deleted file]
crunch_scripts/crunchutil/vwd.py [deleted file]
crunch_scripts/cwl-runner [deleted file]
crunch_scripts/decompress-all.py [deleted file]
crunch_scripts/file-select [deleted file]
crunch_scripts/grep [deleted file]
crunch_scripts/hash [deleted file]
crunch_scripts/pgp-survey-import [deleted file]
crunch_scripts/pgp-survey-parse [deleted file]
crunch_scripts/picard-gatk2-prep [deleted file]
crunch_scripts/pyrtg.py [deleted file]
crunch_scripts/rtg-fasta2sdf [deleted file]
crunch_scripts/rtg-fastq2sdf [deleted file]
crunch_scripts/rtg-map [deleted file]
crunch_scripts/rtg-snp [deleted file]
crunch_scripts/run-command [deleted file]
crunch_scripts/split-fastq.py [deleted file]
crunch_scripts/test/task_output_dir [deleted file]
services/api/app/controllers/arvados/v1/job_tasks_controller.rb
services/api/app/controllers/arvados/v1/jobs_controller.rb
services/api/app/controllers/arvados/v1/pipeline_instances_controller.rb
services/api/app/controllers/arvados/v1/pipeline_templates_controller.rb
services/api/lib/crunch_dispatch.rb [deleted file]
services/api/script/crunch-dispatch.rb [deleted file]
services/api/script/crunch_failure_report.py [deleted file]
services/api/script/fail-jobs.rb [deleted file]

diff --git a/crunch_scripts/GATK2-VariantFiltration b/crunch_scripts/GATK2-VariantFiltration
deleted file mode 100755 (executable)
index 0ef4a74..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-
-arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-gatk_path = arvados.util.tarball_extract(
-    tarball = this_job['script_parameters']['gatk_binary_tarball'],
-    path = 'gatk')
-bundle_path = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['gatk_bundle'],
-    path = 'gatk-bundle',
-    files = ['human_g1k_v37.dict', 'human_g1k_v37.fasta', 'human_g1k_v37.fasta.fai'])
-this_task_input = this_task['parameters']['input']
-
-input_file = list(arvados.CollectionReader(this_task_input).all_files())[0]
-
-# choose vcf temporary file names
-vcf_in = os.path.join(arvados.current_task().tmpdir,
-                      os.path.basename(input_file.name()))
-vcf_out = re.sub('(.*)\\.vcf', '\\1-filtered.vcf', vcf_in)
-
-# fetch the unfiltered data
-vcf_in_file = open(vcf_in, 'w')
-for buf in input_file.readall():
-    vcf_in_file.write(buf)
-vcf_in_file.close()
-
-stdoutdata, stderrdata = arvados.util.run_command(
-    ['java', '-Xmx1g',
-     '-jar', os.path.join(gatk_path,'GenomeAnalysisTK.jar'),
-     '-T', 'VariantFiltration', '--variant', vcf_in,
-     '--out', vcf_out,
-     '--filterExpression', 'QD < 2.0',
-     '--filterName', 'GATK_QD',
-     '--filterExpression', 'MQ < 40.0',
-     '--filterName', 'GATK_MQ',
-     '--filterExpression', 'FS > 60.0',
-     '--filterName', 'GATK_FS',
-     '--filterExpression', 'MQRankSum < -12.5',
-     '--filterName', 'GATK_MQRankSum',
-     '--filterExpression', 'ReadPosRankSum < -8.0',
-     '--filterName', 'GATK_ReadPosRankSum',
-     '-R', os.path.join(bundle_path, 'human_g1k_v37.fasta')],
-    cwd=arvados.current_task().tmpdir)
-
-# store the filtered data
-with open(vcf_out, 'rb') as f:
-    out = arvados.CollectionWriter()
-    while True:
-        buf = f.read()
-        if len(buf) == 0:
-            break
-        out.write(buf)
-out.set_current_file_name(os.path.basename(vcf_out))
-
-this_task.set_output(out.finish())
diff --git a/crunch_scripts/GATK2-bqsr b/crunch_scripts/GATK2-bqsr
deleted file mode 100755 (executable)
index ab78226..0000000
+++ /dev/null
@@ -1,103 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import os
-import re
-import arvados
-import arvados_gatk2
-import arvados_samtools
-from arvados_ipc import *
-
-class InvalidArgumentError(Exception):
-    pass
-
-arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-tmpdir = arvados.current_task().tmpdir
-arvados.util.clear_tmpdir()
-
-known_sites_files = arvados.getjobparam(
-    'known_sites',
-    ['dbsnp_137.b37.vcf',
-     'Mills_and_1000G_gold_standard.indels.b37.vcf',
-     ])
-bundle_dir = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['gatk_bundle'],
-    files = [
-        'human_g1k_v37.dict',
-        'human_g1k_v37.fasta',
-        'human_g1k_v37.fasta.fai'
-        ] + known_sites_files + [v + '.idx' for v in known_sites_files],
-    path = 'gatk_bundle')
-ref_fasta_files = [os.path.join(bundle_dir, f)
-                   for f in os.listdir(bundle_dir)
-                   if re.search(r'\.fasta(\.gz)?$', f)]
-
-input_collection = this_task['parameters']['input']
-input_dir = arvados.util.collection_extract(
-    collection = input_collection,
-    path = os.path.join(this_task.tmpdir, 'input'))
-input_bam_files = []
-for f in arvados.util.listdir_recursive(input_dir):
-    if re.search(r'\.bam$', f):
-        input_stream_name, input_file_name = os.path.split(f)
-        input_bam_files += [os.path.join(input_dir, f)]
-if len(input_bam_files) != 1:
-    raise InvalidArgumentError("Expected exactly one bam file per task.")
-
-known_sites_args = []
-for f in known_sites_files:
-    known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)]
-
-recal_file = os.path.join(tmpdir, 'recal.csv')
-
-children = {}
-pipes = {}
-
-arvados_gatk2.run(
-    args=[
-        '-nct', arvados_gatk2.cpus_on_this_node(),
-        '-T', 'BaseRecalibrator',
-        '-R', ref_fasta_files[0],
-        '-I', input_bam_files[0],
-        '-o', recal_file,
-        ] + known_sites_args)
-
-pipe_setup(pipes, 'BQSR')
-if 0 == named_fork(children, 'BQSR'):
-    pipe_closeallbut(pipes, ('BQSR', 'w'))
-    arvados_gatk2.run(
-        args=[
-        '-T', 'PrintReads',
-        '-R', ref_fasta_files[0],
-        '-I', input_bam_files[0],
-        '-o', '/dev/fd/' + str(pipes['BQSR','w']),
-        '-BQSR', recal_file,
-        '--disable_bam_indexing',
-        ],
-        close_fds=False)
-    os._exit(0)
-os.close(pipes.pop(('BQSR','w'), None))
-
-out = arvados.CollectionWriter()
-out.start_new_stream(input_stream_name)
-
-out.start_new_file(input_file_name + '.recal.csv')
-out.write(open(recal_file, 'rb'))
-
-out.start_new_file(input_file_name)
-while True:
-    buf = os.read(pipes['BQSR','r'], 2**20)
-    if len(buf) == 0:
-        break
-    out.write(buf)
-pipe_closeallbut(pipes)
-
-if waitpid_and_check_children(children):
-    this_task.set_output(out.finish())
-else:
-    sys.exit(1)
diff --git a/crunch_scripts/GATK2-merge-call b/crunch_scripts/GATK2-merge-call
deleted file mode 100755 (executable)
index 6d17517..0000000
+++ /dev/null
@@ -1,242 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import os
-import re
-import string
-import threading
-import arvados
-import arvados_gatk2
-import arvados_picard
-from arvados_ipc import *
-
-class InvalidArgumentError(Exception):
-    pass
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-tmpdir = arvados.current_task().tmpdir
-arvados.util.clear_tmpdir()
-
-bundle_dir = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['gatk_bundle'],
-    files = [
-        'human_g1k_v37.dict',
-        'human_g1k_v37.fasta',
-        'human_g1k_v37.fasta.fai',
-        'dbsnp_137.b37.vcf',
-        'dbsnp_137.b37.vcf.idx',
-        ],
-    path = 'gatk_bundle')
-ref_fasta_files = [os.path.join(bundle_dir, f)
-                   for f in os.listdir(bundle_dir)
-                   if re.search(r'\.fasta(\.gz)?$', f)]
-regions_args = []
-if 'regions' in this_job['script_parameters']:
-    regions_dir = arvados.util.collection_extract(
-        collection = this_job['script_parameters']['regions'],
-        path = 'regions')
-    region_padding = int(this_job['script_parameters']['region_padding'])
-    for f in os.listdir(regions_dir):
-        if re.search(r'\.bed$', f):
-            regions_args += [
-                '--intervals', os.path.join(regions_dir, f),
-                '--interval_padding', str(region_padding)
-                ]
-
-
-# Start a child process for each input file, feeding data to picard.
-
-input_child_names = []
-children = {}
-pipes = {}
-
-input_collection = this_job['script_parameters']['input']
-input_index = 0
-for s in arvados.CollectionReader(input_collection).all_streams():
-    for f in s.all_files():
-        if not re.search(r'\.bam$', f.name()):
-            continue
-        input_index += 1
-        childname = 'input-' + str(input_index)
-        input_child_names += [childname]
-        pipe_setup(pipes, childname)
-        childpid = named_fork(children, childname)
-        if childpid == 0:
-            pipe_closeallbut(pipes, (childname, 'w'))
-            for s in f.readall():
-                os.write(pipes[childname, 'w'], s)
-            os.close(pipes[childname, 'w'])
-            os._exit(0)
-        sys.stderr.write("pid %d writing %s to fd %d->%d\n" %
-                         (childpid,
-                          s.name()+'/'+f.name(),
-                          pipes[childname, 'w'],
-                          pipes[childname, 'r']))
-        pipe_closeallbut(pipes, *[(childname, 'r')
-                                  for childname in input_child_names])
-
-
-# Merge-sort the input files to merge.bam
-
-arvados_picard.run(
-    'MergeSamFiles',
-    args=[
-        'I=/dev/fd/' + str(pipes[childname, 'r'])
-        for childname in input_child_names
-        ],
-    params={
-        'o': 'merge.bam',
-        'quiet': 'true',
-        'so': 'coordinate',
-        'use_threading': 'true',
-        'create_index': 'true',
-        'validation_stringency': 'LENIENT',
-        },
-    close_fds=False,
-    )
-pipe_closeallbut(pipes)
-
-
-# Run CoverageBySample on merge.bam
-
-pipe_setup(pipes, 'stats_log')
-pipe_setup(pipes, 'stats_out')
-if 0 == named_fork(children, 'GATK'):
-    pipe_closeallbut(pipes,
-                     ('stats_log', 'w'),
-                     ('stats_out', 'w'))
-    arvados_gatk2.run(
-        args=[
-            '-T', 'CoverageBySample',
-            '-R', ref_fasta_files[0],
-            '-I', 'merge.bam',
-            '-o', '/dev/fd/' + str(pipes['stats_out', 'w']),
-            '--log_to_file', '/dev/fd/' + str(pipes['stats_log', 'w']),
-            ]
-        + regions_args,
-        close_fds=False)
-    pipe_closeallbut(pipes)
-    os._exit(0)
-pipe_closeallbut(pipes, ('stats_log', 'r'), ('stats_out', 'r'))
-
-
-# Start two threads to read from CoverageBySample pipes
-
-class ExceptionPropagatingThread(threading.Thread):
-    """
-    If a subclassed thread calls _raise(e) in run(), running join() on
-    the thread will raise e in the thread that calls join().
-    """
-    def __init__(self, *args, **kwargs):
-        super(ExceptionPropagatingThread, self).__init__(*args, **kwargs)
-        self.__exception = None
-    def join(self, *args, **kwargs):
-        ret = super(ExceptionPropagatingThread, self).join(*args, **kwargs)
-        if self.__exception:
-            raise self.__exception
-        return ret
-    def _raise(self, exception):
-        self.__exception = exception
-
-class StatsLogReader(ExceptionPropagatingThread):
-    def __init__(self, **kwargs):
-        super(StatsLogReader, self).__init__()
-        self.args = kwargs
-    def run(self):
-        try:
-            for logline in self.args['infile']:
-                x = re.search('Processing (\d+) bp from intervals', logline)
-                if x:
-                    self._total_bp = int(x.group(1))
-        except Exception as e:
-            self._raise(e)
-    def total_bp(self):
-        self.join()
-        return self._total_bp
-stats_log_thr = StatsLogReader(infile=os.fdopen(pipes.pop(('stats_log', 'r'))))
-stats_log_thr.start()
-
-class StatsOutReader(ExceptionPropagatingThread):
-    """
-    Read output of CoverageBySample and collect a histogram of
-    coverage (last column) -> number of loci (number of rows).
-    """
-    def __init__(self, **kwargs):
-        super(StatsOutReader, self).__init__()
-        self.args = kwargs
-    def run(self):
-        try:
-            hist = [0]
-            histtot = 0
-            for line in self.args['infile']:
-                try:
-                    i = int(string.split(line)[-1])
-                except ValueError:
-                    continue
-                if i >= 1:
-                    if len(hist) <= i:
-                        hist.extend([0 for x in range(1+i-len(hist))])
-                    hist[i] += 1
-                    histtot += 1
-            hist[0] = stats_log_thr.total_bp() - histtot
-            self._histogram = hist
-        except Exception as e:
-            self._raise(e)
-    def histogram(self):
-        self.join()
-        return self._histogram
-stats_out_thr = StatsOutReader(infile=os.fdopen(pipes.pop(('stats_out', 'r'))))
-stats_out_thr.start()
-
-
-# Run UnifiedGenotyper on merge.bam
-
-arvados_gatk2.run(
-    args=[
-        '-nt', arvados_gatk2.cpus_on_this_node(),
-        '-T', 'UnifiedGenotyper',
-        '-R', ref_fasta_files[0],
-        '-I', 'merge.bam',
-        '-o', os.path.join(tmpdir, 'out.vcf'),
-        '--dbsnp', os.path.join(bundle_dir, 'dbsnp_137.b37.vcf'),
-        '-metrics', 'UniGenMetrics',
-        '-A', 'DepthOfCoverage',
-        '-A', 'AlleleBalance',
-        '-A', 'QualByDepth',
-        '-A', 'HaplotypeScore',
-        '-A', 'MappingQualityRankSumTest',
-        '-A', 'ReadPosRankSumTest',
-        '-A', 'FisherStrand',
-        '-glm', 'both',
-        ]
-    + regions_args
-    + arvados.getjobparam('GATK2_UnifiedGenotyper_args',[]))
-
-# Copy the output VCF file to Keep
-
-out = arvados.CollectionWriter()
-out.start_new_stream()
-out.start_new_file('out.vcf')
-out.write(open(os.path.join(tmpdir, 'out.vcf'), 'rb'))
-
-
-# Write statistics to Keep
-
-out.start_new_file('mincoverage_nlocus.csv')
-sofar = 0
-hist = stats_out_thr.histogram()
-total_bp = stats_log_thr.total_bp()
-for i in range(len(hist)):
-    out.write("%d,%d,%f\n" %
-              (i,
-               total_bp - sofar,
-               100.0 * (total_bp - sofar) / total_bp))
-    sofar += hist[i]
-
-if waitpid_and_check_children(children):
-    this_task.set_output(out.finish())
-else:
-    sys.exit(1)
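The ExceptionPropagatingThread helper deleted above re-raises, in join(), any exception that the worker thread recorded with _raise(). A minimal sketch of that pattern, assuming the class is in scope; the Worker subclass and do_work() are hypothetical placeholders, not part of this commit:

    class Worker(ExceptionPropagatingThread):
        def run(self):
            try:
                self.result = do_work()   # placeholder for the thread's real work
            except Exception as e:
                self._raise(e)            # stored here, re-raised by join()

    w = Worker()
    w.start()
    w.join()   # raises in the calling thread if run() recorded an exception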
diff --git a/crunch_scripts/GATK2-realign b/crunch_scripts/GATK2-realign
deleted file mode 100755 (executable)
index 2787dff..0000000
+++ /dev/null
@@ -1,163 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import os
-import re
-import arvados
-import arvados_gatk2
-import arvados_picard
-import arvados_samtools
-from arvados_ipc import *
-
-class InvalidArgumentError(Exception):
-    pass
-
-arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-tmpdir = arvados.current_task().tmpdir
-arvados.util.clear_tmpdir()
-
-known_sites_files = arvados.getjobparam(
-    'known_sites',
-    ['dbsnp_137.b37.vcf',
-     'Mills_and_1000G_gold_standard.indels.b37.vcf',
-     ])
-bundle_dir = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['gatk_bundle'],
-    files = [
-        'human_g1k_v37.dict',
-        'human_g1k_v37.fasta',
-        'human_g1k_v37.fasta.fai'
-        ] + known_sites_files + [v + '.idx' for v in known_sites_files],
-    path = 'gatk_bundle')
-ref_fasta_files = [os.path.join(bundle_dir, f)
-                   for f in os.listdir(bundle_dir)
-                   if re.search(r'\.fasta(\.gz)?$', f)]
-regions_args = []
-if 'regions' in this_job['script_parameters']:
-    regions_dir = arvados.util.collection_extract(
-        collection = this_job['script_parameters']['regions'],
-        path = 'regions')
-    region_padding = int(this_job['script_parameters']['region_padding'])
-    for f in os.listdir(regions_dir):
-        if re.search(r'\.bed$', f):
-            regions_args += [
-                '--intervals', os.path.join(regions_dir, f),
-                '--interval_padding', str(region_padding)
-                ]
-
-input_collection = this_task['parameters']['input']
-input_dir = arvados.util.collection_extract(
-    collection = input_collection,
-    path = os.path.join(this_task.tmpdir, 'input'))
-input_bam_files = []
-for f in arvados.util.listdir_recursive(input_dir):
-    if re.search(r'\.bam$', f):
-        input_stream_name, input_file_name = os.path.split(f)
-        input_bam_files += [os.path.join(input_dir, f)]
-if len(input_bam_files) != 1:
-    raise InvalidArgumentError("Expected exactly one bam file per task.")
-
-known_sites_args = []
-for f in known_sites_files:
-    known_sites_args += ['-known', os.path.join(bundle_dir, f)]
-
-children = {}
-pipes = {}
-
-arvados_gatk2.run(
-    args=[
-        '-nt', arvados_gatk2.cpus_per_task(),
-        '-T', 'RealignerTargetCreator',
-        '-R', ref_fasta_files[0],
-        '-I', input_bam_files[0],
-        '-o', os.path.join(tmpdir, 'intervals.list')
-        ] + known_sites_args + regions_args)
-
-pipe_setup(pipes, 'IndelRealigner')
-if 0 == named_fork(children, 'IndelRealigner'):
-    pipe_closeallbut(pipes, ('IndelRealigner', 'w'))
-    arvados_gatk2.run(
-        args=[
-        '-T', 'IndelRealigner',
-        '-R', ref_fasta_files[0],
-        '-targetIntervals', os.path.join(tmpdir, 'intervals.list'),
-        '-I', input_bam_files[0],
-        '-o', '/dev/fd/' + str(pipes['IndelRealigner','w']),
-        '--disable_bam_indexing',
-        ] + known_sites_args + regions_args,
-        close_fds=False)
-    os._exit(0)
-os.close(pipes.pop(('IndelRealigner','w'), None))
-
-pipe_setup(pipes, 'bammanifest')
-pipe_setup(pipes, 'bam')
-if 0==named_fork(children, 'bammanifest'):
-    pipe_closeallbut(pipes,
-                     ('IndelRealigner', 'r'),
-                     ('bammanifest', 'w'),
-                     ('bam', 'w'))
-    out = arvados.CollectionWriter()
-    out.start_new_stream(input_stream_name)
-    out.start_new_file(input_file_name)
-    while True:
-        buf = os.read(pipes['IndelRealigner','r'], 2**20)
-        if len(buf) == 0:
-            break
-        os.write(pipes['bam','w'], buf)
-        out.write(buf)
-    os.write(pipes['bammanifest','w'], out.manifest_text())
-    os.close(pipes['bammanifest','w'])
-    os._exit(0)
-
-pipe_setup(pipes, 'index')
-if 0==named_fork(children, 'index'):
-    pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
-    arvados_picard.run(
-        'BuildBamIndex',
-        params={
-            'i': '/dev/fd/' + str(pipes['bam','r']),
-            'o': '/dev/fd/' + str(pipes['index','w']),
-            'quiet': 'true',
-            'validation_stringency': 'LENIENT'
-            },
-        close_fds=False)
-    os._exit(0)
-
-pipe_setup(pipes, 'indexmanifest')
-if 0==named_fork(children, 'indexmanifest'):
-    pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
-    out = arvados.CollectionWriter()
-    out.start_new_stream(input_stream_name)
-    out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
-    while True:
-        buf = os.read(pipes['index','r'], 2**20)
-        if len(buf) == 0:
-            break
-        out.write(buf)
-    os.write(pipes['indexmanifest','w'], out.manifest_text())
-    os.close(pipes['indexmanifest','w'])
-    os._exit(0)
-
-pipe_closeallbut(pipes, ('bammanifest', 'r'), ('indexmanifest', 'r'))
-outmanifest = ''
-for which in ['bammanifest', 'indexmanifest']:
-    with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
-        while True:
-            buf = f.read()
-            if buf == '':
-                break
-            outmanifest += buf
-
-all_ok = True
-for (childname, pid) in children.items():
-    all_ok = all_ok and waitpid_and_check_exit(pid, childname)
-
-if all_ok:
-    this_task.set_output(outmanifest)
-else:
-    sys.exit(1)
diff --git a/crunch_scripts/arvados-bcbio-nextgen.py b/crunch_scripts/arvados-bcbio-nextgen.py
deleted file mode 100755 (executable)
index b7e19ec..0000000
+++ /dev/null
@@ -1,145 +0,0 @@
-#!/usr/bin/python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import subprocess
-import crunchutil.subst as subst
-import shutil
-import os
-import sys
-import time
-
-if len(arvados.current_task()['parameters']) > 0:
-    p = arvados.current_task()['parameters']
-else:
-    p = arvados.current_job()['script_parameters']
-
-t = arvados.current_task().tmpdir
-
-os.unlink("/usr/local/share/bcbio-nextgen/galaxy")
-os.mkdir("/usr/local/share/bcbio-nextgen/galaxy")
-shutil.copy("/usr/local/share/bcbio-nextgen/config/bcbio_system.yaml", "/usr/local/share/bcbio-nextgen/galaxy")
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool_data_table_conf.xml", "w") as f:
-    f.write('''<tables>
-    <!-- Locations of indexes in the BWA mapper format -->
-    <table name="bwa_indexes" comment_char="#">
-        <columns>value, dbkey, name, path</columns>
-        <file path="tool-data/bwa_index.loc" />
-    </table>
-    <!-- Locations of indexes in the Bowtie2 mapper format -->
-    <table name="bowtie2_indexes" comment_char="#">
-        <columns>value, dbkey, name, path</columns>
-        <file path="tool-data/bowtie2_indices.loc" />
-    </table>
-    <!-- Locations of indexes in the Bowtie2 mapper format for TopHat2 to use -->
-    <table name="tophat2_indexes" comment_char="#">
-        <columns>value, dbkey, name, path</columns>
-        <file path="tool-data/bowtie2_indices.loc" />
-    </table>
-    <!-- Location of SAMTools indexes and other files -->
-    <table name="sam_fa_indexes" comment_char="#">
-        <columns>index, value, path</columns>
-        <file path="tool-data/sam_fa_indices.loc" />
-    </table>
-    <!-- Location of Picard dict file and other files -->
-    <table name="picard_indexes" comment_char="#">
-        <columns>value, dbkey, name, path</columns>
-        <file path="tool-data/picard_index.loc" />
-    </table>
-    <!-- Location of Picard dict files valid for GATK -->
-    <table name="gatk_picard_indexes" comment_char="#">
-        <columns>value, dbkey, name, path</columns>
-        <file path="tool-data/gatk_sorted_picard_index.loc" />
-    </table>
-</tables>
-''')
-
-os.mkdir("/usr/local/share/bcbio-nextgen/galaxy/tool-data")
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/bowtie2_indices.loc", "w") as f:
-    f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(dir $(bowtie2_indices))\n"))
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/bwa_index.loc", "w") as f:
-    f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(file $(bwa_index))\n"))
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/gatk_sorted_picard_index.loc", "w") as f:
-    f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(file $(gatk_sorted_picard_index))\n"))
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/picard_index.loc", "w") as f:
-    f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(file $(picard_index))\n"))
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/sam_fa_indices.loc", "w") as f:
-    f.write(subst.do_substitution(p, "index\tGRCh37\t$(file $(sam_fa_indices))\n"))
-
-with open("/tmp/crunch-job/freebayes-variant.yaml", "w") as f:
-    f.write('''
-# Template for whole genome Illumina variant calling with FreeBayes
-# This is a GATK-free pipeline without post-alignment BAM pre-processing
-# (recalibration and realignment)
----
-details:
-  - analysis: variant2
-    genome_build: GRCh37
-    # to do multi-sample variant calling, assign samples the same metadata / batch
-    # metadata:
-    #   batch: your-arbitrary-batch-name
-    algorithm:
-      aligner: bwa
-      mark_duplicates: true
-      recalibrate: false
-      realign: false
-      variantcaller: freebayes
-      platform: illumina
-      quality_format: Standard
-      # for targetted projects, set the region
-      # variant_regions: /path/to/your.bed
-''')
-
-os.unlink("/usr/local/share/bcbio-nextgen/gemini_data")
-os.symlink(arvados.get_job_param_mount("gemini_data"), "/usr/local/share/bcbio-nextgen/gemini_data")
-
-os.chdir(arvados.current_task().tmpdir)
-
-rcode = subprocess.call(["bcbio_nextgen.py", "--workflow", "template", "/tmp/crunch-job/freebayes-variant.yaml", "project1",
-                         subst.do_substitution(p, "$(file $(R1))"),
-                         subst.do_substitution(p, "$(file $(R2))")])
-
-os.chdir("project1/work")
-
-os.symlink("/usr/local/share/bcbio-nextgen/galaxy/tool-data", "tool-data")
-
-rcode = subprocess.call(["bcbio_nextgen.py", "../config/project1.yaml", "-n", os.environ['CRUNCH_NODE_SLOTS']])
-
-print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
-
-if rcode == 0:
-    os.chdir("../final")
-
-    print("arvados-bcbio-nextgen: the follow output files will be saved to keep:")
-
-    subprocess.call(["find", ".", "-type", "f", "-printf", "arvados-bcbio-nextgen: %12.12s %h/%f\\n"])
-
-    print("arvados-bcbio-nextgen: start writing output to keep")
-
-    done = False
-    api = arvados.api('v1')
-    while not done:
-        try:
-            out = arvados.CollectionWriter()
-            out.write_directory_tree(".", max_manifest_depth=0)
-            outuuid = out.finish()
-            api.job_tasks().update(uuid=arvados.current_task()['uuid'],
-                                                 body={
-                                                     'output':outuuid,
-                                                     'success': (rcode == 0),
-                                                     'progress':1.0
-                                                 }).execute()
-            done = True
-        except Exception as e:
-            print("arvados-bcbio-nextgen: caught exception: {}".format(e))
-            time.sleep(5)
-
-sys.exit(rcode)
diff --git a/crunch_scripts/arvados_bwa.py b/crunch_scripts/arvados_bwa.py
deleted file mode 100644 (file)
index aefc1f0..0000000
+++ /dev/null
@@ -1,115 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import os
-import sys
-import fcntl
-import subprocess
-
-bwa_install_path = None
-
-def install_path():
-    """
-    Extract the bwa source tree, build the bwa binary, and return the
-    path to the source tree.
-    """
-    global bwa_install_path
-    if bwa_install_path:
-        return bwa_install_path
-
-    bwa_install_path = arvados.util.tarball_extract(
-        tarball = arvados.current_job()['script_parameters']['bwa_tbz'],
-        path = 'bwa')
-
-    # build "bwa" binary
-    lockfile = open(os.path.split(bwa_install_path)[0] + '.bwa-make.lock',
-                    'w')
-    fcntl.flock(lockfile, fcntl.LOCK_EX)
-    arvados.util.run_command(['make', '-j16'], cwd=bwa_install_path)
-    lockfile.close()
-
-    return bwa_install_path
-
-def bwa_binary():
-    """
-    Return the path to the bwa executable.
-    """
-    return os.path.join(install_path(), 'bwa')
-
-def run(command, command_args, **kwargs):
-    """
-    Build and run the bwa binary.
-
-    command is the bwa module, e.g., "index" or "aln".
-
-    command_args is a list of additional command line arguments, e.g.,
-    ['-a', 'bwtsw', 'ref.fasta']
-
-    It is assumed that we are running in a Crunch job environment, and
-    the job's "bwa_tbz" parameter is a collection containing the bwa
-    source tree in a .tbz file.
-    """
-    execargs = [bwa_binary(),
-                command]
-    execargs += command_args
-    sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
-    arvados.util.run_command(
-        execargs,
-        cwd=arvados.current_task().tmpdir,
-        stderr=sys.stderr,
-        stdin=kwargs.get('stdin', subprocess.PIPE),
-        stdout=kwargs.get('stdout', sys.stderr))
-
-def one_task_per_pair_input_file(if_sequence=0, and_end_task=True):
-    """
-    Queue one task for each pair of fastq files in this job's input
-    collection.
-
-    Each new task will have two parameters, named "input_1" and
-    "input_2", each being a manifest containing a single fastq file.
-
-    A matching pair of files in the input collection is assumed to
-    have names "x_1.y" and "x_2.y".
-
-    Files in the input collection that are not part of a matched pair
-    are silently ignored.
-
-    if_sequence and and_end_task arguments have the same significance
-    as in arvados.job_setup.one_task_per_input_file().
-    """
-    if if_sequence != arvados.current_task()['sequence']:
-        return
-    job_input = arvados.current_job()['script_parameters']['input']
-    cr = arvados.CollectionReader(job_input)
-    all_files = []
-    for s in cr.all_streams():
-        all_files += list(s.all_files())
-    for s in cr.all_streams():
-        for left_file in s.all_files():
-            left_name = left_file.name()
-            right_file = None
-            right_name = re.sub(r'(.*_)1\.', '\g<1>2.', left_name)
-            if right_name == left_name:
-                continue
-            for f2 in s.all_files():
-                if right_name == f2.name():
-                    right_file = f2
-            if right_file != None:
-                new_task_attrs = {
-                    'job_uuid': arvados.current_job()['uuid'],
-                    'created_by_job_task_uuid': arvados.current_task()['uuid'],
-                    'sequence': if_sequence + 1,
-                    'parameters': {
-                        'input_1':left_file.as_manifest(),
-                        'input_2':right_file.as_manifest()
-                        }
-                    }
-                arvados.api().job_tasks().create(body=new_task_attrs).execute()
-    if and_end_task:
-        arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
-                                   body={'success':True}
-                                   ).execute()
-        exit(0)
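For reference, a minimal sketch of how the deleted arvados_bwa helper was driven from a crunch script, following its docstrings and the bwa-index script below; the reference file name is illustrative:

    import arvados_bwa

    # Task 0: queue one task per matched "x_1.y"/"x_2.y" pair in the job's
    # input collection, then mark this task finished.
    arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)

    # Later tasks: run a bwa subcommand inside the task's tmpdir.
    arvados_bwa.run('index', ['-a', 'bwtsw', 'ref.fasta'])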
diff --git a/crunch_scripts/arvados_gatk2.py b/crunch_scripts/arvados_gatk2.py
deleted file mode 100644 (file)
index fa00b44..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import os
-import sys
-import fcntl
-import subprocess
-
-gatk2_install_path = None
-
-def install_path():
-    global gatk2_install_path
-    if gatk2_install_path:
-        return gatk2_install_path
-    gatk2_install_path = arvados.util.tarball_extract(
-        tarball = arvados.current_job()['script_parameters']['gatk_tbz'],
-        path = 'gatk2')
-    return gatk2_install_path
-
-def memory_limit():
-    taskspernode = int(os.environ.get('CRUNCH_NODE_SLOTS', '1'))
-    with open('/proc/meminfo', 'r') as f:
-        ram = int(re.search(r'MemTotal:\s*(\d+)', f.read()).group(1)) / 1024
-    if taskspernode > 1:
-        ram = ram / taskspernode
-    return max(ram-700, 500)
-
-def cpus_on_this_node():
-    with open('/proc/cpuinfo', 'r') as cpuinfo:
-        return max(int(os.environ.get('SLURM_CPUS_ON_NODE', 1)),
-                   len(re.findall(r'^processor\s*:\s*\d',
-                                  cpuinfo.read(),
-                                  re.MULTILINE)))
-
-def cpus_per_task():
-    return max(1, (cpus_on_this_node()
-                   / int(os.environ.get('CRUNCH_NODE_SLOTS', 1))))
-
-def run(**kwargs):
-    kwargs.setdefault('cwd', arvados.current_task().tmpdir)
-    kwargs.setdefault('stdout', sys.stderr)
-    execargs = ['java',
-                '-Xmx%dm' % memory_limit(),
-                '-Djava.io.tmpdir=' + arvados.current_task().tmpdir,
-                '-jar', os.path.join(install_path(), 'GenomeAnalysisTK.jar')]
-    execargs += [str(arg) for arg in kwargs.pop('args', [])]
-    sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
-    return arvados.util.run_command(execargs, **kwargs)
-
diff --git a/crunch_scripts/arvados_ipc.py b/crunch_scripts/arvados_ipc.py
deleted file mode 100644 (file)
index 9787162..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import os
-import re
-import sys
-import subprocess
-
-def pipe_setup(pipes, name):
-    pipes[name,'r'], pipes[name,'w'] = os.pipe()
-
-def pipe_closeallbut(pipes, *keepus):
-    for n,m in pipes.keys():
-        if (n,m) not in keepus:
-            os.close(pipes.pop((n,m), None))
-
-def named_fork(children, name):
-    children[name] = os.fork()
-    return children[name]
-
-def waitpid_and_check_children(children):
-    """
-    Given a dict of childname->pid, wait for each child process to
-    finish, and report non-zero exit status on stderr. Return True if
-    all children exited 0.
-    """
-    all_ok = True
-    for (childname, pid) in children.items():
-        # all_ok must be on RHS here -- we need to call waitpid() on
-        # every child, even if all_ok is already False.
-        all_ok = waitpid_and_check_exit(pid, childname) and all_ok
-    return all_ok
-
-def waitpid_and_check_exit(pid, childname=''):
-    """
-    Wait for a child process to finish. If it exits non-zero, report
-    exit status on stderr (mentioning the given childname) and return
-    False. If it exits zero, return True.
-    """
-    _, childstatus = os.waitpid(pid, 0)
-    exitvalue = childstatus >> 8
-    signal = childstatus & 127
-    dumpedcore = childstatus & 128
-    if childstatus != 0:
-        sys.stderr.write("%s child %d failed: exit %d signal %d core %s\n"
-                         % (childname, pid, exitvalue, signal,
-                            ('y' if dumpedcore else 'n')))
-        return False
-    return True
-
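A minimal sketch of the fork/pipe pattern these deleted helpers support, mirroring how the GATK2-* scripts above use them; the child here just writes a fixed string:

    import os
    import sys
    from arvados_ipc import *

    children, pipes = {}, {}
    pipe_setup(pipes, 'worker')
    if 0 == named_fork(children, 'worker'):
        # child: keep only the write end, produce output, then exit
        pipe_closeallbut(pipes, ('worker', 'w'))
        os.write(pipes['worker', 'w'], b'hello\n')
        pipe_closeallbut(pipes)
        os._exit(0)
    # parent: close its copy of the write end, then read from the read end
    os.close(pipes.pop(('worker', 'w')))
    data = os.read(pipes['worker', 'r'], 2**20)
    pipe_closeallbut(pipes)
    if not waitpid_and_check_children(children):
        sys.exit(1)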
diff --git a/crunch_scripts/arvados_picard.py b/crunch_scripts/arvados_picard.py
deleted file mode 100644 (file)
index 3d830db..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import os
-import sys
-import fcntl
-import subprocess
-
-picard_install_path = None
-
-def install_path():
-    global picard_install_path
-    if picard_install_path:
-        return picard_install_path
-    zipball = arvados.current_job()['script_parameters']['picard_zip']
-    extracted = arvados.util.zipball_extract(
-        zipball = zipball,
-        path = 'picard')
-    for f in os.listdir(extracted):
-        if (re.search(r'^picard-tools-[\d\.]+$', f) and
-            os.path.exists(os.path.join(extracted, f, '.'))):
-            picard_install_path = os.path.join(extracted, f)
-            break
-    if not picard_install_path:
-        raise Exception("picard-tools-{version} directory not found in %s" %
-                        zipball)
-    return picard_install_path
-
-def run(module, **kwargs):
-    kwargs.setdefault('cwd', arvados.current_task().tmpdir)
-    execargs = ['java',
-                '-Xmx1500m',
-                '-Djava.io.tmpdir=' + arvados.current_task().tmpdir,
-                '-jar', os.path.join(install_path(), module + '.jar')]
-    execargs += [str(arg) for arg in kwargs.pop('args', [])]
-    for key, value in kwargs.pop('params', {}).items():
-        execargs += [key.upper() + '=' + str(value)]
-    sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
-    return arvados.util.run_command(execargs, **kwargs)
diff --git a/crunch_scripts/arvados_samtools.py b/crunch_scripts/arvados_samtools.py
deleted file mode 100644 (file)
index 09992f6..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import os
-import sys
-import fcntl
-import subprocess
-
-samtools_path = None
-
-def samtools_install_path():
-    """
-    Extract the samtools source tree, build the samtools binary, and
-    return the path to the source tree.
-    """
-    global samtools_path
-    if samtools_path:
-        return samtools_path
-    samtools_path = arvados.util.tarball_extract(
-        tarball = arvados.current_job()['script_parameters']['samtools_tgz'],
-        path = 'samtools')
-
-    # build "samtools" binary
-    lockfile = open(os.path.split(samtools_path)[0] + '.samtools-make.lock',
-                    'w')
-    fcntl.flock(lockfile, fcntl.LOCK_EX)
-    arvados.util.run_command(['make', '-j16'], cwd=samtools_path)
-    lockfile.close()
-
-    return samtools_path
-
-def samtools_binary():
-    """
-    Return the path to the samtools executable.
-    """
-    return os.path.join(samtools_install_path(), 'samtools')
-
-def run(command, command_args, **kwargs):
-    """
-    Build and run the samtools binary.
-
-    command is the samtools subcommand, e.g., "view" or "sort".
-
-    command_args is a list of additional command line arguments, e.g.,
-    ['-bt', 'ref_list.txt', '-o', 'aln.bam', 'aln.sam.gz']
-
-    It is assumed that we are running in a Crunch job environment, and
-    the job's "samtools_tgz" parameter is a collection containing the
-    samtools source tree in a .tgz file.
-    """
-    execargs = [samtools_binary(),
-                command]
-    execargs += command_args
-    sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
-    arvados.util.run_command(
-        execargs,
-        cwd=arvados.current_task().tmpdir,
-        stdin=kwargs.get('stdin', subprocess.PIPE),
-        stderr=kwargs.get('stderr', sys.stderr),
-        stdout=kwargs.get('stdout', sys.stderr))
-
-def one_task_per_bam_file(if_sequence=0, and_end_task=True):
-    """
-    Queue one task for each bam file in this job's input collection.
-
-    Each new task will have an "input" parameter: a manifest
-    containing one .bam file and (if available) the corresponding .bai
-    index file.
-
-    Files in the input collection that are not named *.bam or *.bai
-    (as well as *.bai files that do not match any .bam file present)
-    are silently ignored.
-
-    if_sequence and and_end_task arguments have the same significance
-    as in arvados.job_setup.one_task_per_input_file().
-    """
-    if if_sequence != arvados.current_task()['sequence']:
-        return
-    job_input = arvados.current_job()['script_parameters']['input']
-    cr = arvados.CollectionReader(job_input)
-    bam = {}
-    bai = {}
-    for s in cr.all_streams():
-        for f in s.all_files():
-            if re.search(r'\.bam$', f.name()):
-                bam[s.name(), f.name()] = f
-            elif re.search(r'\.bai$', f.name()):
-                bai[s.name(), f.name()] = f
-    for ((s_name, f_name), bam_f) in bam.items():
-        bai_f = bai.get((s_name, re.sub(r'bam$', 'bai', f_name)), None)
-        task_input = bam_f.as_manifest()
-        if bai_f:
-            task_input += bai_f.as_manifest()
-        new_task_attrs = {
-            'job_uuid': arvados.current_job()['uuid'],
-            'created_by_job_task_uuid': arvados.current_task()['uuid'],
-            'sequence': if_sequence + 1,
-            'parameters': {
-                'input': task_input
-                }
-            }
-        arvados.api().job_tasks().create(body=new_task_attrs).execute()
-    if and_end_task:
-        arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
-                                         body={'success':True}
-                                         ).execute()
-        exit(0)
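A minimal sketch of the deleted arvados_samtools helper in use, taken from its docstrings; file names are illustrative:

    import arvados_samtools

    # Task 0: queue one task per .bam file (plus its .bai, when present)
    # in the job's input collection, then mark this task finished.
    arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)

    # Later tasks: run a samtools subcommand in the task's tmpdir.
    arvados_samtools.run('view', ['-bt', 'ref_list.txt', '-o', 'aln.bam', 'aln.sam.gz'])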
diff --git a/crunch_scripts/bwa-aln b/crunch_scripts/bwa-aln
deleted file mode 100755 (executable)
index e3d85a7..0000000
+++ /dev/null
@@ -1,127 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import arvados_bwa
-import arvados_samtools
-import os
-import re
-import sys
-import subprocess
-
-arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-ref_dir = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['reference_index'],
-    path = 'reference',
-    decompress = False)
-
-ref_basename = None
-for f in os.listdir(ref_dir):
-    basename = re.sub(r'\.bwt$', '', f)
-    if basename != f:
-        ref_basename = os.path.join(ref_dir, basename)
-if ref_basename == None:
-    raise Exception("Could not find *.bwt in reference collection.")
-
-tmp_dir = arvados.current_task().tmpdir
-
-class Aligner:
-    def input_filename(self):
-        for s in arvados.CollectionReader(self.collection).all_streams():
-            for f in s.all_files():
-                return f.decompressed_name()
-    def generate_input(self):
-        for s in arvados.CollectionReader(self.collection).all_streams():
-            for f in s.all_files():
-                for s in f.readall_decompressed():
-                    yield s
-    def aln(self, input_param):
-        self.collection = this_task['parameters'][input_param]
-        reads_filename = os.path.join(tmp_dir, self.input_filename())
-        aln_filename = os.path.join(tmp_dir, self.input_filename() + '.sai')
-        reads_pipe_r, reads_pipe_w = os.pipe()
-        if os.fork() == 0:
-            os.close(reads_pipe_r)
-            reads_file = open(reads_filename, 'wb')
-            for s in self.generate_input():
-                if len(s) != os.write(reads_pipe_w, s):
-                    raise Exception("short write")
-                reads_file.write(s)
-            reads_file.close()
-            os.close(reads_pipe_w)
-            sys.exit(0)
-        os.close(reads_pipe_w)
-
-        aln_file = open(aln_filename, 'wb')
-        bwa_proc = subprocess.Popen(
-            [arvados_bwa.bwa_binary(),
-             'aln', '-t', '16',
-             ref_basename,
-             '-'],
-            stdin=os.fdopen(reads_pipe_r, 'rb', 2**20),
-            stdout=aln_file)
-        aln_file.close()
-        return reads_filename, aln_filename
-
-reads_1, alignments_1 = Aligner().aln('input_1')
-reads_2, alignments_2 = Aligner().aln('input_2')
-pid1, exit1 = os.wait()
-pid2, exit2 = os.wait()
-if exit1 != 0 or exit2 != 0:
-    raise Exception("bwa aln exited non-zero (0x%x, 0x%x)" % (exit1, exit2))
-
-# output alignments in sam format to pipe
-sam_pipe_r, sam_pipe_w = os.pipe()
-sam_pid = os.fork()
-if sam_pid != 0:
-    # parent
-    os.close(sam_pipe_w)
-else:
-    # child
-    os.close(sam_pipe_r)
-    arvados_bwa.run('sampe',
-                    [ref_basename,
-                     alignments_1, alignments_2,
-                     reads_1, reads_2],
-                    stdout=os.fdopen(sam_pipe_w, 'wb', 2**20))
-    sys.exit(0)
-
-# convert sam (sam_pipe_r) to bam (bam_pipe_w)
-bam_pipe_r, bam_pipe_w = os.pipe()
-bam_pid = os.fork()
-if bam_pid != 0:
-    # parent
-    os.close(bam_pipe_w)
-    os.close(sam_pipe_r)
-else:
-    # child
-    os.close(bam_pipe_r)
-    arvados_samtools.run('view',
-                         ['-S', '-b',
-                          '-'],
-                         stdin=os.fdopen(sam_pipe_r, 'rb', 2**20),
-                         stdout=os.fdopen(bam_pipe_w, 'wb', 2**20))
-    sys.exit(0)
-
-# copy bam (bam_pipe_r) to Keep
-out_bam_filename = os.path.split(reads_1)[-1] + '.bam'
-out = arvados.CollectionWriter()
-out.start_new_stream()
-out.start_new_file(out_bam_filename)
-out.write(os.fdopen(bam_pipe_r, 'rb', 2**20))
-
-# make sure everyone exited nicely
-pid3, exit3 = os.waitpid(sam_pid, 0)
-if exit3 != 0:
-    raise Exception("bwa sampe exited non-zero (0x%x)" % exit3)
-pid4, exit4 = os.waitpid(bam_pid, 0)
-if exit4 != 0:
-    raise Exception("samtools view exited non-zero (0x%x)" % exit4)
-
-# proclaim success
-this_task.set_output(out.finish())
diff --git a/crunch_scripts/bwa-index b/crunch_scripts/bwa-index
deleted file mode 100755 (executable)
index f5b7030..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import arvados_bwa
-import os
-import re
-import sys
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-ref_dir = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['input'],
-    path = 'reference',
-    decompress = False)
-
-ref_fasta_files = (os.path.join(ref_dir, f)
-                   for f in os.listdir(ref_dir)
-                   if re.search(r'\.fasta(\.gz)?$', f))
-
-# build reference index
-arvados_bwa.run('index',
-                ['-a', 'bwtsw'] + list(ref_fasta_files))
-
-# move output files to new empty directory
-out_dir = os.path.join(arvados.current_task().tmpdir, 'out')
-arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
-os.mkdir(out_dir)
-for f in os.listdir(ref_dir):
-    if re.search(r'\.(amb|ann|bwt|pac|rbwt|rpac|rsa|sa)$', f):
-        sys.stderr.write("bwa output: %s (%d)\n" %
-                         (f, os.stat(os.path.join(ref_dir, f)).st_size))
-        os.rename(os.path.join(ref_dir, f),
-                  os.path.join(out_dir, f))
-
-# store output
-out = arvados.CollectionWriter()
-out.write_directory_tree(out_dir, max_manifest_depth=0)
-this_task.set_output(out.finish())
diff --git a/crunch_scripts/collection-merge b/crunch_scripts/collection-merge
deleted file mode 100755 (executable)
index f3aa5ce..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-# collection-merge
-#
-# Merge two or more collections together.  Can also be used to extract specific
-# files from a collection to produce a new collection.
-#
-# input:
-# An array of collections or collection/file paths in script_parameter["input"]
-#
-# output:
-# A manifest with the collections merged.  Duplicate file names will
-# have their contents concatenated in the order that they appear in the input
-# array.
-
-import arvados
-import md5
-import crunchutil.subst as subst
-import subprocess
-import os
-import hashlib
-
-p = arvados.current_job()['script_parameters']
-
-merged = ""
-src = []
-for c in p["input"]:
-    c = subst.do_substitution(p, c)
-    i = c.find('/')
-    if i == -1:
-        src.append(c)
-        merged += arvados.CollectionReader(c).manifest_text()
-    else:
-        src.append(c[0:i])
-        cr = arvados.CollectionReader(c[0:i])
-        j = c.rfind('/')
-        stream = c[i+1:j]
-        if stream == "":
-            stream = "."
-        fn = c[(j+1):]
-        for s in cr.all_streams():
-            if s.name() == stream:
-                if fn in s.files():
-                    merged += s.files()[fn].as_manifest()
-
-arvados.current_task().set_output(merged)
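Illustrative job parameters for the deleted collection-merge script, matching its header comment: whole collections and collection/file paths may be mixed; the identifiers below are placeholders:

    # Hypothetical script_parameters for a collection-merge job.
    p = {
        "input": [
            "zzzzz-4zz18-aaaaaaaaaaaaaaa",            # merge an entire collection
            "zzzzz-4zz18-bbbbbbbbbbbbbbb/out/a.vcf",  # pull one file out of a collection
        ]
    }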
diff --git a/crunch_scripts/crunchrunner b/crunch_scripts/crunchrunner
deleted file mode 100755 (executable)
index 25d3ba5..0000000
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/bin/sh
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-if test -n "$JOB_PARAMETER_CRUNCHRUNNER" ; then
-    exec $TASK_KEEPMOUNT/$JOB_PARAMETER_CRUNCHRUNNER
-else
-    exec /usr/local/bin/crunchrunner
-fi
diff --git a/crunch_scripts/crunchutil/__init__.py b/crunch_scripts/crunchutil/__init__.py
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/crunch_scripts/crunchutil/robust_put.py b/crunch_scripts/crunchutil/robust_put.py
deleted file mode 100644 (file)
index 27b0bf3..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import arvados.commands.put as put
-import os
-import logging
-import time
-
-def machine_progress(bytes_written, bytes_expected):
-    return "upload wrote {} total {}\n".format(
-        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
-
-class Args(object):
-    def __init__(self, fn):
-        self.filename = None
-        self.paths = [fn]
-        self.max_manifest_depth = 0
-
-# Upload to Keep with error recovery.
-# Return a uuid or raise an exception if there are too many failures.
-def upload(source_dir, logger=None):
-    if logger is None:
-        logger = logging.getLogger("arvados")
-
-    source_dir = os.path.abspath(source_dir)
-    done = False
-    if 'TASK_WORK' in os.environ:
-        resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
-    else:
-        resume_cache = put.ResumeCache(put.ResumeCache.make_path(Args(source_dir)))
-    reporter = put.progress_writer(machine_progress)
-    bytes_expected = put.expected_bytes_for([source_dir])
-    backoff = 1
-    outuuid = None
-    while not done:
-        try:
-            out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
-            out.do_queued_work()
-            out.write_directory_tree(source_dir, max_manifest_depth=0)
-            outuuid = out.finish()
-            done = True
-        except KeyboardInterrupt as e:
-            logger.critical("caught interrupt signal 2")
-            raise e
-        except Exception as e:
-            logger.exception("caught exception:")
-            backoff *= 2
-            if backoff > 256:
-                logger.critical("Too many upload failures, giving up")
-                raise e
-            else:
-                logger.warning("Sleeping for %s seconds before trying again" % backoff)
-                time.sleep(backoff)
-    return outuuid
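A minimal sketch of the deleted crunchutil.robust_put helper, per the comment above: upload a directory to Keep with exponential-backoff retries and get back the finished collection locator; the path is illustrative:

    import crunchutil.robust_put as robust_put

    # Retries on failure, doubling the sleep up to 256 seconds before giving up.
    output_locator = robust_put.upload("/tmp/crunch-job/output")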
diff --git a/crunch_scripts/crunchutil/subst.py b/crunch_scripts/crunchutil/subst.py
deleted file mode 100644 (file)
index 53def97..0000000
+++ /dev/null
@@ -1,102 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import glob
-import os
-import re
-import stat
-
-BACKSLASH_ESCAPE_RE = re.compile(r'\\(.)')
-
-class SubstitutionError(Exception):
-    pass
-
-def search(c):
-    DEFAULT = 0
-    DOLLAR = 1
-
-    i = 0
-    state = DEFAULT
-    start = None
-    depth = 0
-    while i < len(c):
-        if c[i] == '\\':
-            i += 1
-        elif state == DEFAULT:
-            if c[i] == '$':
-                state = DOLLAR
-                if depth == 0:
-                    start = i
-            elif c[i] == ')':
-                if depth == 1:
-                    return [start, i]
-                if depth > 0:
-                    depth -= 1
-        elif state == DOLLAR:
-            if c[i] == '(':
-                depth += 1
-            state = DEFAULT
-        i += 1
-    if depth != 0:
-        raise SubstitutionError("Substitution error, mismatched parentheses {}".format(c))
-    return None
-
-def sub_file(v):
-    path = os.path.join(os.environ['TASK_KEEPMOUNT'], v)
-    st = os.stat(path)
-    if st and stat.S_ISREG(st.st_mode):
-        return path
-    else:
-        raise SubstitutionError("$(file {}) is not accessible or is not a regular file".format(path))
-
-def sub_dir(v):
-    d = os.path.dirname(v)
-    if d == '':
-        d = v
-    path = os.path.join(os.environ['TASK_KEEPMOUNT'], d)
-    st = os.stat(path)
-    if st and stat.S_ISDIR(st.st_mode):
-        return path
-    else:
-        raise SubstitutionError("$(dir {}) is not accessible or is not a directory".format(path))
-
-def sub_basename(v):
-    return os.path.splitext(os.path.basename(v))[0]
-
-def sub_glob(v):
-    l = glob.glob(v)
-    if len(l) == 0:
-        raise SubstitutionError("$(glob {}) no match found".format(v))
-    else:
-        return l[0]
-
-default_subs = {"file ": sub_file,
-                "dir ": sub_dir,
-                "basename ": sub_basename,
-                "glob ": sub_glob}
-
-def do_substitution(p, c, subs=default_subs):
-    while True:
-        m = search(c)
-        if m is None:
-            return BACKSLASH_ESCAPE_RE.sub(r'\1', c)
-
-        v = do_substitution(p, c[m[0]+2 : m[1]])
-        var = True
-        for sub in subs:
-            if v.startswith(sub):
-                r = subs[sub](v[len(sub):])
-                var = False
-                break
-        if var:
-            if v in p:
-                r = p[v]
-            else:
-                raise SubstitutionError("Unknown variable or function '%s' while performing substitution on '%s'" % (v, c))
-            if r is None:
-                raise SubstitutionError("Substitution for '%s' is null while performing substitution on '%s'" % (v, c))
-            if not isinstance(r, basestring):
-                raise SubstitutionError("Substitution for '%s' must be a string while performing substitution on '%s'" % (v, c))
-
-        c = c[:m[0]] + r + c[m[1]+1:]
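A minimal sketch of the deleted crunchutil.subst mini-language, as used by run-command and arvados-bcbio-nextgen.py: $( ) expressions are expanded against the job's script_parameters, and helpers such as $(file ...) and $(dir ...) resolve a value to a path under $TASK_KEEPMOUNT. The parameter name and value below are illustrative, and the call assumes it runs inside a crunch task with TASK_KEEPMOUNT set:

    import crunchutil.subst as subst

    p = {"R1": "reads/sample_1.fastq.gz"}          # hypothetical job parameter
    path = subst.do_substitution(p, "$(file $(R1))")
    # -> "<TASK_KEEPMOUNT>/reads/sample_1.fastq.gz", provided it is a regular file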
diff --git a/crunch_scripts/crunchutil/vwd.py b/crunch_scripts/crunchutil/vwd.py
deleted file mode 100644 (file)
index 3245da1..0000000
+++ /dev/null
@@ -1,107 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import stat
-import arvados.commands.run
-import logging
-
-# Implements "Virtual Working Directory"
-# Provides a way of emulating a shared writable directory in Keep based
-# on a "check out, edit, check in, merge" model.
-# At the moment, this only permits adding new files, applications
-# cannot modify or delete existing files.
-
-# Create a symlink tree rooted at target_dir mirroring arv-mounted
-# source_collection.  target_dir must be empty, and will be created if it
-# doesn't exist.
-def checkout(source_collection, target_dir, keepmount=None):
-    # create symlinks
-    if keepmount is None:
-        keepmount = os.environ['TASK_KEEPMOUNT']
-
-    if not os.path.exists(target_dir):
-        os.makedirs(target_dir)
-
-    l = os.listdir(target_dir)
-    if len(l) > 0:
-        raise Exception("target_dir must be empty before checkout, contains %s" % l)
-
-    stem = os.path.join(keepmount, source_collection)
-    for root, dirs, files in os.walk(os.path.join(keepmount, source_collection), topdown=True):
-        rel = root[len(stem)+1:]
-        for d in dirs:
-            os.mkdir(os.path.join(target_dir, rel, d))
-        for f in files:
-            os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
-
-def checkin(target_dir):
-    """Write files in `target_dir` to Keep.
-
-    Regular files or symlinks to files outside the keep mount are written to
-    Keep as normal files (Keep does not support symlinks).
-
-    Symlinks to files in the keep mount will result in files in the new
-    collection which reference existing Keep blocks, no data copying necessary.
-
-    Returns a new Collection object, with data flushed but the collection record
-    not saved to the API.
-
-    """
-
-    outputcollection = arvados.collection.Collection(num_retries=5)
-
-    if target_dir[-1:] != '/':
-        target_dir += '/'
-
-    collections = {}
-
-    logger = logging.getLogger("arvados")
-
-    last_error = None
-    for root, dirs, files in os.walk(target_dir):
-        for f in files:
-            try:
-                s = os.lstat(os.path.join(root, f))
-
-                writeIt = False
-
-                if stat.S_ISREG(s.st_mode):
-                    writeIt = True
-                elif stat.S_ISLNK(s.st_mode):
-                    # 1. check if it is a link into a collection
-                    real = os.path.split(os.path.realpath(os.path.join(root, f)))
-                    (pdh, branch) = arvados.commands.run.is_in_collection(real[0], real[1])
-                    if pdh is not None:
-                        # 2. load collection
-                        if pdh not in collections:
-                            # 2.1 make sure it is flushed (see #5787 note 11)
-                            fd = os.open(real[0], os.O_RDONLY)
-                            os.fsync(fd)
-                            os.close(fd)
-
-                            # 2.2 get collection from API server
-                            collections[pdh] = arvados.collection.CollectionReader(pdh,
-                                                                                   api_client=outputcollection._my_api(),
-                                                                                   keep_client=outputcollection._my_keep(),
-                                                                                   num_retries=5)
-                        # 3. copy arvfile to new collection
-                        outputcollection.copy(branch, os.path.join(root[len(target_dir):], f), source_collection=collections[pdh])
-                    else:
-                        writeIt = True
-
-                if writeIt:
-                    reldir = root[len(target_dir):]
-                    with outputcollection.open(os.path.join(reldir, f), "wb") as writer:
-                        with open(os.path.join(root, f), "rb") as reader:
-                            dat = reader.read(64*1024)
-                            while dat:
-                                writer.write(dat)
-                                dat = reader.read(64*1024)
-            except (IOError, OSError) as e:
-                logger.error(e)
-                last_error = e
-
-    return (outputcollection, last_error)
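
For context, checkout() and checkin() above were the whole public surface of the virtual working directory helper. A hedged usage sketch follows; the collection hash, paths and file contents are invented, and it only runs inside an old crunch job where the crunchutil package and a Keep mount are available.

    import crunchutil.vwd as vwd

    scratch = "/tmp/task-scratch"

    # Symlink tree of the (invented) input collection, read via the Keep mount.
    vwd.checkout("0123456789abcdef0123456789abcdef+123", scratch, keepmount="/keep")

    # Only adding new files is supported; existing files must not be modified.
    with open(scratch + "/new-results.txt", "w") as f:
        f.write("hello\n")

    # New files are uploaded; symlinks back into Keep reuse existing blocks.
    collection, err = vwd.checkin(scratch)
    if err is None:
        print(collection.manifest_text())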
diff --git a/crunch_scripts/cwl-runner b/crunch_scripts/cwl-runner
deleted file mode 100755 (executable)
index 0c79844..0000000
+++ /dev/null
@@ -1,117 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-# Crunch script integration for running arvados-cwl-runner inside a crunch job.
-
-import arvados_cwl
-import sys
-
-try:
-    # Use the crunch script defined in the arvados_cwl package.  This helps
-    # prevent the crunch script from going out of sync with the rest of the
-    # arvados_cwl package.
-    import arvados_cwl.crunch_script
-    arvados_cwl.crunch_script.run()
-    sys.exit()
-except ImportError:
-    pass
-
-# When running against an older arvados-cwl-runner package without
-# arvados_cwl.crunch_script, fall back to the old code.
-
-
-# This gets the job record, transforms the script parameters into a valid CWL
-# input object, then executes the CWL runner to run the underlying workflow or
-# tool.  When the workflow completes, record the output object in an output
-# collection for this runner job.
-
-import arvados
-import arvados.collection
-import arvados.util
-import cwltool.main
-import logging
-import os
-import json
-import argparse
-import re
-import functools
-
-from arvados.api import OrderedJsonModel
-from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
-from cwltool.load_tool import load_tool
-
-# Print package versions
-logging.info(cwltool.main.versionstring())
-
-api = arvados.api("v1")
-
-try:
-    job_order_object = arvados.current_job()['script_parameters']
-
-    pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')
-
-    def keeppath(v):
-        if pdh_path.match(v):
-            return "keep:%s" % v
-        else:
-            return v
-
-    def keeppathObj(v):
-        v["location"] = keeppath(v["location"])
-
-    job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])
-
-    for k,v in job_order_object.items():
-        if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
-            job_order_object[k] = {
-                "class": "File",
-                "location": "keep:%s" % v
-            }
-
-    adjustFileObjs(job_order_object, keeppathObj)
-    adjustDirObjs(job_order_object, keeppathObj)
-    normalizeFilesDirs(job_order_object)
-    adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
-
-    output_name = None
-    if "arv:output_name" in job_order_object:
-        output_name = job_order_object["arv:output_name"]
-        del job_order_object["arv:output_name"]
-
-    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
-                                      output_name=output_name)
-
-    t = load_tool(job_order_object, runner.arv_make_tool)
-
-    args = argparse.Namespace()
-    args.project_uuid = arvados.current_job()["owner_uuid"]
-    args.enable_reuse = True
-    args.submit = False
-    args.debug = True
-    args.quiet = False
-    args.ignore_docker_for_reuse = False
-    args.basedir = os.getcwd()
-    args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
-    outputObj = runner.arv_executor(t, job_order_object, **vars(args))
-
-    if runner.final_output_collection:
-        outputCollection = runner.final_output_collection.portable_data_hash()
-    else:
-        outputCollection = None
-
-    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
-                                         body={
-                                             'output': outputCollection,
-                                             'success': True,
-                                             'progress':1.0
-                                         }).execute()
-except Exception as e:
-    logging.exception("Unhandled exception")
-    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
-                                         body={
-                                             'output': None,
-                                             'success': False,
-                                             'progress':1.0
-                                         }).execute()
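
For context, the fallback path above expected the job's script_parameters to double as the CWL job order. A hypothetical example of such a parameter block, with all locators and names invented:

    # "cwl:tool" is resolved under $TASK_KEEPMOUNT; bare Keep locators become
    # CWL File objects; "arv:output_name" is optional and stripped before running.
    script_parameters = {
        "cwl:tool": "abcdefabcdefabcdefabcdefabcdefab+99/workflow.cwl",
        "reads": "0123456789abcdef0123456789abcdef+123",
        "arv:output_name": "My workflow output",
    }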
diff --git a/crunch_scripts/decompress-all.py b/crunch_scripts/decompress-all.py
deleted file mode 100755 (executable)
index 100ea12..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-#
-# decompress-all.py
-#
-# Decompress all compressed files in the collection using the "dtrx" tool and
-# produce a new collection with the contents.  Uncompressed files
-# are passed through.
-#
-# input:
-# A collection at script_parameters["input"]
-#
-# output:
-# A manifest of the uncompressed contents of the input collection.
-
-import arvados
-import re
-import subprocess
-import os
-import sys
-import crunchutil.robust_put as robust_put
-
-arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True,
-                                          input_as_path=True)
-
-task = arvados.current_task()
-
-input_file = task['parameters']['input']
-
-infile_parts = re.match(r"(^[a-f0-9]{32}\+\d+)(\+\S+)*(/.*)?(/[^/]+)$", input_file)
-
-outdir = os.path.join(task.tmpdir, "output")
-os.makedirs(outdir)
-os.chdir(outdir)
-
-if infile_parts is None:
-    print >>sys.stderr, "Failed to parse input filename '%s' as a Keep file\n" % input_file
-    sys.exit(1)
-
-cr = arvados.CollectionReader(infile_parts.group(1))
-streamname = infile_parts.group(3)[1:]
-filename = infile_parts.group(4)[1:]
-
-if streamname is not None:
-    subprocess.call(["mkdir", "-p", streamname])
-    os.chdir(streamname)
-else:
-    streamname = '.'
-
-m = re.match(r'.*\.(gz|Z|bz2|tgz|tbz|zip|rar|7z|cab|deb|rpm|cpio|gem)$', arvados.get_task_param_mount('input'), re.IGNORECASE)
-
-if m is not None:
-    rc = subprocess.call(["dtrx", "-r", "-n", "-q", arvados.get_task_param_mount('input')])
-    if rc == 0:
-        task.set_output(robust_put.upload(outdir))
-    else:
-        sys.exit(rc)
-else:
-    streamreader = filter(lambda s: s.name() == streamname, cr.all_streams())[0]
-    filereader = streamreader.files()[filename]
-    task.set_output(streamname + filereader.as_manifest()[1:])
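
For context, the input-path parsing above leans entirely on one regular expression. The same pattern, applied to an invented Keep path, splits it into the collection locator, the stream (directory) and the file name:

    import re

    infile_re = re.compile(r"(^[a-f0-9]{32}\+\d+)(\+\S+)*(/.*)?(/[^/]+)$")
    m = infile_re.match("0123456789abcdef0123456789abcdef+123/subdir/reads.tar.gz")
    print(m.group(1))   # 0123456789abcdef0123456789abcdef+123  (collection locator)
    print(m.group(3))   # /subdir                               (stream within it)
    print(m.group(4))   # /reads.tar.gz                         (file name)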
diff --git a/crunch_scripts/file-select b/crunch_scripts/file-select
deleted file mode 100755 (executable)
index c4af05c..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-this_job_input = this_job['script_parameters']['input']
-manifest_text = ""
-for f in arvados.CollectionReader(this_job_input).all_files():
-    if f.name() in this_job['script_parameters']['names']:
-        manifest_text += f.as_manifest()
-
-this_task.set_output(arvados.Keep.put(manifest_text))
diff --git a/crunch_scripts/grep b/crunch_scripts/grep
deleted file mode 100755 (executable)
index a84c0f6..0000000
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-
-arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-this_task_input = this_task['parameters']['input']
-pattern = re.compile(this_job['script_parameters']['pattern'])
-
-input_file = list(arvados.CollectionReader(this_task_input).all_files())[0]
-out = arvados.CollectionWriter()
-out.set_current_file_name(input_file.decompressed_name())
-out.set_current_stream_name(input_file.stream_name())
-for line in input_file.readlines():
-    if pattern.search(line):
-        out.write(line)
-
-this_task.set_output(out.finish())
diff --git a/crunch_scripts/hash b/crunch_scripts/hash
deleted file mode 100755 (executable)
index 56eec7a..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import hashlib
-import os
-
-arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-
-if 'algorithm' in this_job['script_parameters']:
-    alg = this_job['script_parameters']['algorithm']
-else:
-    alg = 'md5'
-digestor = hashlib.new(alg)
-
-input_file = arvados.get_task_param_mount('input')
-
-with open(input_file) as f:
-    while True:
-        buf = f.read(2**20)
-        if len(buf) == 0:
-            break
-        digestor.update(buf)
-
-hexdigest = digestor.hexdigest()
-
-file_name = '/'.join(this_task['parameters']['input'].split('/')[1:])
-
-out = arvados.CollectionWriter()
-out.set_current_file_name("md5sum.txt")
-out.write("%s %s\n" % (hexdigest, file_name))
-this_task.set_output(out.finish())
diff --git a/crunch_scripts/pgp-survey-import b/crunch_scripts/pgp-survey-import
deleted file mode 100755 (executable)
index f12e84b..0000000
+++ /dev/null
@@ -1,119 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import string
-import json
-import UserDict
-import sys
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-this_job_input = this_job['script_parameters']['input']
-
-out = arvados.CollectionWriter()
-out.set_current_file_name("arvados_objects.json")
-out.write("[\n")
-separator = ""
-
-traits = {}
-done_bytes = 0
-done_ratio = 0
-for input_file in arvados.CollectionReader(this_job_input).all_files():
-    for line_number, line in enumerate(input_file.readlines()):
-
-        done_bytes += len(line)
-        new_done_ratio = 1.0 * done_bytes / input_file.size()
-        if line_number == 2 or new_done_ratio - done_ratio > 0.05:
-            sys.stderr.write("progress: %d%% after %d lines\n" % (int(done_ratio * 100), line_number+1))
-            done_ratio = new_done_ratio
-
-        words = string.split(string.strip(line), "\t")
-        if line_number == 0:
-            headings = words
-            for t in arvados.api('v1').traits().list(
-                where={'name':words},
-                limit=1000
-                ).execute()['items']:
-                traits[t['name']] = t
-            for i, trait_name in enumerate(words[3:], start=3):
-                # find or create trait
-                if trait_name not in traits:
-                    traits_match = arvados.api('v1').traits().list(
-                        where={'name':trait_name}
-                        ).execute()['items']
-                    if len(traits_match) > 0:
-                        traits[trait_name] = traits_match[0]
-                    else:
-                        traits[trait_name] = arvados.api('v1').traits().create(
-                            trait={'name':trait_name}).execute()
-                out.write(separator)
-                out.write(json.dumps(traits[trait_name]))
-                separator = ",\n"
-        else:
-            huID_links_match = arvados.api('v1').links().list(
-                where={'link_class':'identifier','name':words[0]}
-                ).execute()['items']
-            if len(huID_links_match) > 0:
-                human_uuid = huID_links_match[0]['head_uuid']
-            else:
-                human = arvados.api('v1').humans().create(
-                    body={}
-                    ).execute()
-                huID_link = arvados.api('v1').links().create(
-                    body={
-                        'link_class':'identifier',
-                        'name':words[0],
-                        'head_kind':'arvados#human',
-                        'head_uuid':human['uuid']
-                        }
-                    ).execute()
-                human_uuid = human['uuid']
-            human_trait = {}
-            for t in arvados.api('v1').links().list(
-                limit=10000,
-                where={
-                    'tail_uuid':human_uuid,
-                    'tail_kind':'arvados#human',
-                    'head_kind':'arvados#trait',
-                    'link_class':'human_trait',
-                    'name':'pgp-survey-response'
-                    }
-                ).execute()['items']:
-                human_trait[t['head_uuid']] = t
-            for i, trait_value in enumerate(words[3:], start=3):
-                trait_uuid = traits[headings[i]]['uuid']
-                if trait_uuid in human_trait:
-                    trait_link = human_trait[trait_uuid]
-                    if trait_link['properties']['value'] != trait_value:
-                        # update database value to match survey response
-                        trait_link['properties']['value'] = trait_value
-                        arvados.api('v1').links().update(
-                            uuid=trait_link['uuid'],
-                            body={'properties':trait_link['properties']}
-                            ).execute()
-                    out.write(",\n")
-                    out.write(json.dumps(trait_link))
-                elif trait_value == '':
-                    # nothing in database, nothing in input
-                    pass
-                else:
-                    trait_link = {
-                        'tail_uuid':human_uuid,
-                        'tail_kind':'arvados#human',
-                        'head_uuid':traits[headings[i]]['uuid'],
-                        'head_kind':'arvados#trait',
-                        'link_class':'human_trait',
-                        'name':'pgp-survey-response',
-                        'properties': { 'value': trait_value }
-                        }
-                    arvados.api('v1').links().create(
-                        body=trait_link
-                        ).execute()
-                    out.write(",\n")
-                    out.write(json.dumps(trait_link))
-
-out.write("\n]\n")
-this_task.set_output(out.finish())
diff --git a/crunch_scripts/pgp-survey-parse b/crunch_scripts/pgp-survey-parse
deleted file mode 100755 (executable)
index ee852f1..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-parser_path = arvados.util.git_checkout(
-    url = this_job['script_parameters']['parser_url'],
-    version = this_job['script_parameters']['parser_version'],
-    path = 'parser')
-
-stdoutdata, stderrdata = arvados.util.run_command(
-    ["python", "demo.py"],
-    cwd=parser_path)
-
-out = arvados.CollectionWriter()
-out.write(stdoutdata)
-out.set_current_file_name('participant_traits.tsv')
-this_task.set_output(out.finish())
diff --git a/crunch_scripts/picard-gatk2-prep b/crunch_scripts/picard-gatk2-prep
deleted file mode 100755 (executable)
index 976060f..0000000
+++ /dev/null
@@ -1,211 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-import sys
-import subprocess
-import arvados_picard
-from arvados_ipc import *
-
-arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-ref_dir = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['reference'],
-    path = 'reference',
-    decompress = True)
-ref_fasta_files = [os.path.join(ref_dir, f)
-                   for f in os.listdir(ref_dir)
-                   if re.search(r'\.fasta(\.gz)?$', f)]
-input_collection = this_task['parameters']['input']
-
-for s in arvados.CollectionReader(input_collection).all_streams():
-    for f in s.all_files():
-        input_stream_name = s.name()
-        input_file_name = f.name()
-        break
-
-# Unfortunately, picard FixMateInformation cannot read from a pipe. We
-# must copy the input to a temporary file before running picard.
-input_bam_path = os.path.join(this_task.tmpdir, input_file_name)
-with open(input_bam_path, 'wb') as bam:
-    for s in arvados.CollectionReader(input_collection).all_streams():
-        for f in s.all_files():
-            for s in f.readall():
-                bam.write(s)
-
-children = {}
-pipes = {}
-
-pipe_setup(pipes, 'fixmate')
-if 0==named_fork(children, 'fixmate'):
-    pipe_closeallbut(pipes, ('fixmate', 'w'))
-    arvados_picard.run(
-        'FixMateInformation',
-        params={
-            'i': input_bam_path,
-            'o': '/dev/stdout',
-            'quiet': 'true',
-            'so': 'coordinate',
-            'validation_stringency': 'LENIENT',
-            'compression_level': 0
-            },
-        stdout=os.fdopen(pipes['fixmate','w'], 'wb', 2**20))
-    os._exit(0)
-os.close(pipes.pop(('fixmate','w'), None))
-
-pipe_setup(pipes, 'sortsam')
-if 0==named_fork(children, 'sortsam'):
-    pipe_closeallbut(pipes, ('fixmate', 'r'), ('sortsam', 'w'))
-    arvados_picard.run(
-        'SortSam',
-        params={
-            'i': '/dev/stdin',
-            'o': '/dev/stdout',
-            'quiet': 'true',
-            'so': 'coordinate',
-            'validation_stringency': 'LENIENT',
-            'compression_level': 0
-            },
-        stdin=os.fdopen(pipes['fixmate','r'], 'rb', 2**20),
-        stdout=os.fdopen(pipes['sortsam','w'], 'wb', 2**20))
-    os._exit(0)
-
-pipe_setup(pipes, 'reordersam')
-if 0==named_fork(children, 'reordersam'):
-    pipe_closeallbut(pipes, ('sortsam', 'r'), ('reordersam', 'w'))
-    arvados_picard.run(
-        'ReorderSam',
-        params={
-            'i': '/dev/stdin',
-            'o': '/dev/stdout',
-            'reference': ref_fasta_files[0],
-            'quiet': 'true',
-            'validation_stringency': 'LENIENT',
-            'compression_level': 0
-            },
-        stdin=os.fdopen(pipes['sortsam','r'], 'rb', 2**20),
-        stdout=os.fdopen(pipes['reordersam','w'], 'wb', 2**20))
-    os._exit(0)
-
-pipe_setup(pipes, 'addrg')
-if 0==named_fork(children, 'addrg'):
-    pipe_closeallbut(pipes, ('reordersam', 'r'), ('addrg', 'w'))
-    arvados_picard.run(
-        'AddOrReplaceReadGroups',
-        params={
-            'i': '/dev/stdin',
-            'o': '/dev/stdout',
-            'quiet': 'true',
-            'rglb': this_job['script_parameters'].get('rglb', 0),
-            'rgpl': this_job['script_parameters'].get('rgpl', 'illumina'),
-            'rgpu': this_job['script_parameters'].get('rgpu', 0),
-            'rgsm': this_job['script_parameters'].get('rgsm', 0),
-            'validation_stringency': 'LENIENT'
-            },
-        stdin=os.fdopen(pipes['reordersam','r'], 'rb', 2**20),
-        stdout=os.fdopen(pipes['addrg','w'], 'wb', 2**20))
-    os._exit(0)
-
-pipe_setup(pipes, 'bammanifest')
-pipe_setup(pipes, 'bam')
-pipe_setup(pipes, 'casm_in')
-if 0==named_fork(children, 'bammanifest'):
-    pipe_closeallbut(pipes,
-                     ('addrg', 'r'),
-                     ('bammanifest', 'w'),
-                     ('bam', 'w'),
-                     ('casm_in', 'w'))
-    out = arvados.CollectionWriter()
-    out.start_new_stream(input_stream_name)
-    out.start_new_file(input_file_name)
-    while True:
-        buf = os.read(pipes['addrg','r'], 2**20)
-        if len(buf) == 0:
-            break
-        os.write(pipes['bam','w'], buf)
-        os.write(pipes['casm_in','w'], buf)
-        out.write(buf)
-    os.write(pipes['bammanifest','w'], out.manifest_text())
-    os.close(pipes['bammanifest','w'])
-    os._exit(0)
-
-pipe_setup(pipes, 'casm')
-if 0 == named_fork(children, 'casm'):
-    pipe_closeallbut(pipes, ('casm_in', 'r'), ('casm', 'w'))
-    arvados_picard.run(
-        'CollectAlignmentSummaryMetrics',
-        params={
-            'input': '/dev/fd/' + str(pipes['casm_in','r']),
-            'output': '/dev/fd/' + str(pipes['casm','w']),
-            'reference_sequence': ref_fasta_files[0],
-            'validation_stringency': 'LENIENT',
-            },
-        close_fds=False)
-    os._exit(0)
-
-pipe_setup(pipes, 'index')
-if 0==named_fork(children, 'index'):
-    pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
-    arvados_picard.run(
-        'BuildBamIndex',
-        params={
-            'i': '/dev/stdin',
-            'o': '/dev/stdout',
-            'quiet': 'true',
-            'validation_stringency': 'LENIENT'
-            },
-        stdin=os.fdopen(pipes['bam','r'], 'rb', 2**20),
-        stdout=os.fdopen(pipes['index','w'], 'wb', 2**20))
-    os._exit(0)
-
-pipe_setup(pipes, 'indexmanifest')
-if 0==named_fork(children, 'indexmanifest'):
-    pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
-    out = arvados.CollectionWriter()
-    out.start_new_stream(input_stream_name)
-    out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
-    while True:
-        buf = os.read(pipes['index','r'], 2**20)
-        if len(buf) == 0:
-            break
-        out.write(buf)
-    os.write(pipes['indexmanifest','w'], out.manifest_text())
-    os.close(pipes['indexmanifest','w'])
-    os._exit(0)
-
-pipe_closeallbut(pipes,
-                 ('bammanifest', 'r'),
-                 ('indexmanifest', 'r'),
-                 ('casm', 'r'))
-
-outmanifest = ''
-
-for which in ['bammanifest', 'indexmanifest']:
-    with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
-        while True:
-            buf = f.read()
-            if buf == '':
-                break
-            outmanifest += buf
-
-casm_out = arvados.CollectionWriter()
-casm_out.start_new_stream(input_stream_name)
-casm_out.start_new_file(input_file_name + '.casm.tsv')
-casm_out.write(os.fdopen(pipes.pop(('casm','r'))))
-
-outmanifest += casm_out.manifest_text()
-
-all_ok = True
-for (childname, pid) in children.items():
-    all_ok = all_ok and waitpid_and_check_exit(pid, childname)
-
-if all_ok:
-    this_task.set_output(outmanifest)
-else:
-    sys.exit(1)
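
For context, pipe_setup, named_fork, pipe_closeallbut and waitpid_and_check_exit come from the arvados_ipc module deleted earlier in this commit; they are thin wrappers over os.pipe and os.fork. Below is a generic, Unix-only sketch of the underlying pattern, not the deleted helpers themselves.

    import os
    import sys

    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: close the read end and write one stage's output into the pipe.
        os.close(r)
        with os.fdopen(w, "w") as out:
            out.write("stage one output\n")
        os._exit(0)

    # Parent: close the write end, consume the stage's output, reap the child.
    os.close(w)
    with os.fdopen(r) as src:
        sys.stdout.write(src.read())
    os.waitpid(pid, 0)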
diff --git a/crunch_scripts/pyrtg.py b/crunch_scripts/pyrtg.py
deleted file mode 100644 (file)
index d733270..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import os
-import sys
-
-rtg_install_path = None
-
-def setup():
-    global rtg_install_path
-    if rtg_install_path:
-        return rtg_install_path
-    rtg_path = arvados.util.zipball_extract(
-        zipball = arvados.current_job()['script_parameters']['rtg_binary_zip'],
-        path = 'rtg')
-    rtg_license_path = arvados.util.collection_extract(
-        collection = arvados.current_job()['script_parameters']['rtg_license'],
-        path = 'license',
-        decompress = False)
-
-    # symlink to rtg-license.txt
-    license_txt_path = os.path.join(rtg_license_path, 'rtg-license.txt')
-    try:
-        os.symlink(license_txt_path, os.path.join(rtg_path,'rtg-license.txt'))
-    except OSError:
-        if not os.path.exists(os.path.join(rtg_path,'rtg-license.txt')):
-            os.symlink(license_txt_path, os.path.join(rtg_path,'rtg-license.txt'))
-
-    rtg_install_path = rtg_path
-    return rtg_path
-
-def run_rtg(command, output_dir, command_args, **kwargs):
-    global rtg_install_path
-    execargs = [os.path.join(rtg_install_path, 'rtg'),
-                command,
-                '-o', output_dir]
-    execargs += command_args
-    sys.stderr.write("run_rtg: exec %s\n" % str(execargs))
-    arvados.util.run_command(
-        execargs,
-        cwd=arvados.current_task().tmpdir,
-        stderr=sys.stderr,
-        stdout=sys.stderr)
-
-    # Exit status cannot be trusted in rtg 1.1.1.
-    assert_done(output_dir)
-
-    # Copy log files to stderr and delete them to avoid storing them
-    # in Keep with the output data.
-    for dirent in arvados.util.listdir_recursive(output_dir):
-        if is_log_file(dirent):
-            log_file = os.path.join(output_dir, dirent)
-            sys.stderr.write(' '.join(['==>', dirent, '<==\n']))
-            with open(log_file, 'rb') as f:
-                while True:
-                    buf = f.read(2**20)
-                    if len(buf) == 0:
-                        break
-                    sys.stderr.write(buf)
-            sys.stderr.write('\n') # in case log does not end in newline
-            os.unlink(log_file)
-
-def assert_done(output_dir):
-    # Sanity-check exit code.
-    done_file = os.path.join(output_dir, 'done')
-    if not os.path.exists(done_file):
-        raise Exception("rtg exited 0 but %s does not exist. abort.\n" % done_file)
-
-def is_log_file(filename):
-    return re.search(r'^(.*/)?(progress|done|\S+.log)$', filename)
-
-setup()
diff --git a/crunch_scripts/rtg-fasta2sdf b/crunch_scripts/rtg-fasta2sdf
deleted file mode 100755 (executable)
index f1ef617..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-import sys
-import pyrtg
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-fasta_path = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['input'],
-    path = 'fasta',
-    decompress = False)
-fasta_files = filter(lambda f: f != '.locator', os.listdir(fasta_path))
-out_dir = os.path.join(arvados.current_task().tmpdir, 'ref-sdf')
-arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
-
-pyrtg.run_rtg('format', out_dir,
-              map(lambda f: os.path.join(fasta_path, f), fasta_files))
-
-out = arvados.CollectionWriter()
-out.write_directory_tree(out_dir, max_manifest_depth=0)
-this_task.set_output(out.finish())
diff --git a/crunch_scripts/rtg-fastq2sdf b/crunch_scripts/rtg-fastq2sdf
deleted file mode 100755 (executable)
index e42697f..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-import sys
-import pyrtg
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-fastq_path = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['input'],
-    path = 'fastq')
-fastq_files = filter(lambda f: f != '.locator', os.listdir(fastq_path))
-tmp_dir_base = os.path.join(arvados.current_task().tmpdir, 'tmp')
-out_dir = os.path.join(arvados.current_task().tmpdir, 'reads')
-
-arvados.util.run_command(['rm', '-rf', tmp_dir_base], stderr=sys.stderr)
-arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
-os.mkdir(tmp_dir_base)
-
-# convert fastq to sdf
-tmp_dirs = []
-for leftarm in fastq_files:
-    if re.search(r'_1.f(ast)?q(.gz)?$', leftarm):
-        rightarm = re.sub(r'_1(.f(ast)?q(.gz)?)$', '_2\\1', leftarm)
-        if rightarm in fastq_files:
-            tmp_dirs += ['%s/%08d' % (tmp_dir_base, len(tmp_dirs))]
-            pyrtg.run_rtg('format', tmp_dirs[-1],
-                          ['-f', 'fastq',
-                           '-q', 'sanger',
-                           '-l', os.path.join(fastq_path, leftarm),
-                           '-r', os.path.join(fastq_path, rightarm)])
-
-# split sdf
-pyrtg.run_rtg('sdfsplit', out_dir,
-              ['-n', '1500000'] + tmp_dirs)
-
-# store output
-out = arvados.CollectionWriter()
-out.write_directory_tree(out_dir, max_manifest_depth=1)
-this_task.set_output(out.finish())
diff --git a/crunch_scripts/rtg-map b/crunch_scripts/rtg-map
deleted file mode 100755 (executable)
index f740888..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-import sys
-import pyrtg
-
-arvados.job_setup.one_task_per_input_stream(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-in_dir = os.path.join(this_task.tmpdir, 'input')
-arvados.util.run_command(['rm', '-rf', in_dir], stderr=sys.stderr)
-in_dir = arvados.util.stream_extract(
-    stream = arvados.StreamReader(this_task['parameters']['input']),
-    path = in_dir,
-    decompress = False)
-ref_dir = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['reference'],
-    path = 'reference',
-    decompress = False)
-
-out_dir = os.path.join(arvados.current_task().tmpdir, 'out')
-arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
-
-# map reads
-pyrtg.run_rtg('map', out_dir,
-              ['-i', in_dir,
-               '-t', ref_dir,
-               '-a', '2',
-               '-b', '1',
-               '--sam-rg', '@RG\\tID:NA\\tSM:NA\\tPL:ILLUMINA'])
-
-# store output
-out = arvados.CollectionWriter()
-out.write_directory_tree(out_dir, this_task['parameters']['input'][0], 0)
-this_task.set_output(out.finish())
diff --git a/crunch_scripts/rtg-snp b/crunch_scripts/rtg-snp
deleted file mode 100755 (executable)
index 1d8a605..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-import sys
-import pyrtg
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-ref_dir = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['reference'],
-    path = 'reference',
-    decompress = False)
-input_dir = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['input'],
-    path = 'input')
-bam_files = map(lambda f: os.path.join(input_dir, f),
-                filter(lambda f: re.search(r'^(.*/)?alignments.bam$', f),
-                       arvados.util.listdir_recursive(input_dir)))
-out_dir = os.path.join(arvados.current_task().tmpdir, 'out')
-arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
-
-# call sequence variants
-pyrtg.run_rtg('snp', out_dir,
-              ['-t', ref_dir] + bam_files)
-
-# store output
-out = arvados.CollectionWriter()
-out.write_directory_tree(out_dir, max_manifest_depth=0)
-this_task.set_output(out.finish())
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
deleted file mode 100755 (executable)
index 3fd08bf..0000000
+++ /dev/null
@@ -1,458 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import logging
-
-logger = logging.getLogger('run-command')
-log_handler = logging.StreamHandler()
-log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
-logger.addHandler(log_handler)
-logger.setLevel(logging.INFO)
-
-import arvados
-import re
-import os
-import subprocess
-import sys
-import shutil
-import crunchutil.subst as subst
-import time
-import arvados.commands.put as put
-import signal
-import stat
-import copy
-import traceback
-import pprint
-import multiprocessing
-import crunchutil.robust_put as robust_put
-import crunchutil.vwd as vwd
-import argparse
-import json
-import tempfile
-import errno
-
-parser = argparse.ArgumentParser()
-parser.add_argument('--dry-run', action='store_true')
-parser.add_argument('--script-parameters', type=str, default="{}")
-args = parser.parse_args()
-
-os.umask(0077)
-
-if not args.dry_run:
-    api = arvados.api('v1')
-    t = arvados.current_task().tmpdir
-    os.chdir(arvados.current_task().tmpdir)
-    os.mkdir("tmpdir")
-    os.mkdir("output")
-
-    os.chdir("output")
-
-    outdir = os.getcwd()
-
-    taskp = None
-    jobp = arvados.current_job()['script_parameters']
-    if len(arvados.current_task()['parameters']) > 0:
-        taskp = arvados.current_task()['parameters']
-else:
-    outdir = "/tmp"
-    jobp = json.loads(args.script_parameters)
-    os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
-    os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
-    os.environ['CRUNCH_SRC'] = '/tmp/crunch-src'
-    if 'TASK_KEEPMOUNT' not in os.environ:
-        os.environ['TASK_KEEPMOUNT'] = '/keep'
-
-def sub_tmpdir(v):
-    return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
-
-def sub_outdir(v):
-    return outdir
-
-def sub_cores(v):
-     return str(multiprocessing.cpu_count())
-
-def sub_jobid(v):
-     return os.environ['JOB_UUID']
-
-def sub_taskid(v):
-     return os.environ['TASK_UUID']
-
-def sub_jobsrc(v):
-     return os.environ['CRUNCH_SRC']
-
-subst.default_subs["task.tmpdir"] = sub_tmpdir
-subst.default_subs["task.outdir"] = sub_outdir
-subst.default_subs["job.srcdir"] = sub_jobsrc
-subst.default_subs["node.cores"] = sub_cores
-subst.default_subs["job.uuid"] = sub_jobid
-subst.default_subs["task.uuid"] = sub_taskid
-
-class SigHandler(object):
-    def __init__(self):
-        self.sig = None
-
-    def send_signal(self, subprocesses, signum):
-        for sp in subprocesses:
-            sp.send_signal(signum)
-        self.sig = signum
-
-# http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html
-def flatten(l, ltypes=(list, tuple)):
-    ltype = type(l)
-    l = list(l)
-    i = 0
-    while i < len(l):
-        while isinstance(l[i], ltypes):
-            if not l[i]:
-                l.pop(i)
-                i -= 1
-                break
-            else:
-                l[i:i + 1] = l[i]
-        i += 1
-    return ltype(l)
-
-def add_to_group(gr, match):
-    m = match.groups()
-    if m not in gr:
-        gr[m] = []
-    gr[m].append(match.group(0))
-
-class EvaluationError(Exception):
-    pass
-
-# Return the name of variable ('var') that will take on each value in 'items'
-# when performing an inner substitution
-def var_items(p, c, key):
-    if key not in c:
-        raise EvaluationError("'%s' was expected in 'p' but is missing" % key)
-
-    if "var" in c:
-        if not isinstance(c["var"], basestring):
-            raise EvaluationError("Value of 'var' must be a string")
-        # Var specifies the variable name for inner parameter substitution
-        return (c["var"], get_items(p, c[key]))
-    else:
-        # The component function ('key') value is a list, so return the list
-        # directly with no parameter selected.
-        if isinstance(c[key], list):
-            return (None, get_items(p, c[key]))
-        elif isinstance(c[key], basestring):
-            # check if c[key] is a string that looks like a parameter
-            m = re.match("^\$\((.*)\)$", c[key])
-            if m and m.group(1) in p:
-                return (m.group(1), get_items(p, c[key]))
-            else:
-                # backwards compatible, foreach specifies bare parameter name to use
-                return (c[key], get_items(p, p[c[key]]))
-        else:
-            raise EvaluationError("Value of '%s' must be a string or list" % key)
-
-# "p" is the parameter scope, "c" is the item to be expanded.
-# If "c" is a dict, apply function expansion.
-# If "c" is a list, recursively expand each item and return a new list.
-# If "c" is a string, apply parameter substitution
-def expand_item(p, c):
-    if isinstance(c, dict):
-        if "foreach" in c and "command" in c:
-            # Expand a command template for each item in the specified user
-            # parameter
-            var, items = var_items(p, c, "foreach")
-            if var is None:
-                raise EvaluationError("Must specify 'var' in foreach")
-            r = []
-            for i in items:
-                params = copy.copy(p)
-                params[var] = i
-                r.append(expand_item(params, c["command"]))
-            return r
-        elif "list" in c and "index" in c and "command" in c:
-            # extract a single item from a list
-            var, items = var_items(p, c, "list")
-            if var is None:
-                raise EvaluationError("Must specify 'var' in list")
-            params = copy.copy(p)
-            params[var] = items[int(c["index"])]
-            return expand_item(params, c["command"])
-        elif "regex" in c:
-            pattern = re.compile(c["regex"])
-            if "filter" in c:
-                # filter list so that it only includes items that match a
-                # regular expression
-                _, items = var_items(p, c, "filter")
-                return [i for i in items if pattern.match(i)]
-            elif "group" in c:
-                # generate a list of lists, where items are grouped on common
-                # subexpression match
-                _, items = var_items(p, c, "group")
-                groups = {}
-                for i in items:
-                    match = pattern.match(i)
-                    if match:
-                        add_to_group(groups, match)
-                return [groups[k] for k in groups]
-            elif "extract" in c:
-                # generate a list of lists, where items are split by
-                # subexpression match
-                _, items = var_items(p, c, "extract")
-                r = []
-                for i in items:
-                    match = pattern.match(i)
-                    if match:
-                        r.append(list(match.groups()))
-                return r
-        elif "batch" in c and "size" in c:
-            # generate a list of lists, where items are split into a batch size
-            _, items = var_items(p, c, "batch")
-            sz = int(c["size"])
-            r = []
-            for j in xrange(0, len(items), sz):
-                r.append(items[j:j+sz])
-            return r
-        raise EvaluationError("Missing valid list context function")
-    elif isinstance(c, list):
-        return [expand_item(p, arg) for arg in c]
-    elif isinstance(c, basestring):
-        m = re.match("^\$\((.*)\)$", c)
-        if m and m.group(1) in p:
-            return expand_item(p, p[m.group(1)])
-        else:
-            return subst.do_substitution(p, c)
-    else:
-        raise EvaluationError("expand_item() unexpected parameter type %s" % type(c))
-
-# Evaluate in a list context
-# "p" is the parameter scope, "value" will be evaluated
-# if "value" is a list after expansion, return that
-# if "value" is a path to a directory, return a list consisting of each entry in the directory
-# if "value" is a path to a file, return a list consisting of each line of the file
-def get_items(p, value):
-    value = expand_item(p, value)
-    if isinstance(value, list):
-        return value
-    elif isinstance(value, basestring):
-        mode = os.stat(value).st_mode
-        prefix = value[len(os.environ['TASK_KEEPMOUNT'])+1:]
-        if mode is not None:
-            if stat.S_ISDIR(mode):
-                items = [os.path.join(value, l) for l in os.listdir(value)]
-            elif stat.S_ISREG(mode):
-                with open(value) as f:
-                    items = [line.rstrip("\r\n") for line in f]
-            return items
-    raise EvaluationError("get_items did not yield a list")
-
-stdoutname = None
-stdoutfile = None
-stdinname = None
-stdinfile = None
-
-# Construct the cross product of all values of each variable listed in fvars
-def recursive_foreach(params, fvars):
-    var = fvars[0]
-    fvars = fvars[1:]
-    items = get_items(params, params[var])
-    logger.info("parallelizing on %s with items %s" % (var, items))
-    if items is not None:
-        for i in items:
-            params = copy.copy(params)
-            params[var] = i
-            if len(fvars) > 0:
-                recursive_foreach(params, fvars)
-            else:
-                if not args.dry_run:
-                    arvados.api().job_tasks().create(body={
-                        'job_uuid': arvados.current_job()['uuid'],
-                        'created_by_job_task_uuid': arvados.current_task()['uuid'],
-                        'sequence': 1,
-                        'parameters': params
-                    }).execute()
-                else:
-                    if isinstance(params["command"][0], list):
-                        for c in params["command"]:
-                            logger.info(flatten(expand_item(params, c)))
-                    else:
-                        logger.info(flatten(expand_item(params, params["command"])))
-    else:
-        logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
-        sys.exit(1)
-
-try:
-    if "task.foreach" in jobp:
-        if args.dry_run or arvados.current_task()['sequence'] == 0:
-            # This is the first task to start the other tasks and exit
-            fvars = jobp["task.foreach"]
-            if isinstance(fvars, basestring):
-                fvars = [fvars]
-            if not isinstance(fvars, list) or len(fvars) == 0:
-                logger.error("value of task.foreach must be a string or non-empty list")
-                sys.exit(1)
-            recursive_foreach(jobp, jobp["task.foreach"])
-            if not args.dry_run:
-                if "task.vwd" in jobp:
-                    # Set output of the first task to the base vwd collection so it
-                    # will be merged with output fragments from the other tasks by
-                    # crunch.
-                    arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
-                else:
-                    arvados.current_task().set_output(None)
-            sys.exit(0)
-    else:
-        # This is the only task so taskp/jobp are the same
-        taskp = jobp
-except Exception as e:
-    logger.exception("caught exception")
-    logger.error("job parameters were:")
-    logger.error(pprint.pformat(jobp))
-    sys.exit(1)
-
-try:
-    if not args.dry_run:
-        if "task.vwd" in taskp:
-            # Populate output directory with symlinks to files in collection
-            vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
-
-        if "task.cwd" in taskp:
-            os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
-
-    cmd = []
-    if isinstance(taskp["command"][0], list):
-        for c in taskp["command"]:
-            cmd.append(flatten(expand_item(taskp, c)))
-    else:
-        cmd.append(flatten(expand_item(taskp, taskp["command"])))
-
-    if "task.stdin" in taskp:
-        stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
-        if not args.dry_run:
-            stdinfile = open(stdinname, "rb")
-
-    if "task.stdout" in taskp:
-        stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
-        if not args.dry_run:
-            stdoutfile = open(stdoutname, "wb")
-
-    if "task.env" in taskp:
-        env = copy.copy(os.environ)
-        for k,v in taskp["task.env"].items():
-            env[k] = subst.do_substitution(taskp, v)
-    else:
-        env = None
-
-    logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
-
-    if args.dry_run:
-        sys.exit(0)
-except subst.SubstitutionError as e:
-    logger.error(str(e))
-    logger.error("task parameters were:")
-    logger.error(pprint.pformat(taskp))
-    sys.exit(1)
-except Exception as e:
-    logger.exception("caught exception")
-    logger.error("task parameters were:")
-    logger.error(pprint.pformat(taskp))
-    sys.exit(1)
-
-# rcode holds the return codes produced by each subprocess
-rcode = {}
-try:
-    subprocesses = []
-    close_streams = []
-    if stdinfile:
-        close_streams.append(stdinfile)
-    next_stdin = stdinfile
-
-    for i in xrange(len(cmd)):
-        if i == len(cmd)-1:
-            # this is the last command in the pipeline, so its stdout should go to stdoutfile
-            next_stdout = stdoutfile
-        else:
-            # this is an intermediate command in the pipeline, so its stdout should go to a pipe
-            next_stdout = subprocess.PIPE
-
-        sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout, env=env)
-
-        # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
-        # consuming process ends prematurely.
-        if sp.stdout:
-            close_streams.append(sp.stdout)
-
-        # Send this process's stdout to the next process's stdin
-        next_stdin = sp.stdout
-
-        subprocesses.append(sp)
-
-    # File descriptors have been handed off to the subprocesses, so close them here.
-    for s in close_streams:
-        s.close()
-
-    # Set up signal handling
-    sig = SigHandler()
-
-    # Forward terminate signals to the subprocesses.
-    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
-    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
-    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
-
-    active = 1
-    pids = set([s.pid for s in subprocesses])
-    while len(pids) > 0:
-        try:
-            (pid, status) = os.wait()
-        except OSError as e:
-            if e.errno == errno.EINTR:
-                pass
-            else:
-                raise
-        else:
-            pids.discard(pid)
-            if not taskp.get("task.ignore_rcode"):
-                rcode[pid] = (status >> 8)
-            else:
-                rcode[pid] = 0
-
-    if sig.sig is not None:
-        logger.critical("terminating on signal %s" % sig.sig)
-        sys.exit(2)
-    else:
-        for i in xrange(len(cmd)):
-            r = rcode[subprocesses[i].pid]
-            logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
-
-except Exception as e:
-    logger.exception("caught exception")
-
-# restore default signal handlers.
-signal.signal(signal.SIGINT, signal.SIG_DFL)
-signal.signal(signal.SIGTERM, signal.SIG_DFL)
-signal.signal(signal.SIGQUIT, signal.SIG_DFL)
-
-logger.info("the following output files will be saved to keep:")
-
-subprocess.call(["find", "-L", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
-
-logger.info("start writing output to keep")
-
-if "task.vwd" in taskp and "task.foreach" in jobp:
-    for root, dirs, files in os.walk(outdir):
-        for f in files:
-            s = os.lstat(os.path.join(root, f))
-            if stat.S_ISLNK(s.st_mode):
-                os.unlink(os.path.join(root, f))
-
-(outcollection, checkin_error) = vwd.checkin(outdir)
-
-# Success if we ran any subprocess, and they all exited 0.
-success = rcode and all(status == 0 for status in rcode.itervalues()) and not checkin_error
-
-api.job_tasks().update(uuid=arvados.current_task()['uuid'],
-                                     body={
-                                         'output': outcollection.manifest_text(),
-                                         'success': success,
-                                         'progress':1.0
-                                     }).execute()
-
-sys.exit(0 if success else 1)
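
For context, everything run-command did was driven by the job's script_parameters. A hypothetical parameter block exercising the $() substitution and task.foreach fan-out described above; the file names are invented, and a real job would need them present under the Keep mount.

    import json

    params = {
        "sample": ["a.fastq", "b.fastq"],
        "task.foreach": "sample",                      # one task per list item
        "command": ["md5sum", "$(file $(sample))"],    # "file " helper from subst.py
        "task.stdout": "$(basename $(sample)).md5",    # per-task stdout file name
    }

    print(json.dumps(params, indent=2))

During development a block like this could be fed to the script through the --dry-run and --script-parameters flags parsed at the top, which only expand and log the resulting commands instead of creating tasks.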
diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py
deleted file mode 100755 (executable)
index 61c384f..0000000
+++ /dev/null
@@ -1,70 +0,0 @@
-#!/usr/bin/python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import hashlib
-import string
-
-api = arvados.api('v1')
-
-piece = 0
-manifest_text = ""
-
-# Look for paired reads
-
-inp = arvados.CollectionReader(arvados.getjobparam('reads'))
-
-manifest_list = []
-
-def nextline(reader, start):
-    n = -1
-    while True:
-        r = reader.readfrom(start, 128)
-        if r == '':
-            break
-        n = string.find(r, "\n")
-        if n > -1:
-            break
-        else:
-            start += 128
-    return n
-
-prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
-
-# Look for fastq files
-for s in inp.all_streams():
-    for f in s.all_files():
-        name_pieces = prog.match(f.name())
-        if name_pieces is not None:
-            if s.name() != ".":
-                # The downstream tool (run-command) only iterates over the top
-                # level of directories so if there are fastq files in
-                # directories in the input, the choice is either to forget
-                # there are directories (which might lead to name conflicts) or
-                # just fail.
-                print >>sys.stderr, "fastq must be at the root of the collection"
-                sys.exit(1)
-
-            p = None
-            if name_pieces.group(2) is not None:
-                if name_pieces.group(2) == "_1":
-                    p = [{}, {}]
-                    p[0]["reader"] = s.files()[name_pieces.group(0)]
-                    p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq" + (name_pieces.group(3) if name_pieces.group(3) else '')]
-            else:
-                p = [{}]
-                p[0]["reader"] = s.files()[name_pieces.group(0)]
-
-            if p is not None:
-                for i in xrange(0, len(p)):
-                    m = p[i]["reader"].as_manifest().split()
-                    m[0] = "./_" + str(piece)
-                    manifest_list.append(m)
-                piece += 1
-
-manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n"
-
-arvados.current_task().set_output(manifest_text)
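
For context, the pairing logic above hinges on one regular expression. Applied to a few invented file names, it shows how the _1/_2 mate suffix and the optional .gz extension are picked apart:

    import re

    prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
    for name in ["sampleA_1.fastq.gz", "sampleA_2.fastq.gz", "unpaired.fastq"]:
        print(name, "->", prog.match(name).groups())
    # sampleA_1.fastq.gz -> ('sampleA', '_1', '.gz')
    # sampleA_2.fastq.gz -> ('sampleA', '_2', '.gz')
    # unpaired.fastq     -> ('unpaired', None, None)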
diff --git a/crunch_scripts/test/task_output_dir b/crunch_scripts/test/task_output_dir
deleted file mode 100755 (executable)
index 8b2c7ce..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import arvados.crunch
-import hashlib
-import os
-
-out = arvados.crunch.TaskOutputDir()
-
-string = open(__file__).read()
-with open(os.path.join(out.path, 'example.out'), 'w') as f:
-    f.write(string)
-with open(os.path.join(out.path, 'example.out.SHA1'), 'w') as f:
-    f.write(hashlib.sha1(string).hexdigest() + "\n")
-
-arvados.current_task().set_output(out.manifest_text())
diff --git a/services/api/app/controllers/arvados/v1/job_tasks_controller.rb b/services/api/app/controllers/arvados/v1/job_tasks_controller.rb
index 07bbc33ab2f599245e68af092145f24130536761..b960d2e9e458d6c186b3f34a7b2f3db70e3faa1e 100644 (file)
@@ -4,4 +4,9 @@
 
 class Arvados::V1::JobTasksController < ApplicationController
   accept_attribute_as_json :parameters, Hash
+
+  def create
+    return send_error("Unsupported legacy jobs API",
+                      status: 400)
+  end
 end
diff --git a/services/api/app/controllers/arvados/v1/jobs_controller.rb b/services/api/app/controllers/arvados/v1/jobs_controller.rb
index c3655272ddf69687899012b2da2dd1e293b06a4f..f6308c528f6beae6ad6e3824e6d15495edca3e13 100644 (file)
@@ -13,115 +13,28 @@ class Arvados::V1::JobsController < ApplicationController
   include DbCurrentTime
 
   def create
-    [:repository, :script, :script_version, :script_parameters].each do |r|
-      if !resource_attrs[r]
-        return send_error("#{r} attribute must be specified",
-                          status: :unprocessable_entity)
-      end
-    end
-
-    # We used to ask for the minimum_, exclude_, and no_reuse params
-    # in the job resource. Now we advertise them as flags that alter
-    # the behavior of the create action.
-    [:minimum_script_version, :exclude_script_versions].each do |attr|
-      if resource_attrs.has_key? attr
-        params[attr] = resource_attrs.delete attr
-      end
-    end
-    if resource_attrs.has_key? :no_reuse
-      params[:find_or_create] = !resource_attrs.delete(:no_reuse)
-    end
-
-    return super if !params[:find_or_create]
-    return if !load_filters_param
-
-    begin
-      @object = Job.find_reusable(resource_attrs, params, @filters, @read_users)
-    rescue ArgumentError => error
-      return send_error(error.message)
-    end
-
-    if @object
-      show
-    else
-      super
-    end
+    return send_error("Unsupported legacy jobs API",
+                      status: 400)
   end
 
   def cancel
-    reload_object_before_update
-    @object.cancel cascade: params[:cascade]
-    show
+    return send_error("Unsupported legacy jobs API",
+                      status: 400)
   end
 
   def lock
-    @object.lock current_user.uuid
-    show
-  end
-
-  class LogStreamer
-    Q_UPDATE_INTERVAL = 12
-    def initialize(job, opts={})
-      @job = job
-      @opts = opts
-    end
-    def each
-      if @job.finished_at
-        yield "#{@job.uuid} finished at #{@job.finished_at}\n"
-        return
-      end
-      while not @job.started_at
-        # send a summary (job queue + available nodes) to the client
-        # every few seconds while waiting for the job to start
-        current_time = db_current_time
-        last_ack_at ||= current_time - Q_UPDATE_INTERVAL - 1
-        if current_time - last_ack_at >= Q_UPDATE_INTERVAL
-          nodes_in_state = {idle: 0, alloc: 0}
-          ActiveRecord::Base.uncached do
-            Node.where('hostname is not ?', nil).collect do |n|
-              if n.info[:slurm_state]
-                nodes_in_state[n.info[:slurm_state]] ||= 0
-                nodes_in_state[n.info[:slurm_state]] += 1
-              end
-            end
-          end
-          job_queue = Job.queue.select(:uuid)
-          n_queued_before_me = 0
-          job_queue.each do |j|
-            break if j.uuid == @job.uuid
-            n_queued_before_me += 1
-          end
-          yield "#{db_current_time}" \
-            " job #{@job.uuid}" \
-            " queue_position #{n_queued_before_me}" \
-            " queue_size #{job_queue.count}" \
-            " nodes_idle #{nodes_in_state[:idle]}" \
-            " nodes_alloc #{nodes_in_state[:alloc]}\n"
-          last_ack_at = db_current_time
-        end
-        sleep 3
-        ActiveRecord::Base.uncached do
-          @job.reload
-        end
-      end
-    end
+    return send_error("Unsupported legacy jobs API",
+                      status: 400)
   end
 
   def queue
-    params[:order] ||= ['priority desc', 'created_at']
-    load_limit_offset_order_params
-    load_where_param
-    @where.merge!({state: Job::Queued})
-    return if !load_filters_param
-    find_objects_for_index
-    index
+    return send_error("Unsupported legacy jobs API",
+                      status: 400)
   end
 
   def queue_size
-    # Users may not be allowed to see all the jobs in the queue, so provide a
-    # method to get just the queue size in order to get a gist of how busy the
-    # cluster is.
-    render :json => {:queue_size => Job.queue.size}
+    return send_error("Unsupported legacy jobs API",
+                      status: 400)
   end
 
   def self._create_requires_parameters
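All five retired actions in this controller now return the same 400 error. A minimal functional-test sketch of that behavior follows; the `authorize_with` helper, the `:active` token fixture, and the Rails 5 test syntax are assumptions modeled on the API server's existing test style, not part of this commit.

    require 'test_helper'

    class Arvados::V1::JobsControllerTest < ActionController::TestCase
      test "legacy jobs API create returns 400" do
        authorize_with :active   # assumed test helper/fixture
        post :create, params: {
          job: {script: "hash", script_version: "master", repository: "arvados"},
        }
        assert_response 400
      end
    end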
index baffda1c99b96ad72e81879c1fff9d124ef2635c..166f71049b249606f3667045b07d2bea1c17639e 100644 (file)
@@ -7,9 +7,13 @@ class Arvados::V1::PipelineInstancesController < ApplicationController
   accept_attribute_as_json :properties, Hash
   accept_attribute_as_json :components_summary, Hash
 
+  def create
+    return send_error("Unsupported legacy jobs API",
+                      status: 400)
+  end
+
   def cancel
-    reload_object_before_update
-    @object.cancel cascade: params[:cascade]
-    show
+    return send_error("Unsupported legacy jobs API",
+                      status: 400)
   end
 end
index a276948d59de444ab0c13e9cdc97eaeca39b26d9..4a5e724ee64471df8bcd38db5e6e2193c05e244b 100644 (file)
@@ -4,4 +4,9 @@
 
 class Arvados::V1::PipelineTemplatesController < ApplicationController
   accept_attribute_as_json :components, Hash
+
+  def create
+    return send_error("Unsupported legacy jobs API",
+                      status: 400)
+  end
 end
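With this change the same `send_error("Unsupported legacy jobs API", status: 400)` stanza appears in three controllers. If that repetition ever becomes a maintenance nuisance, one option is a small shared module; the sketch below uses illustrative names and is not something this commit introduces.

    # Hypothetical helper; module and method names are illustrative only.
    module DisablesLegacyJobsApi
      def self.included(base)
        base.extend(ClassMethods)
      end

      module ClassMethods
        def disable_legacy_jobs_api(*actions)
          actions.each do |action|
            define_method(action) do
              send_error("Unsupported legacy jobs API", status: 400)
            end
          end
        end
      end
    end

    # class Arvados::V1::PipelineTemplatesController < ApplicationController
    #   include DisablesLegacyJobsApi
    #   disable_legacy_jobs_api :create
    # end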
diff --git a/services/api/lib/crunch_dispatch.rb b/services/api/lib/crunch_dispatch.rb
deleted file mode 100644 (file)
index 4e64018..0000000
+++ /dev/null
@@ -1,981 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-require 'open3'
-require 'shellwords'
-
-class CrunchDispatch
-  extend DbCurrentTime
-  include ApplicationHelper
-  include Process
-
-  EXIT_TEMPFAIL = 75
-  EXIT_RETRY_UNLOCKED = 93
-  RETRY_UNLOCKED_LIMIT = 3
-
-  class LogTime < Time
-    def to_s
-      self.utc.strftime "%Y-%m-%d_%H:%M:%S"
-    end
-  end
-
-  def initialize
-    @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
-    if @crunch_job_bin.empty?
-      raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
-    end
-
-    @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN']
-    @docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS']
-    @cgroup_root = ENV['CRUNCH_CGROUP_ROOT']
-    @srun_sync_timeout = ENV['CRUNCH_SRUN_SYNC_TIMEOUT']
-
-    @arvados_internal = Rails.configuration.Containers.JobsAPI.GitInternalDir
-    if not File.exist? @arvados_internal
-      $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
-      raise "No internal git repository available" unless ($? == 0)
-    end
-
-    @repo_root = Rails.configuration.Git.Repositories
-    @arvados_repo_path = Repository.where(name: "arvados").first.server_path
-    @authorizations = {}
-    @did_recently = {}
-    @fetched_commits = {}
-    @git_tags = {}
-    @node_state = {}
-    @pipe_auth_tokens = {}
-    @running = {}
-    @todo = []
-    @todo_job_retries = {}
-    @job_retry_counts = Hash.new(0)
-    @todo_pipelines = []
-  end
-
-  def sysuser
-    return act_as_system_user
-  end
-
-  def refresh_todo
-    if @runoptions[:jobs]
-      @todo = @todo_job_retries.values + Job.queue.select(&:repository)
-    end
-    if @runoptions[:pipelines]
-      @todo_pipelines = PipelineInstance.queue
-    end
-  end
-
-  def each_slurm_line(cmd, outfmt, max_fields=nil)
-    max_fields ||= outfmt.split(":").size
-    max_fields += 1  # To accommodate the node field we add
-    @@slurm_version ||= Gem::Version.new(`sinfo --version`.match(/\b[\d\.]+\b/)[0])
-    if Gem::Version.new('2.3') <= @@slurm_version
-      `#{cmd} --noheader -o '%n:#{outfmt}'`.each_line do |line|
-        yield line.chomp.split(":", max_fields)
-      end
-    else
-      # Expand rows with hostname ranges (like "foo[1-3,5,9-12]:idle")
-      # into multiple rows with one hostname each.
-      `#{cmd} --noheader -o '%N:#{outfmt}'`.each_line do |line|
-        tokens = line.chomp.split(":", max_fields)
-        if (re = tokens[0].match(/^(.*?)\[([-,\d]+)\]$/))
-          tokens.shift
-          re[2].split(",").each do |range|
-            range = range.split("-").collect(&:to_i)
-            (range[0]..range[-1]).each do |n|
-              yield [re[1] + n.to_s] + tokens
-            end
-          end
-        else
-          yield tokens
-        end
-      end
-    end
-  end
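# Sketch (not part of the deleted file): the pre-2.3 branch above expands a
# SLURM hostname range such as "compute[1-3,5]" into one row per host. A
# standalone version of that expansion, with a usage example:
def expand_slurm_hostnames(name)
  if (m = name.match(/^(.*?)\[([-,\d]+)\]$/))
    m[2].split(",").flat_map do |range|
      lo, hi = range.split("-").collect(&:to_i)
      (lo..(hi || lo)).map { |n| "#{m[1]}#{n}" }
    end
  else
    [name]
  end
end
# expand_slurm_hostnames("compute[1-3,5]")
# #=> ["compute1", "compute2", "compute3", "compute5"]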
-
-  def slurm_status
-    slurm_nodes = {}
-    each_slurm_line("sinfo", "%t") do |hostname, state|
-      # Treat nodes in idle* state as down, because the * means that slurm
-      # hasn't been able to communicate with it recently.
-      state.sub!(/^idle\*/, "down")
-      state.sub!(/\W+$/, "")
-      state = "down" unless %w(idle alloc comp mix drng down).include?(state)
-      slurm_nodes[hostname] = {state: state, job: nil}
-    end
-    each_slurm_line("squeue", "%j") do |hostname, job_uuid|
-      slurm_nodes[hostname][:job] = job_uuid if slurm_nodes[hostname]
-    end
-    slurm_nodes
-  end
-
-  def update_node_status
-    return unless Rails.configuration.Containers.JobsAPI.CrunchJobWrapper.to_s.match(/^slurm/)
-    slurm_status.each_pair do |hostname, slurmdata|
-      next if @node_state[hostname] == slurmdata
-      begin
-        node = Node.where('hostname=?', hostname).order(:last_ping_at).last
-        if node
-          $stderr.puts "dispatch: update #{hostname} state to #{slurmdata}"
-          node.info["slurm_state"] = slurmdata[:state]
-          node.job_uuid = slurmdata[:job]
-          if node.save
-            @node_state[hostname] = slurmdata
-          else
-            $stderr.puts "dispatch: failed to update #{node.uuid}: #{node.errors.messages}"
-          end
-        elsif slurmdata[:state] != 'down'
-          $stderr.puts "dispatch: SLURM reports '#{hostname}' is not down, but no node has that name"
-        end
-      rescue => error
-        $stderr.puts "dispatch: error updating #{hostname} node status: #{error}"
-      end
-    end
-  end
-
-  def positive_int(raw_value, default=nil)
-    value = begin raw_value.to_i rescue 0 end
-    if value > 0
-      value
-    else
-      default
-    end
-  end
-
-  NODE_CONSTRAINT_MAP = {
-    # Map Job runtime_constraints keys to the corresponding Node info key.
-    'min_ram_mb_per_node' => 'total_ram_mb',
-    'min_scratch_mb_per_node' => 'total_scratch_mb',
-    'min_cores_per_node' => 'total_cpu_cores',
-  }
-
-  def nodes_available_for_job_now(job)
-    # Find Nodes that satisfy a Job's runtime constraints (by building
-    # a list of Procs and using them to test each Node).  If there
-    # enough to run the Job, return an array of their names.
-    # Otherwise, return nil.
-    need_procs = NODE_CONSTRAINT_MAP.each_pair.map do |job_key, node_key|
-      Proc.new do |node|
-        positive_int(node.properties[node_key], 0) >=
-          positive_int(job.runtime_constraints[job_key], 0)
-      end
-    end
-    min_node_count = positive_int(job.runtime_constraints['min_nodes'], 1)
-    usable_nodes = []
-    Node.all.select do |node|
-      node.info['slurm_state'] == 'idle'
-    end.sort_by do |node|
-      # Prefer nodes with no price, then cheap nodes, then expensive nodes
-      node.properties['cloud_node']['price'].to_f rescue 0
-    end.each do |node|
-      if need_procs.select { |need_proc| not need_proc.call(node) }.any?
-        # At least one runtime constraint is not satisfied by this node
-        next
-      end
-      usable_nodes << node
-      if usable_nodes.count >= min_node_count
-        hostnames = usable_nodes.map(&:hostname)
-        log_nodes = usable_nodes.map do |n|
-          "#{n.hostname} #{n.uuid} #{n.properties.to_json}"
-        end
-        log_job = "#{job.uuid} #{job.runtime_constraints}"
-        log_text = "dispatching job #{log_job} to #{log_nodes.join(", ")}"
-        $stderr.puts log_text
-        begin
-          act_as_system_user do
-            Log.new(object_uuid: job.uuid,
-                    event_type: 'dispatch',
-                    owner_uuid: system_user_uuid,
-                    summary: "dispatching to #{hostnames.join(", ")}",
-                    properties: {'text' => log_text}).save!
-          end
-        rescue => e
-          $stderr.puts "dispatch: log.create failed: #{e}"
-        end
-        return hostnames
-      end
-    end
-    nil
-  end
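# Sketch (not part of the deleted file): a simplified version of the
# constraint check driven by NODE_CONSTRAINT_MAP above. A node qualifies when
# every reported property meets the corresponding runtime_constraints
# minimum; missing values count as zero, as with positive_int's default.
CONSTRAINT_TO_PROPERTY = {
  'min_ram_mb_per_node'     => 'total_ram_mb',
  'min_scratch_mb_per_node' => 'total_scratch_mb',
  'min_cores_per_node'      => 'total_cpu_cores',
}
def node_satisfies?(node_properties, runtime_constraints)
  CONSTRAINT_TO_PROPERTY.all? do |job_key, node_key|
    node_properties.fetch(node_key, 0).to_i >= runtime_constraints.fetch(job_key, 0).to_i
  end
end
# node_satisfies?({'total_ram_mb' => 16384, 'total_cpu_cores' => 8},
#                 {'min_ram_mb_per_node' => 8192, 'min_cores_per_node' => 4})
# #=> true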
-
-  def nodes_available_for_job(job)
-    # Check if there are enough idle nodes with the Job's minimum
-    # hardware requirements to run it.  If so, return an array of
-    # their names.  If not, up to once per hour, signal start_jobs to
-    # hold off launching Jobs.  This delay is meant to give the Node
-    # Manager an opportunity to make new resources available for new
-    # Jobs.
-    #
-    # The exact timing parameters here might need to be adjusted for
-    # the best balance between helping the longest-waiting Jobs run,
-    # and making efficient use of immediately available resources.
-    # These are all just first efforts until we have more data to work
-    # with.
-    nodelist = nodes_available_for_job_now(job)
-    if nodelist.nil? and not did_recently(:wait_for_available_nodes, 3600)
-      $stderr.puts "dispatch: waiting for nodes for #{job.uuid}"
-      @node_wait_deadline = Time.now + 5.minutes
-    end
-    nodelist
-  end
-
-  def fail_job job, message, skip_lock: false
-    $stderr.puts "dispatch: #{job.uuid}: #{message}"
-    begin
-      Log.new(object_uuid: job.uuid,
-              event_type: 'dispatch',
-              owner_uuid: job.owner_uuid,
-              summary: message,
-              properties: {"text" => message}).save!
-    rescue => e
-      $stderr.puts "dispatch: log.create failed: #{e}"
-    end
-
-    if not skip_lock and not have_job_lock?(job)
-      begin
-        job.lock @authorizations[job.uuid].user.uuid
-      rescue ArvadosModel::AlreadyLockedError
-        $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else"
-        return
-      end
-    end
-
-    job.state = "Failed"
-    if not job.save
-      $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed"
-    end
-  end
-
-  def stdout_s(cmd_a, opts={})
-    IO.popen(cmd_a, "r", opts) do |pipe|
-      return pipe.read.chomp
-    end
-  end
-
-  def git_cmd(*cmd_a)
-    ["git", "--git-dir=#{@arvados_internal}"] + cmd_a
-  end
-
-  def get_authorization(job)
-    if @authorizations[job.uuid] and
-        @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
-      # We already made a token for this job, but we need a new one
-      # because modified_by_user_uuid has changed (the job will run
-      # as a different user).
-      @authorizations[job.uuid].update_attributes expires_at: Time.now
-      @authorizations[job.uuid] = nil
-    end
-    if not @authorizations[job.uuid]
-      auth = ApiClientAuthorization.
-        new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
-            api_client_id: 0)
-      if not auth.save
-        $stderr.puts "dispatch: auth.save failed for #{job.uuid}"
-      else
-        @authorizations[job.uuid] = auth
-      end
-    end
-    @authorizations[job.uuid]
-  end
-
-  def internal_repo_has_commit? sha1
-    if (not @fetched_commits[sha1] and
-        sha1 == stdout_s(git_cmd("rev-list", "-n1", sha1), err: "/dev/null") and
-        $? == 0)
-      @fetched_commits[sha1] = true
-    end
-    return @fetched_commits[sha1]
-  end
-
-  def get_commit src_repo, sha1
-    return true if internal_repo_has_commit? sha1
-
-    # commit does not exist in internal repository, so import the
-    # source repository using git fetch-pack
-    cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
-    $stderr.puts "dispatch: #{cmd}"
-    $stderr.puts(stdout_s(cmd))
-    @fetched_commits[sha1] = ($? == 0)
-  end
-
-  def tag_commit(job, commit_hash, tag_name)
-    # @git_tags[T]==V if we know commit V has been tagged T in the
-    # arvados_internal repository.
-    if not @git_tags[tag_name]
-      cmd = git_cmd("tag", tag_name, commit_hash)
-      $stderr.puts "dispatch: #{cmd}"
-      $stderr.puts(stdout_s(cmd, err: "/dev/null"))
-      unless $? == 0
-        # git tag failed.  This may be because the tag already exists, so check for that.
-        tag_rev = stdout_s(git_cmd("rev-list", "-n1", tag_name))
-        if $? == 0
-          # We got a revision back
-          if tag_rev != commit_hash
-            # Uh oh, the tag doesn't point to the revision we were expecting.
-            # Someone has been monkeying with the job record and/or git.
-            fail_job job, "Existing tag #{tag_name} points to commit #{tag_rev} but expected commit #{commit_hash}"
-            return nil
-          end
-          # we're okay (fall through to setting @git_tags below)
-        else
-          # git rev-list failed for some reason.
-          fail_job job, "'git tag' for #{tag_name} failed but did not find any existing tag using 'git rev-list'"
-          return nil
-        end
-      end
-      # 'git tag' was successful, or there is an existing tag that points to the same revision.
-      @git_tags[tag_name] = commit_hash
-    elsif @git_tags[tag_name] != commit_hash
-      fail_job job, "Existing tag #{tag_name} points to commit #{@git_tags[tag_name]} but this job uses commit #{commit_hash}"
-      return nil
-    end
-    @git_tags[tag_name]
-  end
-
-  def start_jobs
-    @todo.each do |job|
-      next if @running[job.uuid]
-
-      cmd_args = nil
-      case Rails.configuration.Containers.JobsAPI.CrunchJobWrapper
-      when "none"
-        if @running.size > 0
-            # Don't run more than one at a time.
-            return
-        end
-        cmd_args = []
-      when "slurm_immediate"
-        nodelist = nodes_available_for_job(job)
-        if nodelist.nil?
-          if Time.now < @node_wait_deadline
-            break
-          else
-            next
-          end
-        end
-        cmd_args = ["salloc",
-                    "--chdir=/",
-                    "--immediate",
-                    "--exclusive",
-                    "--no-kill",
-                    "--job-name=#{job.uuid}",
-                    "--nodelist=#{nodelist.join(',')}"]
-      else
-        raise "Unknown crunch_job_wrapper: #{Rails.configuration.Containers.JobsAPI.CrunchJobWrapper}"
-      end
-
-      cmd_args = sudo_preface + cmd_args
-
-      next unless get_authorization job
-
-      ready = internal_repo_has_commit? job.script_version
-
-      if not ready
-        # Import the commit from the specified repository into the
-        # internal repository. This should have been done already when
-        # the job was created/updated; this code is obsolete except to
-        # avoid deployment races. Failing the job would be a
-        # reasonable thing to do at this point.
-        repo = Repository.where(name: job.repository).first
-        if repo.nil? or repo.server_path.nil?
-          fail_job job, "Repository #{job.repository} not found under #{@repo_root}"
-          next
-        end
-        ready &&= get_commit repo.server_path, job.script_version
-        ready &&= tag_commit job, job.script_version, job.uuid
-      end
-
-      # This should be unnecessary, because API server does it during
-      # job create/update, but it's still not a bad idea to verify the
-      # tag is correct before starting the job:
-      ready &&= tag_commit job, job.script_version, job.uuid
-
-      # The arvados_sdk_version doesn't support use of arbitrary
-      # remote URLs, so the requested version isn't necessarily copied
-      # into the internal repository yet.
-      if job.arvados_sdk_version
-        ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
-        ready &&= tag_commit job, job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
-      end
-
-      if not ready
-        fail_job job, "commit not present in internal repository"
-        next
-      end
-
-      cmd_args += [@crunch_job_bin,
-                   '--job-api-token', @authorizations[job.uuid].api_token,
-                   '--job', job.uuid,
-                   '--git-dir', @arvados_internal]
-
-      if @cgroup_root
-        cmd_args += ['--cgroup-root', @cgroup_root]
-      end
-
-      if @docker_bin
-        cmd_args += ['--docker-bin', @docker_bin]
-      end
-
-      if @docker_run_args
-        cmd_args += ['--docker-run-args', @docker_run_args]
-      end
-
-      if @srun_sync_timeout
-        cmd_args += ['--srun-sync-timeout', @srun_sync_timeout]
-      end
-
-      if have_job_lock?(job)
-        cmd_args << "--force-unlock"
-      end
-
-      $stderr.puts "dispatch: #{cmd_args.join ' '}"
-
-      begin
-        i, o, e, t = Open3.popen3(*cmd_args)
-      rescue
-        $stderr.puts "dispatch: popen3: #{$!}"
-        # This is a dispatch problem like "Too many open files";
-        # retrying another job right away would be futile. Just return
-        # and hope things are better next time, after (at least) a
-        # did_recently() delay.
-        return
-      end
-
-      $stderr.puts "dispatch: job #{job.uuid}"
-      start_banner = "dispatch: child #{t.pid} start #{LogTime.now}"
-      $stderr.puts start_banner
-
-      @running[job.uuid] = {
-        stdin: i,
-        stdout: o,
-        stderr: e,
-        wait_thr: t,
-        job: job,
-        buf: {stderr: '', stdout: ''},
-        started: false,
-        sent_int: 0,
-        job_auth: @authorizations[job.uuid],
-        stderr_buf_to_flush: '',
-        stderr_flushed_at: Time.new(0),
-        bytes_logged: 0,
-        events_logged: 0,
-        log_throttle_is_open: true,
-        log_throttle_reset_time: Time.now + Rails.configuration.Containers.Logging.LogThrottlePeriod,
-        log_throttle_bytes_so_far: 0,
-        log_throttle_lines_so_far: 0,
-        log_throttle_bytes_skipped: 0,
-        log_throttle_partial_line_last_at: Time.new(0),
-        log_throttle_first_partial_line: true,
-      }
-      i.close
-      @todo_job_retries.delete(job.uuid)
-      update_node_status
-    end
-  end
-
-  # Test for hard cap on total output and for log throttling.  Returns whether
-  # the log line should go to output or not.  Modifies "line" in place to
-  # replace it with an error if a logging limit is tripped.
-  def rate_limit running_job, line
-    message = false
-    linesize = line.size
-    if running_job[:log_throttle_is_open]
-      partial_line = false
-      skip_counts = false
-      matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/)
-      if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]')
-        partial_line = true
-        if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.Containers.Logging.LogPartialLineThrottlePeriod
-          running_job[:log_throttle_partial_line_last_at] = Time.now
-        else
-          skip_counts = true
-        end
-      end
-
-      if !skip_counts
-        running_job[:log_throttle_lines_so_far] += 1
-        running_job[:log_throttle_bytes_so_far] += linesize
-        running_job[:bytes_logged] += linesize
-      end
-
-      if (running_job[:bytes_logged] >
-          Rails.configuration.Containers.Logging.LimitLogBytesPerJob)
-        message = "Exceeded log limit #{Rails.configuration.Containers.Logging.LimitLogBytesPerJob} bytes (LimitLogBytesPerJob). Log will be truncated."
-        running_job[:log_throttle_reset_time] = Time.now + 100.years
-        running_job[:log_throttle_is_open] = false
-
-      elsif (running_job[:log_throttle_bytes_so_far] >
-             Rails.configuration.Containers.Logging.LogThrottleBytes)
-        remaining_time = running_job[:log_throttle_reset_time] - Time.now
-        message = "Exceeded rate #{Rails.configuration.Containers.Logging.LogThrottleBytes} bytes per #{Rails.configuration.Containers.Logging.LogThrottlePeriod} seconds (LogThrottleBytes). Logging will be silenced for the next #{remaining_time.round} seconds."
-        running_job[:log_throttle_is_open] = false
-
-      elsif (running_job[:log_throttle_lines_so_far] >
-             Rails.configuration.Containers.Logging.LogThrottleLines)
-        remaining_time = running_job[:log_throttle_reset_time] - Time.now
-        message = "Exceeded rate #{Rails.configuration.Containers.Logging.LogThrottleLines} lines per #{Rails.configuration.Containers.Logging.LogThrottlePeriod} seconds (LogThrottleLines), logging will be silenced for the next #{remaining_time.round} seconds."
-        running_job[:log_throttle_is_open] = false
-
-      elsif partial_line and running_job[:log_throttle_first_partial_line]
-        running_job[:log_throttle_first_partial_line] = false
-        message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.Containers.Logging.LogPartialLineThrottlePeriod} seconds."
-      end
-    end
-
-    if not running_job[:log_throttle_is_open]
-      # Don't log anything if any limit has been exceeded. Just count lossage.
-      running_job[:log_throttle_bytes_skipped] += linesize
-    end
-
-    if message
-      # Yes, write to logs, but use our "rate exceeded" message
-      # instead of the log message that exceeded the limit.
-      message += " A complete log is still being written to Keep, and will be available when the job finishes.\n"
-      line.replace message
-      true
-    elsif partial_line
-      false
-    else
-      running_job[:log_throttle_is_open]
-    end
-  end
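# Sketch (not part of the deleted file): the byte/line throttling idea
# implemented by rate_limit above, detached from job state and Rails
# configuration. The window length and limits are illustrative only.
class LogThrottle
  def initialize(period: 60, max_bytes: 65536, max_lines: 1024)
    @period, @max_bytes, @max_lines = period, max_bytes, max_lines
    reset(Time.now)
  end

  def reset(now)
    @window_ends_at = now + @period
    @bytes = @lines = 0
    @open = true
  end

  # Returns true if the line should be logged, false if it is being dropped.
  def admit(line, now = Time.now)
    reset(now) if now > @window_ends_at
    return false unless @open
    @bytes += line.size
    @lines += 1
    @open = (@bytes <= @max_bytes && @lines <= @max_lines)
  end
end
# throttle = LogThrottle.new(period: 10, max_bytes: 4096)
# log_lines.each { |ln| $stderr.puts ln if throttle.admit(ln) }  # log_lines is hypothetical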
-
-  def read_pipes
-    @running.each do |job_uuid, j|
-      now = Time.now
-      if now > j[:log_throttle_reset_time]
-        # It has been more than throttle_period seconds since the last
-        # checkpoint so reset the throttle
-        if j[:log_throttle_bytes_skipped] > 0
-          message = "#{job_uuid} ! Skipped #{j[:log_throttle_bytes_skipped]} bytes of log"
-          $stderr.puts message
-          j[:stderr_buf_to_flush] << "#{LogTime.now} #{message}\n"
-        end
-
-        j[:log_throttle_reset_time] = now + Rails.configuration.Containers.Logging.LogThrottlePeriod
-        j[:log_throttle_bytes_so_far] = 0
-        j[:log_throttle_lines_so_far] = 0
-        j[:log_throttle_bytes_skipped] = 0
-        j[:log_throttle_is_open] = true
-        j[:log_throttle_partial_line_last_at] = Time.new(0)
-        j[:log_throttle_first_partial_line] = true
-      end
-
-      j[:buf].each do |stream, streambuf|
-        # Read some data from the child stream
-        buf = ''
-        begin
-          # It's important to use a big enough buffer here. When we're
-          # being flooded with logs, we must read and discard many
-          # bytes at once. Otherwise, we can easily peg a CPU with
-          # time-checking and other loop overhead. (Quick tests show a
-          # 1MiB buffer working 2.5x as fast as a 64 KiB buffer.)
-          #
-          # So don't reduce this buffer size!
-          buf = j[stream].read_nonblock(2**20)
-        rescue Errno::EAGAIN, EOFError
-        end
-
-        # Short circuit the counting code if we're just going to throw
-        # away the data anyway.
-        if not j[:log_throttle_is_open]
-          j[:log_throttle_bytes_skipped] += streambuf.size + buf.size
-          streambuf.replace ''
-          next
-        elsif buf == ''
-          next
-        end
-
-        # Append to incomplete line from previous read, if any
-        streambuf << buf
-
-        bufend = ''
-        streambuf.each_line do |line|
-          if not line.end_with? $/
-            if line.size > Rails.configuration.Containers.Logging.LogThrottleBytes
-              # Without a limit here, we'll use 2x an arbitrary amount
-              # of memory, and waste a lot of time copying strings
-              # around, all without providing any feedback to anyone
-              # about what's going on _or_ hitting any of our throttle
-              # limits.
-              #
-              # Here we leave "line" alone, knowing it will never be
-              # sent anywhere: rate_limit() will hit the LogThrottleBytes
-              # limit immediately. However, we'll
-              # leave [...] in bufend: if the trailing end of the long
-              # line does end up getting sent anywhere, it will have
-              # some indication that it is incomplete.
-              bufend = "[...]"
-            else
-              # If line length is sane, we'll wait for the rest of the
-              # line to appear in the next read_pipes() call.
-              bufend = line
-              break
-            end
-          end
-          # rate_limit returns true or false as to whether to actually log
-          # the line or not.  It also modifies "line" in place to replace
-          # it with an error if a logging limit is tripped.
-          if rate_limit j, line
-            $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
-            $stderr.puts line
-            pub_msg = "#{LogTime.now} #{line.strip}\n"
-            j[:stderr_buf_to_flush] << pub_msg
-          end
-        end
-
-        # Leave the trailing incomplete line (if any) in streambuf for
-        # next time.
-        streambuf.replace bufend
-      end
-      # Flush buffered logs to the logs table, if appropriate. We have
-      # to do this even if we didn't collect any new logs this time:
-      # otherwise, buffered data older than seconds_between_events
-      # won't get flushed until new data arrives.
-      write_log j
-    end
-  end
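# Sketch (not part of the deleted file): the read-and-split pattern used by
# read_pipes above -- non-blocking reads appended to a carry-over buffer,
# emitting only complete lines. `child_stderr` in the usage comment is a
# hypothetical IO such as one returned by Open3.popen3.
def drain_complete_lines(io, carry)
  begin
    carry << io.read_nonblock(2**20)  # big buffer, for the reason given above
  rescue IO::WaitReadable, EOFError
    # nothing new to read right now
  end
  lines = []
  while (nl = carry.index("\n"))
    lines << carry.slice!(0..nl)
  end
  lines
end
# buf = String.new
# drain_complete_lines(child_stderr, buf).each { |ln| $stderr.puts ln }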
-
-  def reap_children
-    return if 0 == @running.size
-    pid_done = nil
-    j_done = nil
-
-    @running.each do |uuid, j|
-      if !j[:wait_thr].status
-        pid_done = j[:wait_thr].pid
-        j_done = j
-        break
-      end
-    end
-
-    return if !pid_done
-
-    job_done = j_done[:job]
-
-    # Ensure every last drop of stdout and stderr is consumed.
-    read_pipes
-    # Reset flush timestamp to make sure log gets written.
-    j_done[:stderr_flushed_at] = Time.new(0)
-    # Write any remaining logs.
-    write_log j_done
-
-    j_done[:buf].each do |stream, streambuf|
-      if streambuf != ''
-        $stderr.puts streambuf + "\n"
-      end
-    end
-
-    # Wait on the thread (its value is a Process::Status)
-    exit_status = j_done[:wait_thr].value.exitstatus
-    exit_tempfail = exit_status == EXIT_TEMPFAIL
-
-    $stderr.puts "dispatch: child #{pid_done} exit #{exit_status}"
-    $stderr.puts "dispatch: job #{job_done.uuid} end"
-
-    jobrecord = Job.find_by_uuid(job_done.uuid)
-
-    if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
-      $stderr.puts("dispatch: job #{jobrecord.uuid} was interrupted by node failure")
-      # Only this crunch-dispatch process can retry the job:
-      # it's already locked, and there's no way to put it back in the
-      # Queued state.  Put it in our internal todo list unless the job
-      # has failed this way excessively.
-      @job_retry_counts[jobrecord.uuid] += 1
-      exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT
-      do_what_next = "give up now"
-      if exit_tempfail
-        @todo_job_retries[jobrecord.uuid] = jobrecord
-        do_what_next = "re-attempt"
-      end
-      $stderr.puts("dispatch: job #{jobrecord.uuid} has been interrupted " +
-                   "#{@job_retry_counts[jobrecord.uuid]}x, will #{do_what_next}")
-    end
-
-    if !exit_tempfail
-      @job_retry_counts.delete(jobrecord.uuid)
-      if jobrecord.state == "Running"
-        # Apparently there was an unhandled error.  That could potentially
-        # include "all allocated nodes failed" when we don't to retry
-        # because the job has already been retried RETRY_UNLOCKED_LIMIT
-        # times.  Fail the job.
-        jobrecord.state = "Failed"
-        if not jobrecord.save
-          $stderr.puts "dispatch: jobrecord.save failed"
-        end
-      end
-    else
-      # If the job failed to run due to an infrastructure
-      # issue with crunch-job or slurm, we want the job to stay in the
-      # queue. If crunch-job exited after losing a race to another
-      # crunch-job process, it exits 75 and we should leave the job
-      # record alone so the winner of the race can do its thing.
-      # If crunch-job exited after all of its allocated nodes failed,
-      # it exits 93, and we want to retry it later (see the
-      # EXIT_RETRY_UNLOCKED `if` block).
-      #
-      # There is still an unhandled race condition: If our crunch-job
-      # process is about to lose a race with another crunch-job
-      # process, but crashes before getting to its "exit 75" (for
-      # example, "cannot fork" or "cannot reach API server") then we
-      # will assume incorrectly that it's our process's fault
-      # jobrecord.started_at is non-nil, and mark the job as failed
-      # even though the winner of the race is probably still doing
-      # fine.
-    end
-
-    # Invalidate the per-job auth token, unless the job is still queued and we
-    # might want to try it again.
-    if jobrecord.state != "Queued" and !@todo_job_retries.include?(jobrecord.uuid)
-      j_done[:job_auth].update_attributes expires_at: Time.now
-    end
-
-    @running.delete job_done.uuid
-  end
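# Sketch (not part of the deleted file): a simplified decision table for the
# exit-status handling in reap_children above, ignoring the per-job retry
# bookkeeping. The disposition names are illustrative only.
TEMPFAIL_CODE = 75        # mirrors EXIT_TEMPFAIL
RETRY_UNLOCKED_CODE = 93  # mirrors EXIT_RETRY_UNLOCKED
def disposition_for(exit_status, retries_so_far, retry_limit: 3)
  if exit_status == RETRY_UNLOCKED_CODE && retries_so_far < retry_limit
    :retry_in_this_dispatcher  # job is locked by us, so only we can re-run it
  elsif exit_status == TEMPFAIL_CODE
    :leave_job_record_alone    # e.g. another crunch-job won the race
  else
    :fail_if_still_running     # unhandled error; mark the job Failed
  end
end
# disposition_for(93, 0)  #=> :retry_in_this_dispatcher
# disposition_for(75, 0)  #=> :leave_job_record_alone
# disposition_for(1, 0)   #=> :fail_if_still_running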
-
-  def update_pipelines
-    expire_tokens = @pipe_auth_tokens.dup
-    @todo_pipelines.each do |p|
-      pipe_auth = (@pipe_auth_tokens[p.uuid] ||= ApiClientAuthorization.
-                   create(user: User.where('uuid=?', p.modified_by_user_uuid).first,
-                          api_client_id: 0))
-      puts `export ARVADOS_API_TOKEN=#{pipe_auth.api_token} && arv-run-pipeline-instance --run-pipeline-here --no-wait --instance #{p.uuid}`
-      expire_tokens.delete p.uuid
-    end
-
-    expire_tokens.each do |k, v|
-      v.update_attributes expires_at: Time.now
-      @pipe_auth_tokens.delete k
-    end
-  end
-
-  def parse_argv argv
-    @runoptions = {}
-    (argv.any? ? argv : ['--jobs', '--pipelines']).each do |arg|
-      case arg
-      when '--jobs'
-        @runoptions[:jobs] = true
-      when '--pipelines'
-        @runoptions[:pipelines] = true
-      else
-        abort "Unrecognized command line option '#{arg}'"
-      end
-    end
-    if not (@runoptions[:jobs] or @runoptions[:pipelines])
-      abort "Nothing to do. Please specify at least one of: --jobs, --pipelines."
-    end
-  end
-
-  def run argv
-    parse_argv argv
-
-    # We want files written by crunch-dispatch to be writable by other
-    # processes with the same GID, see bug #7228
-    File.umask(0002)
-
-    # This is how crunch-job child procs know where the "refresh"
-    # trigger file is
-    ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.Containers.JobsAPI.CrunchRefreshTrigger
-
-    # If salloc can't allocate resources immediately, make it use our
-    # temporary failure exit code.  This ensures crunch-dispatch won't
-    # mark a job failed because of an issue with node allocation.
-    # This often happens when another dispatcher wins the race to
-    # allocate nodes.
-    ENV["SLURM_EXIT_IMMEDIATE"] = CrunchDispatch::EXIT_TEMPFAIL.to_s
-
-    if ENV["CRUNCH_DISPATCH_LOCKFILE"]
-      lockfilename = ENV.delete "CRUNCH_DISPATCH_LOCKFILE"
-      lockfile = File.open(lockfilename, File::RDWR|File::CREAT, 0644)
-      unless lockfile.flock File::LOCK_EX|File::LOCK_NB
-        abort "Lock unavailable on #{lockfilename} - exit"
-      end
-    end
-
-    @signal = {}
-    %w{TERM INT}.each do |sig|
-      signame = sig
-      Signal.trap(sig) do
-        $stderr.puts "Received #{signame} signal"
-        @signal[:term] = true
-      end
-    end
-
-    act_as_system_user
-    User.first.group_permissions
-    $stderr.puts "dispatch: ready"
-    while !@signal[:term] or @running.size > 0
-      read_pipes
-      if @signal[:term]
-        @running.each do |uuid, j|
-          if !j[:started] and j[:sent_int] < 2
-            begin
-              Process.kill 'INT', j[:wait_thr].pid
-            rescue Errno::ESRCH
-              # No such pid = race condition + desired result is
-              # already achieved
-            end
-            j[:sent_int] += 1
-          end
-        end
-      else
-        refresh_todo unless did_recently(:refresh_todo, 1.0)
-        update_node_status unless did_recently(:update_node_status, 1.0)
-        unless @todo.empty? or did_recently(:start_jobs, 1.0) or @signal[:term]
-          start_jobs
-        end
-        unless (@todo_pipelines.empty? and @pipe_auth_tokens.empty?) or did_recently(:update_pipelines, 5.0)
-          update_pipelines
-        end
-        unless did_recently('check_orphaned_slurm_jobs', 60)
-          check_orphaned_slurm_jobs
-        end
-      end
-      reap_children
-      select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
-             [], [], 1)
-    end
-    # If there are jobs we wanted to retry, we have to mark them as failed now.
-    # Other dispatchers can't pick them up because we hold their lock.
-    @todo_job_retries.each_key do |job_uuid|
-      job = Job.find_by_uuid(job_uuid)
-      if job.state == "Running"
-        fail_job(job, "crunch-dispatch was stopped during job's tempfail retry loop")
-      end
-    end
-  end
-
-  def fail_jobs before: nil
-    act_as_system_user do
-      threshold = nil
-      if before == 'reboot'
-        boottime = nil
-        open('/proc/stat').map(&:split).each do |stat, t|
-          if stat == 'btime'
-            boottime = t
-          end
-        end
-        if not boottime
-          raise "Could not find btime in /proc/stat"
-        end
-        threshold = Time.at(boottime.to_i)
-      elsif before
-        threshold = Time.parse(before, Time.now)
-      else
-        threshold = db_current_time
-      end
-      Rails.logger.info "fail_jobs: threshold is #{threshold}"
-
-      squeue = squeue_jobs
-      Job.where('state = ? and started_at < ?', Job::Running, threshold).
-        each do |job|
-        Rails.logger.debug "fail_jobs: #{job.uuid} started #{job.started_at}"
-        squeue.each do |slurm_name|
-          if slurm_name == job.uuid
-            Rails.logger.info "fail_jobs: scancel #{job.uuid}"
-            scancel slurm_name
-          end
-        end
-        fail_job(job, "cleaned up stale job: started before #{threshold}",
-                 skip_lock: true)
-      end
-    end
-  end
-
-  def check_orphaned_slurm_jobs
-    act_as_system_user do
-      squeue_uuids = squeue_jobs.select{|uuid| uuid.match(/^[0-9a-z]{5}-8i9sb-[0-9a-z]{15}$/)}.
-                                  select{|uuid| !@running.has_key?(uuid)}
-
-      return if squeue_uuids.size == 0
-
-      scancel_uuids = squeue_uuids - Job.where('uuid in (?) and (state in (?) or modified_at>?)',
-                                               squeue_uuids,
-                                               ['Running', 'Queued'],
-                                               (Time.now - 60)).
-                                         collect(&:uuid)
-      scancel_uuids.each do |uuid|
-        Rails.logger.info "orphaned job: scancel #{uuid}"
-        scancel uuid
-      end
-    end
-  end
-
-  def sudo_preface
-    return [] if not Rails.configuration.Containers.JobsAPI.CrunchJobUser
-    ["sudo", "-E", "-u",
-     Rails.configuration.Containers.JobsAPI.CrunchJobUser,
-     "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
-     "PATH=#{ENV['PATH']}",
-     "PERLLIB=#{ENV['PERLLIB']}",
-     "PYTHONPATH=#{ENV['PYTHONPATH']}",
-     "RUBYLIB=#{ENV['RUBYLIB']}",
-     "GEM_PATH=#{ENV['GEM_PATH']}"]
-  end
-
-  protected
-
-  def have_job_lock?(job)
-    # Return true if the given job is locked by this crunch-dispatch, normally
-    # because we've run crunch-job for it.
-    @todo_job_retries.include?(job.uuid)
-  end
-
-  def did_recently(thing, min_interval)
-    if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
-      @did_recently[thing] = Time.now
-      false
-    else
-      true
-    end
-  end
-
-  # send message to log table. we want these records to be transient
-  def write_log running_job
-    return if running_job[:stderr_buf_to_flush] == ''
-
-    # Send out a log event if the buffer size exceeds the bytes-per-event limit or if
-    # it has been at least crunch_log_seconds_between_events seconds since
-    # the last flush.
-    if running_job[:stderr_buf_to_flush].size > Rails.configuration.Containers.Logging.LogBytesPerEvent or
-        (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.Containers.Logging.LogSecondsBetweenEvents
-      begin
-        log = Log.new(object_uuid: running_job[:job].uuid,
-                      event_type: 'stderr',
-                      owner_uuid: running_job[:job].owner_uuid,
-                      properties: {"text" => running_job[:stderr_buf_to_flush]})
-        log.save!
-        running_job[:events_logged] += 1
-      rescue => exception
-        $stderr.puts "Failed to write logs"
-        $stderr.puts exception.backtrace
-      end
-      running_job[:stderr_buf_to_flush] = ''
-      running_job[:stderr_flushed_at] = Time.now
-    end
-  end
-
-  # An array of job_uuids in squeue
-  def squeue_jobs
-    if Rails.configuration.Containers.JobsAPI.CrunchJobWrapper == "slurm_immediate"
-      p = IO.popen(['squeue', '-a', '-h', '-o', '%j'])
-      begin
-        p.readlines.map {|line| line.strip}
-      ensure
-        p.close
-      end
-    else
-      []
-    end
-  end
-
-  def scancel slurm_name
-    cmd = sudo_preface + ['scancel', '-n', slurm_name]
-    IO.popen(cmd) do |scancel_pipe|
-      puts scancel_pipe.read
-    end
-    if not $?.success?
-      Rails.logger.error "scancel #{slurm_name.shellescape}: $?"
-    end
-  end
-end
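At its core, the run method deleted above is a select() loop over the children's pipes with signal-driven shutdown. The following self-contained miniature keeps only that skeleton, supervising a single shell child instead of crunch-job processes; it is a sketch of the structure, not the dispatcher itself.

    require 'open3'

    shutdown = false
    %w[TERM INT].each { |sig| Signal.trap(sig) { shutdown = true } }

    stdin, stdout, stderr, wait_thr =
      Open3.popen3("sh", "-c", "for i in 1 2 3; do echo tick $i; sleep 1; done")
    stdin.close
    ios = [stdout, stderr]

    until ios.empty? || shutdown
      ready, = IO.select(ios, [], [], 1)        # 1-second pacing, as in run()
      (ready || []).each do |io|
        begin
          $stderr.print io.read_nonblock(2**20) # drain whatever is available
        rescue IO::WaitReadable
        rescue EOFError
          ios.delete(io)                        # this stream is finished
        end
      end
    end
    Process.kill("TERM", wait_thr.pid) if shutdown && wait_thr.alive?
    puts "child exited: #{wait_thr.value}"      # wait_thr.value is a Process::Status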
diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb
deleted file mode 100755 (executable)
index 38bd54b..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-#!/usr/bin/env ruby
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-dispatch_argv = []
-ARGV.reject! do |arg|
-  dispatch_argv.push(arg) if /^--/ =~ arg
-end
-
-ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
-require File.dirname(__FILE__) + '/../config/boot'
-require File.dirname(__FILE__) + '/../config/environment'
-require './lib/crunch_dispatch.rb'
-
-CrunchDispatch.new.run dispatch_argv
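The wrapper above splits its command line in a slightly unusual way: every `--`-prefixed argument is forwarded to CrunchDispatch#run, and the first remaining argument (if any) selects RAILS_ENV. The same partitioning idiom, shown on a literal array:

    argv = ["production", "--jobs", "--pipelines"]
    dispatch_argv = []
    argv.reject! { |arg| dispatch_argv.push(arg) if /^--/ =~ arg }
    # argv          => ["production"]             (used as RAILS_ENV)
    # dispatch_argv => ["--jobs", "--pipelines"]  (passed to CrunchDispatch#run)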
diff --git a/services/api/script/crunch_failure_report.py b/services/api/script/crunch_failure_report.py
deleted file mode 100755 (executable)
index 83217d8..0000000
+++ /dev/null
@@ -1,222 +0,0 @@
-#! /usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-import argparse
-import datetime
-import json
-import re
-import sys
-
-import arvados
-
-# Useful configuration variables:
-
-# Number of log lines to use as context in diagnosing failure.
-LOG_CONTEXT_LINES = 10
-
-# Regex that signifies a failed task.
-FAILED_TASK_REGEX = re.compile(' \d+ failure (.*permanent)')
-
-# Regular expressions used to classify failure types.
-JOB_FAILURE_TYPES = {
-    'sys/docker': 'Cannot destroy container',
-    'crunch/node': 'User not found on host',
-    'slurm/comm':  'Communication connection failure'
-}
-
-def parse_arguments(arguments):
-    arg_parser = argparse.ArgumentParser(
-        description='Produce a report of Crunch failures within a specified time range')
-
-    arg_parser.add_argument(
-        '--start',
-        help='Start date and time')
-    arg_parser.add_argument(
-        '--end',
-        help='End date and time')
-
-    args = arg_parser.parse_args(arguments)
-
-    if args.start and not is_valid_timestamp(args.start):
-        raise ValueError(args.start)
-    if args.end and not is_valid_timestamp(args.end):
-        raise ValueError(args.end)
-
-    return args
-
-
-def api_timestamp(when=None):
-    """Returns a string representing the timestamp 'when' in a format
-    suitable for delivering to the API server.  Defaults to the
-    current time.
-    """
-    if when is None:
-        when = datetime.datetime.utcnow()
-    return when.strftime("%Y-%m-%dT%H:%M:%SZ")
-
-
-def is_valid_timestamp(ts):
-    return re.match(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z', ts)
-
-
-def jobs_created_between_dates(api, start, end):
-    return arvados.util.list_all(
-        api.jobs().list,
-        filters=json.dumps([ ['created_at', '>=', start],
-                             ['created_at', '<=', end] ]))
-
-
-def job_logs(api, job):
-    # Returns the contents of the log for this job (as an array of lines).
-    if job['log']:
-        log_collection = arvados.CollectionReader(job['log'], api)
-        log_filename = "{}.log.txt".format(job['uuid'])
-        return log_collection.open(log_filename).readlines()
-    return []
-
-
-user_names = {}
-def job_user_name(api, user_uuid):
-    def _lookup_user_name(api, user_uuid):
-        try:
-            return api.users().get(uuid=user_uuid).execute()['full_name']
-        except arvados.errors.ApiError:
-            return user_uuid
-
-    if user_uuid not in user_names:
-        user_names[user_uuid] = _lookup_user_name(api, user_uuid)
-    return user_names[user_uuid]
-
-
-job_pipeline_names = {}
-def job_pipeline_name(api, job_uuid):
-    def _lookup_pipeline_name(api, job_uuid):
-        try:
-            pipelines = api.pipeline_instances().list(
-                filters='[["components", "like", "%{}%"]]'.format(job_uuid)).execute()
-            pi = pipelines['items'][0]
-            if pi['name']:
-                return pi['name']
-            else:
-                # Use the pipeline template name
-                pt = api.pipeline_templates().get(uuid=pi['pipeline_template_uuid']).execute()
-                return pt['name']
-        except (TypeError, ValueError, IndexError):
-            return ""
-
-    if job_uuid not in job_pipeline_names:
-        job_pipeline_names[job_uuid] = _lookup_pipeline_name(api, job_uuid)
-    return job_pipeline_names[job_uuid]
-
-
-def is_failed_task(logline):
-    return FAILED_TASK_REGEX.search(logline) != None
-
-
-def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
-    args = parse_arguments(arguments)
-
-    api = arvados.api('v1')
-
-    now = datetime.datetime.utcnow()
-    start_time = args.start or api_timestamp(now - datetime.timedelta(days=1))
-    end_time = args.end or api_timestamp(now)
-
-    # Find all jobs created within the specified window,
-    # and their corresponding job logs.
-    jobs_created = jobs_created_between_dates(api, start_time, end_time)
-    jobs_by_state = {}
-    for job in jobs_created:
-        jobs_by_state.setdefault(job['state'], [])
-        jobs_by_state[job['state']].append(job)
-
-    # Find failed jobs and record the job failure text.
-
-    # failure_stats maps failure types (e.g. "sys/docker") to
-    # a set of job UUIDs that failed for that reason.
-    failure_stats = {}
-    for job in jobs_by_state['Failed']:
-        job_uuid = job['uuid']
-        logs = job_logs(api, job)
-        # Find the first permanent task failure, and collect the
-        # preceding log lines.
-        failure_type = None
-        for i, lg in enumerate(logs):
-            if is_failed_task(lg):
-                # Get preceding log record to provide context.
-                log_start = i - LOG_CONTEXT_LINES if i >= LOG_CONTEXT_LINES else 0
-                log_end = i + 1
-                lastlogs = ''.join(logs[log_start:log_end])
-                # try to identify the type of failure.
-                for key, rgx in JOB_FAILURE_TYPES.iteritems():
-                    if re.search(rgx, lastlogs):
-                        failure_type = key
-                        break
-            if failure_type is not None:
-                break
-        if failure_type is None:
-            failure_type = 'unknown'
-        failure_stats.setdefault(failure_type, set())
-        failure_stats[failure_type].add(job_uuid)
-
-    # Report percentages of successful, failed and unfinished jobs.
-    print "Start: {:20s}".format(start_time)
-    print "End:   {:20s}".format(end_time)
-    print ""
-
-    print "Overview"
-    print ""
-
-    job_start_count = len(jobs_created)
-    print "  {: <25s} {:4d}".format('Started', job_start_count)
-    for state in ['Complete', 'Failed', 'Queued', 'Cancelled', 'Running']:
-        if state in jobs_by_state:
-            job_count = len(jobs_by_state[state])
-            job_percentage = job_count / float(job_start_count)
-            print "  {: <25s} {:4d} ({: >4.0%})".format(state,
-                                                        job_count,
-                                                        job_percentage)
-    print ""
-
-    # Report failure types.
-    failure_summary = ""
-    failure_detail = ""
-
-    # Generate a mapping from failed job uuids to job records, to assist
-    # in generating detailed statistics for job failures.
-    jobs_failed_map = { job['uuid']: job for job in jobs_by_state.get('Failed', []) }
-
-    # sort the failure stats in descending order by occurrence.
-    sorted_failures = sorted(failure_stats,
-                             reverse=True,
-                             key=lambda failure_type: len(failure_stats[failure_type]))
-    for failtype in sorted_failures:
-        job_uuids = failure_stats[failtype]
-        failstat = "  {: <25s} {:4d} ({: >4.0%})\n".format(
-            failtype,
-            len(job_uuids),
-            len(job_uuids) / float(len(jobs_by_state['Failed'])))
-        failure_summary = failure_summary + failstat
-        failure_detail = failure_detail + failstat
-        for j in job_uuids:
-            job_info = jobs_failed_map[j]
-            job_owner = job_user_name(api, job_info['modified_by_user_uuid'])
-            job_name = job_pipeline_name(api, job_info['uuid'])
-            failure_detail = failure_detail + "    {}  {: <15.15s}  {:29.29s}\n".format(j, job_owner, job_name)
-        failure_detail = failure_detail + "\n"
-
-    print "Failures by class"
-    print ""
-    print failure_summary
-
-    print "Failures by class (detail)"
-    print ""
-    print failure_detail
-
-    return 0
-
-
-if __name__ == "__main__":
-    sys.exit(main())
diff --git a/services/api/script/fail-jobs.rb b/services/api/script/fail-jobs.rb
deleted file mode 100755 (executable)
index e52bfc0..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/usr/bin/env ruby
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-require 'optimist'
-
-opts = Optimist::options do
-  banner 'Fail jobs that have state=="Running".'
-  banner 'Options:'
-  opt(:before,
-      'fail only jobs that started before the given time (or "reboot")',
-      type: :string)
-end
-
-ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
-require File.dirname(__FILE__) + '/../config/boot'
-require File.dirname(__FILE__) + '/../config/environment'
-require Rails.root.join('lib/crunch_dispatch.rb')
-
-CrunchDispatch.new.fail_jobs before: opts[:before]