+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-
-arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-gatk_path = arvados.util.tarball_extract(
- tarball = this_job['script_parameters']['gatk_binary_tarball'],
- path = 'gatk')
-bundle_path = arvados.util.collection_extract(
- collection = this_job['script_parameters']['gatk_bundle'],
- path = 'gatk-bundle',
- files = ['human_g1k_v37.dict', 'human_g1k_v37.fasta', 'human_g1k_v37.fasta.fai'])
-this_task_input = this_task['parameters']['input']
-
-input_file = list(arvados.CollectionReader(this_task_input).all_files())[0]
-
-# choose vcf temporary file names
-vcf_in = os.path.join(arvados.current_task().tmpdir,
- os.path.basename(input_file.name()))
-vcf_out = re.sub('(.*)\\.vcf', '\\1-filtered.vcf', vcf_in)
-
-# fetch the unfiltered data
-vcf_in_file = open(vcf_in, 'w')
-for buf in input_file.readall():
- vcf_in_file.write(buf)
-vcf_in_file.close()
-
-stdoutdata, stderrdata = arvados.util.run_command(
- ['java', '-Xmx1g',
- '-jar', os.path.join(gatk_path,'GenomeAnalysisTK.jar'),
- '-T', 'VariantFiltration', '--variant', vcf_in,
- '--out', vcf_out,
- '--filterExpression', 'QD < 2.0',
- '--filterName', 'GATK_QD',
- '--filterExpression', 'MQ < 40.0',
- '--filterName', 'GATK_MQ',
- '--filterExpression', 'FS > 60.0',
- '--filterName', 'GATK_FS',
- '--filterExpression', 'MQRankSum < -12.5',
- '--filterName', 'GATK_MQRankSum',
- '--filterExpression', 'ReadPosRankSum < -8.0',
- '--filterName', 'GATK_ReadPosRankSum',
- '-R', os.path.join(bundle_path, 'human_g1k_v37.fasta')],
- cwd=arvados.current_task().tmpdir)
-
-# store the filtered data
-with open(vcf_out, 'rb') as f:
- out = arvados.CollectionWriter()
- while True:
- buf = f.read()
- if len(buf) == 0:
- break
- out.write(buf)
-out.set_current_file_name(os.path.basename(vcf_out))
-
-this_task.set_output(out.finish())
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import os
-import re
-import sys
-import arvados
-import arvados_gatk2
-import arvados_samtools
-from arvados_ipc import *
-
-class InvalidArgumentError(Exception):
- pass
-
-arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-tmpdir = arvados.current_task().tmpdir
-arvados.util.clear_tmpdir()
-
-known_sites_files = arvados.getjobparam(
- 'known_sites',
- ['dbsnp_137.b37.vcf',
- 'Mills_and_1000G_gold_standard.indels.b37.vcf',
- ])
-bundle_dir = arvados.util.collection_extract(
- collection = this_job['script_parameters']['gatk_bundle'],
- files = [
- 'human_g1k_v37.dict',
- 'human_g1k_v37.fasta',
- 'human_g1k_v37.fasta.fai'
- ] + known_sites_files + [v + '.idx' for v in known_sites_files],
- path = 'gatk_bundle')
-ref_fasta_files = [os.path.join(bundle_dir, f)
- for f in os.listdir(bundle_dir)
- if re.search(r'\.fasta(\.gz)?$', f)]
-
-input_collection = this_task['parameters']['input']
-input_dir = arvados.util.collection_extract(
- collection = input_collection,
- path = os.path.join(this_task.tmpdir, 'input'))
-input_bam_files = []
-for f in arvados.util.listdir_recursive(input_dir):
- if re.search(r'\.bam$', f):
- input_stream_name, input_file_name = os.path.split(f)
- input_bam_files += [os.path.join(input_dir, f)]
-if len(input_bam_files) != 1:
- raise InvalidArgumentError("Expected exactly one bam file per task.")
-
-known_sites_args = []
-for f in known_sites_files:
- known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)]
-
-recal_file = os.path.join(tmpdir, 'recal.csv')
-
-children = {}
-pipes = {}
-
-arvados_gatk2.run(
- args=[
- '-nct', arvados_gatk2.cpus_on_this_node(),
- '-T', 'BaseRecalibrator',
- '-R', ref_fasta_files[0],
- '-I', input_bam_files[0],
- '-o', recal_file,
- ] + known_sites_args)
-
-pipe_setup(pipes, 'BQSR')
-if 0 == named_fork(children, 'BQSR'):
- pipe_closeallbut(pipes, ('BQSR', 'w'))
- arvados_gatk2.run(
- args=[
- '-T', 'PrintReads',
- '-R', ref_fasta_files[0],
- '-I', input_bam_files[0],
- '-o', '/dev/fd/' + str(pipes['BQSR','w']),
- '-BQSR', recal_file,
- '--disable_bam_indexing',
- ],
- close_fds=False)
- os._exit(0)
-os.close(pipes.pop(('BQSR','w'), None))
-
-out = arvados.CollectionWriter()
-out.start_new_stream(input_stream_name)
-
-out.start_new_file(input_file_name + '.recal.csv')
-out.write(open(recal_file, 'rb'))
-
-out.start_new_file(input_file_name)
-while True:
- buf = os.read(pipes['BQSR','r'], 2**20)
- if len(buf) == 0:
- break
- out.write(buf)
-pipe_closeallbut(pipes)
-
-if waitpid_and_check_children(children):
- this_task.set_output(out.finish())
-else:
- sys.exit(1)
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import os
-import re
-import string
-import sys
-import threading
-import arvados
-import arvados_gatk2
-import arvados_picard
-from arvados_ipc import *
-
-class InvalidArgumentError(Exception):
- pass
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-tmpdir = arvados.current_task().tmpdir
-arvados.util.clear_tmpdir()
-
-bundle_dir = arvados.util.collection_extract(
- collection = this_job['script_parameters']['gatk_bundle'],
- files = [
- 'human_g1k_v37.dict',
- 'human_g1k_v37.fasta',
- 'human_g1k_v37.fasta.fai',
- 'dbsnp_137.b37.vcf',
- 'dbsnp_137.b37.vcf.idx',
- ],
- path = 'gatk_bundle')
-ref_fasta_files = [os.path.join(bundle_dir, f)
- for f in os.listdir(bundle_dir)
- if re.search(r'\.fasta(\.gz)?$', f)]
-regions_args = []
-if 'regions' in this_job['script_parameters']:
- regions_dir = arvados.util.collection_extract(
- collection = this_job['script_parameters']['regions'],
- path = 'regions')
- region_padding = int(this_job['script_parameters']['region_padding'])
- for f in os.listdir(regions_dir):
- if re.search(r'\.bed$', f):
- regions_args += [
- '--intervals', os.path.join(regions_dir, f),
- '--interval_padding', str(region_padding)
- ]
-
-
-# Start a child process for each input file, feeding data to picard.
-
-input_child_names = []
-children = {}
-pipes = {}
-
-input_collection = this_job['script_parameters']['input']
-input_index = 0
-for s in arvados.CollectionReader(input_collection).all_streams():
- for f in s.all_files():
- if not re.search(r'\.bam$', f.name()):
- continue
- input_index += 1
- childname = 'input-' + str(input_index)
- input_child_names += [childname]
- pipe_setup(pipes, childname)
- childpid = named_fork(children, childname)
- if childpid == 0:
- pipe_closeallbut(pipes, (childname, 'w'))
- for s in f.readall():
- os.write(pipes[childname, 'w'], s)
- os.close(pipes[childname, 'w'])
- os._exit(0)
- sys.stderr.write("pid %d writing %s to fd %d->%d\n" %
- (childpid,
- s.name()+'/'+f.name(),
- pipes[childname, 'w'],
- pipes[childname, 'r']))
- pipe_closeallbut(pipes, *[(childname, 'r')
- for childname in input_child_names])
-
-
-# Merge-sort the input files to merge.bam
-
-arvados_picard.run(
- 'MergeSamFiles',
- args=[
- 'I=/dev/fd/' + str(pipes[childname, 'r'])
- for childname in input_child_names
- ],
- params={
- 'o': 'merge.bam',
- 'quiet': 'true',
- 'so': 'coordinate',
- 'use_threading': 'true',
- 'create_index': 'true',
- 'validation_stringency': 'LENIENT',
- },
- close_fds=False,
- )
-pipe_closeallbut(pipes)
-
-
-# Run CoverageBySample on merge.bam
-
-pipe_setup(pipes, 'stats_log')
-pipe_setup(pipes, 'stats_out')
-if 0 == named_fork(children, 'GATK'):
- pipe_closeallbut(pipes,
- ('stats_log', 'w'),
- ('stats_out', 'w'))
- arvados_gatk2.run(
- args=[
- '-T', 'CoverageBySample',
- '-R', ref_fasta_files[0],
- '-I', 'merge.bam',
- '-o', '/dev/fd/' + str(pipes['stats_out', 'w']),
- '--log_to_file', '/dev/fd/' + str(pipes['stats_log', 'w']),
- ]
- + regions_args,
- close_fds=False)
- pipe_closeallbut(pipes)
- os._exit(0)
-pipe_closeallbut(pipes, ('stats_log', 'r'), ('stats_out', 'r'))
-
-
-# Start two threads to read from CoverageBySample pipes
-
-class ExceptionPropagatingThread(threading.Thread):
- """
- If a subclassed thread calls _raise(e) in run(), running join() on
- the thread will raise e in the thread that calls join().
- """
- def __init__(self, *args, **kwargs):
- super(ExceptionPropagatingThread, self).__init__(*args, **kwargs)
- self.__exception = None
- def join(self, *args, **kwargs):
- ret = super(ExceptionPropagatingThread, self).join(*args, **kwargs)
- if self.__exception:
- raise self.__exception
- return ret
- def _raise(self, exception):
- self.__exception = exception
-
-class StatsLogReader(ExceptionPropagatingThread):
- def __init__(self, **kwargs):
- super(StatsLogReader, self).__init__()
- self.args = kwargs
- def run(self):
- try:
- for logline in self.args['infile']:
- x = re.search('Processing (\d+) bp from intervals', logline)
- if x:
- self._total_bp = int(x.group(1))
- except Exception as e:
- self._raise(e)
- def total_bp(self):
- self.join()
- return self._total_bp
-stats_log_thr = StatsLogReader(infile=os.fdopen(pipes.pop(('stats_log', 'r'))))
-stats_log_thr.start()
-
-class StatsOutReader(ExceptionPropagatingThread):
- """
- Read output of CoverageBySample and collect a histogram of
- coverage (last column) -> number of loci (number of rows).
- """
- def __init__(self, **kwargs):
- super(StatsOutReader, self).__init__()
- self.args = kwargs
- def run(self):
- try:
- hist = [0]
- histtot = 0
- for line in self.args['infile']:
- try:
- i = int(string.split(line)[-1])
- except ValueError:
- continue
- if i >= 1:
- if len(hist) <= i:
- hist.extend([0 for x in range(1+i-len(hist))])
- hist[i] += 1
- histtot += 1
- hist[0] = stats_log_thr.total_bp() - histtot
- self._histogram = hist
- except Exception as e:
- self._raise(e)
- def histogram(self):
- self.join()
- return self._histogram
-stats_out_thr = StatsOutReader(infile=os.fdopen(pipes.pop(('stats_out', 'r'))))
-stats_out_thr.start()
-
-
-# Run UnifiedGenotyper on merge.bam
-
-arvados_gatk2.run(
- args=[
- '-nt', arvados_gatk2.cpus_on_this_node(),
- '-T', 'UnifiedGenotyper',
- '-R', ref_fasta_files[0],
- '-I', 'merge.bam',
- '-o', os.path.join(tmpdir, 'out.vcf'),
- '--dbsnp', os.path.join(bundle_dir, 'dbsnp_137.b37.vcf'),
- '-metrics', 'UniGenMetrics',
- '-A', 'DepthOfCoverage',
- '-A', 'AlleleBalance',
- '-A', 'QualByDepth',
- '-A', 'HaplotypeScore',
- '-A', 'MappingQualityRankSumTest',
- '-A', 'ReadPosRankSumTest',
- '-A', 'FisherStrand',
- '-glm', 'both',
- ]
- + regions_args
- + arvados.getjobparam('GATK2_UnifiedGenotyper_args',[]))
-
-# Copy the output VCF file to Keep
-
-out = arvados.CollectionWriter()
-out.start_new_stream()
-out.start_new_file('out.vcf')
-out.write(open(os.path.join(tmpdir, 'out.vcf'), 'rb'))
-
-
-# Write statistics to Keep
-
-out.start_new_file('mincoverage_nlocus.csv')
-sofar = 0
-hist = stats_out_thr.histogram()
-total_bp = stats_log_thr.total_bp()
-for i in range(len(hist)):
- out.write("%d,%d,%f\n" %
- (i,
- total_bp - sofar,
- 100.0 * (total_bp - sofar) / total_bp))
- sofar += hist[i]
-
-if waitpid_and_check_children(children):
- this_task.set_output(out.finish())
-else:
- sys.exit(1)
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import os
-import re
-import sys
-import arvados
-import arvados_gatk2
-import arvados_picard
-import arvados_samtools
-from arvados_ipc import *
-
-class InvalidArgumentError(Exception):
- pass
-
-arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-tmpdir = arvados.current_task().tmpdir
-arvados.util.clear_tmpdir()
-
-known_sites_files = arvados.getjobparam(
- 'known_sites',
- ['dbsnp_137.b37.vcf',
- 'Mills_and_1000G_gold_standard.indels.b37.vcf',
- ])
-bundle_dir = arvados.util.collection_extract(
- collection = this_job['script_parameters']['gatk_bundle'],
- files = [
- 'human_g1k_v37.dict',
- 'human_g1k_v37.fasta',
- 'human_g1k_v37.fasta.fai'
- ] + known_sites_files + [v + '.idx' for v in known_sites_files],
- path = 'gatk_bundle')
-ref_fasta_files = [os.path.join(bundle_dir, f)
- for f in os.listdir(bundle_dir)
- if re.search(r'\.fasta(\.gz)?$', f)]
-regions_args = []
-if 'regions' in this_job['script_parameters']:
- regions_dir = arvados.util.collection_extract(
- collection = this_job['script_parameters']['regions'],
- path = 'regions')
- region_padding = int(this_job['script_parameters']['region_padding'])
- for f in os.listdir(regions_dir):
- if re.search(r'\.bed$', f):
- regions_args += [
- '--intervals', os.path.join(regions_dir, f),
- '--interval_padding', str(region_padding)
- ]
-
-input_collection = this_task['parameters']['input']
-input_dir = arvados.util.collection_extract(
- collection = input_collection,
- path = os.path.join(this_task.tmpdir, 'input'))
-input_bam_files = []
-for f in arvados.util.listdir_recursive(input_dir):
- if re.search(r'\.bam$', f):
- input_stream_name, input_file_name = os.path.split(f)
- input_bam_files += [os.path.join(input_dir, f)]
-if len(input_bam_files) != 1:
- raise InvalidArgumentError("Expected exactly one bam file per task.")
-
-known_sites_args = []
-for f in known_sites_files:
- known_sites_args += ['-known', os.path.join(bundle_dir, f)]
-
-children = {}
-pipes = {}
-
-arvados_gatk2.run(
- args=[
- '-nt', arvados_gatk2.cpus_per_task(),
- '-T', 'RealignerTargetCreator',
- '-R', ref_fasta_files[0],
- '-I', input_bam_files[0],
- '-o', os.path.join(tmpdir, 'intervals.list')
- ] + known_sites_args + regions_args)
-
-pipe_setup(pipes, 'IndelRealigner')
-if 0 == named_fork(children, 'IndelRealigner'):
- pipe_closeallbut(pipes, ('IndelRealigner', 'w'))
- arvados_gatk2.run(
- args=[
- '-T', 'IndelRealigner',
- '-R', ref_fasta_files[0],
- '-targetIntervals', os.path.join(tmpdir, 'intervals.list'),
- '-I', input_bam_files[0],
- '-o', '/dev/fd/' + str(pipes['IndelRealigner','w']),
- '--disable_bam_indexing',
- ] + known_sites_args + regions_args,
- close_fds=False)
- os._exit(0)
-os.close(pipes.pop(('IndelRealigner','w'), None))
-
-pipe_setup(pipes, 'bammanifest')
-pipe_setup(pipes, 'bam')
-if 0==named_fork(children, 'bammanifest'):
- pipe_closeallbut(pipes,
- ('IndelRealigner', 'r'),
- ('bammanifest', 'w'),
- ('bam', 'w'))
- out = arvados.CollectionWriter()
- out.start_new_stream(input_stream_name)
- out.start_new_file(input_file_name)
- while True:
- buf = os.read(pipes['IndelRealigner','r'], 2**20)
- if len(buf) == 0:
- break
- os.write(pipes['bam','w'], buf)
- out.write(buf)
- os.write(pipes['bammanifest','w'], out.manifest_text())
- os.close(pipes['bammanifest','w'])
- os._exit(0)
-
-pipe_setup(pipes, 'index')
-if 0==named_fork(children, 'index'):
- pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
- arvados_picard.run(
- 'BuildBamIndex',
- params={
- 'i': '/dev/fd/' + str(pipes['bam','r']),
- 'o': '/dev/fd/' + str(pipes['index','w']),
- 'quiet': 'true',
- 'validation_stringency': 'LENIENT'
- },
- close_fds=False)
- os._exit(0)
-
-pipe_setup(pipes, 'indexmanifest')
-if 0==named_fork(children, 'indexmanifest'):
- pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
- out = arvados.CollectionWriter()
- out.start_new_stream(input_stream_name)
- out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
- while True:
- buf = os.read(pipes['index','r'], 2**20)
- if len(buf) == 0:
- break
- out.write(buf)
- os.write(pipes['indexmanifest','w'], out.manifest_text())
- os.close(pipes['indexmanifest','w'])
- os._exit(0)
-
-pipe_closeallbut(pipes, ('bammanifest', 'r'), ('indexmanifest', 'r'))
-outmanifest = ''
-for which in ['bammanifest', 'indexmanifest']:
- with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
- while True:
- buf = f.read()
- if buf == '':
- break
- outmanifest += buf
-
-all_ok = True
-for (childname, pid) in children.items():
- all_ok = all_ok and waitpid_and_check_exit(pid, childname)
-
-if all_ok:
- this_task.set_output(outmanifest)
-else:
- sys.exit(1)
+++ /dev/null
-#!/usr/bin/python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import subprocess
-import crunchutil.subst as subst
-import shutil
-import os
-import sys
-import time
-
-if len(arvados.current_task()['parameters']) > 0:
- p = arvados.current_task()['parameters']
-else:
- p = arvados.current_job()['script_parameters']
-
-t = arvados.current_task().tmpdir
-
-os.unlink("/usr/local/share/bcbio-nextgen/galaxy")
-os.mkdir("/usr/local/share/bcbio-nextgen/galaxy")
-shutil.copy("/usr/local/share/bcbio-nextgen/config/bcbio_system.yaml", "/usr/local/share/bcbio-nextgen/galaxy")
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool_data_table_conf.xml", "w") as f:
- f.write('''<tables>
- <!-- Locations of indexes in the BWA mapper format -->
- <table name="bwa_indexes" comment_char="#">
- <columns>value, dbkey, name, path</columns>
- <file path="tool-data/bwa_index.loc" />
- </table>
- <!-- Locations of indexes in the Bowtie2 mapper format -->
- <table name="bowtie2_indexes" comment_char="#">
- <columns>value, dbkey, name, path</columns>
- <file path="tool-data/bowtie2_indices.loc" />
- </table>
- <!-- Locations of indexes in the Bowtie2 mapper format for TopHat2 to use -->
- <table name="tophat2_indexes" comment_char="#">
- <columns>value, dbkey, name, path</columns>
- <file path="tool-data/bowtie2_indices.loc" />
- </table>
- <!-- Location of SAMTools indexes and other files -->
- <table name="sam_fa_indexes" comment_char="#">
- <columns>index, value, path</columns>
- <file path="tool-data/sam_fa_indices.loc" />
- </table>
- <!-- Location of Picard dict file and other files -->
- <table name="picard_indexes" comment_char="#">
- <columns>value, dbkey, name, path</columns>
- <file path="tool-data/picard_index.loc" />
- </table>
- <!-- Location of Picard dict files valid for GATK -->
- <table name="gatk_picard_indexes" comment_char="#">
- <columns>value, dbkey, name, path</columns>
- <file path="tool-data/gatk_sorted_picard_index.loc" />
- </table>
-</tables>
-''')
-
-os.mkdir("/usr/local/share/bcbio-nextgen/galaxy/tool-data")
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/bowtie2_indices.loc", "w") as f:
- f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(dir $(bowtie2_indices))\n"))
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/bwa_index.loc", "w") as f:
- f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(file $(bwa_index))\n"))
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/gatk_sorted_picard_index.loc", "w") as f:
- f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(file $(gatk_sorted_picard_index))\n"))
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/picard_index.loc", "w") as f:
- f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(file $(picard_index))\n"))
-
-with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/sam_fa_indices.loc", "w") as f:
- f.write(subst.do_substitution(p, "index\tGRCh37\t$(file $(sam_fa_indices))\n"))
-
-with open("/tmp/crunch-job/freebayes-variant.yaml", "w") as f:
- f.write('''
-# Template for whole genome Illumina variant calling with FreeBayes
-# This is a GATK-free pipeline without post-alignment BAM pre-processing
-# (recalibration and realignment)
----
-details:
- - analysis: variant2
- genome_build: GRCh37
- # to do multi-sample variant calling, assign samples the same metadata / batch
- # metadata:
- # batch: your-arbitrary-batch-name
- algorithm:
- aligner: bwa
- mark_duplicates: true
- recalibrate: false
- realign: false
- variantcaller: freebayes
- platform: illumina
- quality_format: Standard
-      # for targeted projects, set the region
- # variant_regions: /path/to/your.bed
-''')
-
-os.unlink("/usr/local/share/bcbio-nextgen/gemini_data")
-os.symlink(arvados.get_job_param_mount("gemini_data"), "/usr/local/share/bcbio-nextgen/gemini_data")
-
-os.chdir(arvados.current_task().tmpdir)
-
-rcode = subprocess.call(["bcbio_nextgen.py", "--workflow", "template", "/tmp/crunch-job/freebayes-variant.yaml", "project1",
- subst.do_substitution(p, "$(file $(R1))"),
- subst.do_substitution(p, "$(file $(R2))")])
-
-os.chdir("project1/work")
-
-os.symlink("/usr/local/share/bcbio-nextgen/galaxy/tool-data", "tool-data")
-
-rcode = subprocess.call(["bcbio_nextgen.py", "../config/project1.yaml", "-n", os.environ['CRUNCH_NODE_SLOTS']])
-
-print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
-
-if rcode == 0:
- os.chdir("../final")
-
-    print("arvados-bcbio-nextgen: the following output files will be saved to keep:")
-
- subprocess.call(["find", ".", "-type", "f", "-printf", "arvados-bcbio-nextgen: %12.12s %h/%f\\n"])
-
- print("arvados-bcbio-nextgen: start writing output to keep")
-
- done = False
- api = arvados.api('v1')
- while not done:
- try:
- out = arvados.CollectionWriter()
- out.write_directory_tree(".", max_manifest_depth=0)
- outuuid = out.finish()
- api.job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={
- 'output':outuuid,
- 'success': (rcode == 0),
- 'progress':1.0
- }).execute()
- done = True
- except Exception as e:
- print("arvados-bcbio-nextgen: caught exception: {}".format(e))
- time.sleep(5)
-
-sys.exit(rcode)
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import os
-import sys
-import fcntl
-import subprocess
-
-bwa_install_path = None
-
-def install_path():
- """
- Extract the bwa source tree, build the bwa binary, and return the
- path to the source tree.
- """
- global bwa_install_path
- if bwa_install_path:
- return bwa_install_path
-
- bwa_install_path = arvados.util.tarball_extract(
- tarball = arvados.current_job()['script_parameters']['bwa_tbz'],
- path = 'bwa')
-
- # build "bwa" binary
- lockfile = open(os.path.split(bwa_install_path)[0] + '.bwa-make.lock',
- 'w')
- fcntl.flock(lockfile, fcntl.LOCK_EX)
- arvados.util.run_command(['make', '-j16'], cwd=bwa_install_path)
- lockfile.close()
-
- return bwa_install_path
-
-def bwa_binary():
- """
- Return the path to the bwa executable.
- """
- return os.path.join(install_path(), 'bwa')
-
-def run(command, command_args, **kwargs):
- """
- Build and run the bwa binary.
-
- command is the bwa module, e.g., "index" or "aln".
-
- command_args is a list of additional command line arguments, e.g.,
- ['-a', 'bwtsw', 'ref.fasta']
-
- It is assumed that we are running in a Crunch job environment, and
- the job's "bwa_tbz" parameter is a collection containing the bwa
- source tree in a .tbz file.
- """
- execargs = [bwa_binary(),
- command]
- execargs += command_args
- sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
- arvados.util.run_command(
- execargs,
- cwd=arvados.current_task().tmpdir,
- stderr=sys.stderr,
- stdin=kwargs.get('stdin', subprocess.PIPE),
- stdout=kwargs.get('stdout', sys.stderr))
-
-def one_task_per_pair_input_file(if_sequence=0, and_end_task=True):
- """
- Queue one task for each pair of fastq files in this job's input
- collection.
-
- Each new task will have two parameters, named "input_1" and
- "input_2", each being a manifest containing a single fastq file.
-
- A matching pair of files in the input collection is assumed to
- have names "x_1.y" and "x_2.y".
-
- Files in the input collection that are not part of a matched pair
- are silently ignored.
-
- if_sequence and and_end_task arguments have the same significance
- as in arvados.job_setup.one_task_per_input_file().
- """
- if if_sequence != arvados.current_task()['sequence']:
- return
- job_input = arvados.current_job()['script_parameters']['input']
- cr = arvados.CollectionReader(job_input)
- all_files = []
- for s in cr.all_streams():
- all_files += list(s.all_files())
- for s in cr.all_streams():
- for left_file in s.all_files():
- left_name = left_file.name()
- right_file = None
- right_name = re.sub(r'(.*_)1\.', '\g<1>2.', left_name)
- if right_name == left_name:
- continue
- for f2 in s.all_files():
- if right_name == f2.name():
- right_file = f2
- if right_file != None:
- new_task_attrs = {
- 'job_uuid': arvados.current_job()['uuid'],
- 'created_by_job_task_uuid': arvados.current_task()['uuid'],
- 'sequence': if_sequence + 1,
- 'parameters': {
- 'input_1':left_file.as_manifest(),
- 'input_2':right_file.as_manifest()
- }
- }
- arvados.api().job_tasks().create(body=new_task_attrs).execute()
- if and_end_task:
- arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={'success':True}
- ).execute()
- exit(0)
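-
-# Example (illustrative sketch, with hypothetical file names): a paired-end
-# crunch script built on this module queues one task per fastq pair and then
-# shells out through run(), along the lines of
-#
-#   import arvados_bwa
-#   arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
-#   arvados_bwa.run('aln', ['-t', '4', 'ref.fasta', 'reads_1.fastq'],
-#                   stdout=open('reads_1.sai', 'wb'))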
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import os
-import sys
-import fcntl
-import subprocess
-
-gatk2_install_path = None
-
-def install_path():
- global gatk2_install_path
- if gatk2_install_path:
- return gatk2_install_path
- gatk2_install_path = arvados.util.tarball_extract(
- tarball = arvados.current_job()['script_parameters']['gatk_tbz'],
- path = 'gatk2')
- return gatk2_install_path
-
-def memory_limit():
- taskspernode = int(os.environ.get('CRUNCH_NODE_SLOTS', '1'))
- with open('/proc/meminfo', 'r') as f:
- ram = int(re.search(r'MemTotal:\s*(\d+)', f.read()).group(1)) / 1024
- if taskspernode > 1:
- ram = ram / taskspernode
- return max(ram-700, 500)
-
-def cpus_on_this_node():
- with open('/proc/cpuinfo', 'r') as cpuinfo:
- return max(int(os.environ.get('SLURM_CPUS_ON_NODE', 1)),
- len(re.findall(r'^processor\s*:\s*\d',
- cpuinfo.read(),
- re.MULTILINE)))
-
-def cpus_per_task():
- return max(1, (cpus_on_this_node()
- / int(os.environ.get('CRUNCH_NODE_SLOTS', 1))))
-
-def run(**kwargs):
- kwargs.setdefault('cwd', arvados.current_task().tmpdir)
- kwargs.setdefault('stdout', sys.stderr)
- execargs = ['java',
- '-Xmx%dm' % memory_limit(),
- '-Djava.io.tmpdir=' + arvados.current_task().tmpdir,
- '-jar', os.path.join(install_path(), 'GenomeAnalysisTK.jar')]
- execargs += [str(arg) for arg in kwargs.pop('args', [])]
- sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
- return arvados.util.run_command(execargs, **kwargs)
-
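-# Example (illustrative sketch): the GATK2-* scripts in this tree call the
-# wrapper with a plain GATK argument list, e.g.
-#
-#   import arvados_gatk2
-#   arvados_gatk2.run(args=[
-#       '-nt', arvados_gatk2.cpus_per_task(),
-#       '-T', 'UnifiedGenotyper',
-#       '-R', 'human_g1k_v37.fasta',
-#       '-I', 'merge.bam',
-#       '-o', 'out.vcf',
-#   ])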
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import os
-import re
-import sys
-import subprocess
-
-def pipe_setup(pipes, name):
- pipes[name,'r'], pipes[name,'w'] = os.pipe()
-
-def pipe_closeallbut(pipes, *keepus):
- for n,m in pipes.keys():
- if (n,m) not in keepus:
- os.close(pipes.pop((n,m), None))
-
-def named_fork(children, name):
- children[name] = os.fork()
- return children[name]
-
-def waitpid_and_check_children(children):
- """
- Given a dict of childname->pid, wait for each child process to
- finish, and report non-zero exit status on stderr. Return True if
- all children exited 0.
- """
- all_ok = True
- for (childname, pid) in children.items():
- # all_ok must be on RHS here -- we need to call waitpid() on
- # every child, even if all_ok is already False.
- all_ok = waitpid_and_check_exit(pid, childname) and all_ok
- return all_ok
-
-def waitpid_and_check_exit(pid, childname=''):
- """
- Wait for a child process to finish. If it exits non-zero, report
- exit status on stderr (mentioning the given childname) and return
- False. If it exits zero, return True.
- """
- _, childstatus = os.waitpid(pid, 0)
- exitvalue = childstatus >> 8
- signal = childstatus & 127
- dumpedcore = childstatus & 128
- if childstatus != 0:
- sys.stderr.write("%s child %d failed: exit %d signal %d core %s\n"
- % (childname, pid, exitvalue, signal,
- ('y' if dumpedcore else 'n')))
- return False
- return True
-
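-# Example (illustrative sketch, with a hypothetical child named 'worker'):
-# the crunch scripts above combine these helpers into a fork/pipe pattern
-# along the lines of
-#
-#   children, pipes = {}, {}
-#   pipe_setup(pipes, 'worker')
-#   if 0 == named_fork(children, 'worker'):
-#       pipe_closeallbut(pipes, ('worker', 'w'))
-#       os.write(pipes['worker', 'w'], 'data produced by the child\n')
-#       os.close(pipes['worker', 'w'])
-#       os._exit(0)
-#   pipe_closeallbut(pipes, ('worker', 'r'))
-#   data = os.read(pipes['worker', 'r'], 2**20)
-#   pipe_closeallbut(pipes)
-#   if not waitpid_and_check_children(children):
-#       sys.exit(1)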
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import os
-import sys
-import fcntl
-import subprocess
-
-picard_install_path = None
-
-def install_path():
- global picard_install_path
- if picard_install_path:
- return picard_install_path
- zipball = arvados.current_job()['script_parameters']['picard_zip']
- extracted = arvados.util.zipball_extract(
- zipball = zipball,
- path = 'picard')
- for f in os.listdir(extracted):
- if (re.search(r'^picard-tools-[\d\.]+$', f) and
- os.path.exists(os.path.join(extracted, f, '.'))):
- picard_install_path = os.path.join(extracted, f)
- break
- if not picard_install_path:
- raise Exception("picard-tools-{version} directory not found in %s" %
- zipball)
- return picard_install_path
-
-def run(module, **kwargs):
- kwargs.setdefault('cwd', arvados.current_task().tmpdir)
- execargs = ['java',
- '-Xmx1500m',
- '-Djava.io.tmpdir=' + arvados.current_task().tmpdir,
- '-jar', os.path.join(install_path(), module + '.jar')]
- execargs += [str(arg) for arg in kwargs.pop('args', [])]
- for key, value in kwargs.pop('params', {}).items():
- execargs += [key.upper() + '=' + str(value)]
- sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
- return arvados.util.run_command(execargs, **kwargs)
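-
-# Example (illustrative sketch): params keys are upper-cased into KEY=value
-# arguments, so a SortSam invocation like the one used elsewhere in this tree
-# reads
-#
-#   import arvados_picard
-#   arvados_picard.run('SortSam', params={
-#       'i': '/dev/stdin',
-#       'o': '/dev/stdout',
-#       'quiet': 'true',
-#       'so': 'coordinate',
-#       'validation_stringency': 'LENIENT',
-#   })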
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import os
-import sys
-import fcntl
-import subprocess
-
-samtools_path = None
-
-def samtools_install_path():
- """
- Extract the samtools source tree, build the samtools binary, and
- return the path to the source tree.
- """
- global samtools_path
- if samtools_path:
- return samtools_path
- samtools_path = arvados.util.tarball_extract(
- tarball = arvados.current_job()['script_parameters']['samtools_tgz'],
- path = 'samtools')
-
- # build "samtools" binary
- lockfile = open(os.path.split(samtools_path)[0] + '.samtools-make.lock',
- 'w')
- fcntl.flock(lockfile, fcntl.LOCK_EX)
- arvados.util.run_command(['make', '-j16'], cwd=samtools_path)
- lockfile.close()
-
- return samtools_path
-
-def samtools_binary():
- """
- Return the path to the samtools executable.
- """
- return os.path.join(samtools_install_path(), 'samtools')
-
-def run(command, command_args, **kwargs):
- """
- Build and run the samtools binary.
-
- command is the samtools subcommand, e.g., "view" or "sort".
-
- command_args is a list of additional command line arguments, e.g.,
- ['-bt', 'ref_list.txt', '-o', 'aln.bam', 'aln.sam.gz']
-
- It is assumed that we are running in a Crunch job environment, and
- the job's "samtools_tgz" parameter is a collection containing the
- samtools source tree in a .tgz file.
- """
- execargs = [samtools_binary(),
- command]
- execargs += command_args
- sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs)))
- arvados.util.run_command(
- execargs,
- cwd=arvados.current_task().tmpdir,
- stdin=kwargs.get('stdin', subprocess.PIPE),
- stderr=kwargs.get('stderr', sys.stderr),
- stdout=kwargs.get('stdout', sys.stderr))
-
-def one_task_per_bam_file(if_sequence=0, and_end_task=True):
- """
- Queue one task for each bam file in this job's input collection.
-
- Each new task will have an "input" parameter: a manifest
- containing one .bam file and (if available) the corresponding .bai
- index file.
-
- Files in the input collection that are not named *.bam or *.bai
- (as well as *.bai files that do not match any .bam file present)
- are silently ignored.
-
- if_sequence and and_end_task arguments have the same significance
- as in arvados.job_setup.one_task_per_input_file().
- """
- if if_sequence != arvados.current_task()['sequence']:
- return
- job_input = arvados.current_job()['script_parameters']['input']
- cr = arvados.CollectionReader(job_input)
- bam = {}
- bai = {}
- for s in cr.all_streams():
- for f in s.all_files():
- if re.search(r'\.bam$', f.name()):
- bam[s.name(), f.name()] = f
- elif re.search(r'\.bai$', f.name()):
- bai[s.name(), f.name()] = f
- for ((s_name, f_name), bam_f) in bam.items():
- bai_f = bai.get((s_name, re.sub(r'bam$', 'bai', f_name)), None)
- task_input = bam_f.as_manifest()
- if bai_f:
- task_input += bai_f.as_manifest()
- new_task_attrs = {
- 'job_uuid': arvados.current_job()['uuid'],
- 'created_by_job_task_uuid': arvados.current_task()['uuid'],
- 'sequence': if_sequence + 1,
- 'parameters': {
- 'input': task_input
- }
- }
- arvados.api().job_tasks().create(body=new_task_attrs).execute()
- if and_end_task:
- arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={'success':True}
- ).execute()
- exit(0)
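-
-# Example (illustrative sketch): a per-BAM crunch script typically queues its
-# tasks with one_task_per_bam_file() and then shells out through run(), e.g.
-#
-#   import arvados_samtools
-#   arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True)
-#   arvados_samtools.run('view', ['-S', '-b', '-'])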
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import arvados_bwa
-import arvados_samtools
-import os
-import re
-import sys
-import subprocess
-
-arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-ref_dir = arvados.util.collection_extract(
- collection = this_job['script_parameters']['reference_index'],
- path = 'reference',
- decompress = False)
-
-ref_basename = None
-for f in os.listdir(ref_dir):
- basename = re.sub(r'\.bwt$', '', f)
- if basename != f:
- ref_basename = os.path.join(ref_dir, basename)
-if ref_basename is None:
- raise Exception("Could not find *.bwt in reference collection.")
-
-tmp_dir = arvados.current_task().tmpdir
-
-class Aligner:
- def input_filename(self):
- for s in arvados.CollectionReader(self.collection).all_streams():
- for f in s.all_files():
- return f.decompressed_name()
- def generate_input(self):
- for s in arvados.CollectionReader(self.collection).all_streams():
- for f in s.all_files():
- for s in f.readall_decompressed():
- yield s
- def aln(self, input_param):
- self.collection = this_task['parameters'][input_param]
- reads_filename = os.path.join(tmp_dir, self.input_filename())
- aln_filename = os.path.join(tmp_dir, self.input_filename() + '.sai')
- reads_pipe_r, reads_pipe_w = os.pipe()
- if os.fork() == 0:
- os.close(reads_pipe_r)
- reads_file = open(reads_filename, 'wb')
- for s in self.generate_input():
- if len(s) != os.write(reads_pipe_w, s):
- raise Exception("short write")
- reads_file.write(s)
- reads_file.close()
- os.close(reads_pipe_w)
- sys.exit(0)
- os.close(reads_pipe_w)
-
- aln_file = open(aln_filename, 'wb')
- bwa_proc = subprocess.Popen(
- [arvados_bwa.bwa_binary(),
- 'aln', '-t', '16',
- ref_basename,
- '-'],
- stdin=os.fdopen(reads_pipe_r, 'rb', 2**20),
- stdout=aln_file)
- aln_file.close()
- return reads_filename, aln_filename
-
-reads_1, alignments_1 = Aligner().aln('input_1')
-reads_2, alignments_2 = Aligner().aln('input_2')
-pid1, exit1 = os.wait()
-pid2, exit2 = os.wait()
-if exit1 != 0 or exit2 != 0:
- raise Exception("bwa aln exited non-zero (0x%x, 0x%x)" % (exit1, exit2))
-
-# output alignments in sam format to pipe
-sam_pipe_r, sam_pipe_w = os.pipe()
-sam_pid = os.fork()
-if sam_pid != 0:
- # parent
- os.close(sam_pipe_w)
-else:
- # child
- os.close(sam_pipe_r)
- arvados_bwa.run('sampe',
- [ref_basename,
- alignments_1, alignments_2,
- reads_1, reads_2],
- stdout=os.fdopen(sam_pipe_w, 'wb', 2**20))
- sys.exit(0)
-
-# convert sam (sam_pipe_r) to bam (bam_pipe_w)
-bam_pipe_r, bam_pipe_w = os.pipe()
-bam_pid = os.fork()
-if bam_pid != 0:
- # parent
- os.close(bam_pipe_w)
- os.close(sam_pipe_r)
-else:
- # child
- os.close(bam_pipe_r)
- arvados_samtools.run('view',
- ['-S', '-b',
- '-'],
- stdin=os.fdopen(sam_pipe_r, 'rb', 2**20),
- stdout=os.fdopen(bam_pipe_w, 'wb', 2**20))
- sys.exit(0)
-
-# copy bam (bam_pipe_r) to Keep
-out_bam_filename = os.path.split(reads_1)[-1] + '.bam'
-out = arvados.CollectionWriter()
-out.start_new_stream()
-out.start_new_file(out_bam_filename)
-out.write(os.fdopen(bam_pipe_r, 'rb', 2**20))
-
-# make sure everyone exited nicely
-pid3, exit3 = os.waitpid(sam_pid, 0)
-if exit3 != 0:
- raise Exception("bwa sampe exited non-zero (0x%x)" % exit3)
-pid4, exit4 = os.waitpid(bam_pid, 0)
-if exit4 != 0:
- raise Exception("samtools view exited non-zero (0x%x)" % exit4)
-
-# proclaim success
-this_task.set_output(out.finish())
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import arvados_bwa
-import os
-import re
-import sys
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-ref_dir = arvados.util.collection_extract(
- collection = this_job['script_parameters']['input'],
- path = 'reference',
- decompress = False)
-
-ref_fasta_files = (os.path.join(ref_dir, f)
- for f in os.listdir(ref_dir)
- if re.search(r'\.fasta(\.gz)?$', f))
-
-# build reference index
-arvados_bwa.run('index',
- ['-a', 'bwtsw'] + list(ref_fasta_files))
-
-# move output files to new empty directory
-out_dir = os.path.join(arvados.current_task().tmpdir, 'out')
-arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
-os.mkdir(out_dir)
-for f in os.listdir(ref_dir):
- if re.search(r'\.(amb|ann|bwt|pac|rbwt|rpac|rsa|sa)$', f):
- sys.stderr.write("bwa output: %s (%d)\n" %
- (f, os.stat(os.path.join(ref_dir, f)).st_size))
- os.rename(os.path.join(ref_dir, f),
- os.path.join(out_dir, f))
-
-# store output
-out = arvados.CollectionWriter()
-out.write_directory_tree(out_dir, max_manifest_depth=0)
-this_task.set_output(out.finish())
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-# collection-merge
-#
-# Merge two or more collections together. Can also be used to extract specific
-# files from a collection to produce a new collection.
-#
-# input:
-# An array of collections or collection/file paths in script_parameter["input"]
-#
-# output:
-# A manifest with the collections merged. Duplicate file names will
-# have their contents concatenated in the order that they appear in the input
-# array.
-
-import arvados
-import crunchutil.subst as subst
-
-p = arvados.current_job()['script_parameters']
-
-merged = ""
-src = []
-for c in p["input"]:
- c = subst.do_substitution(p, c)
- i = c.find('/')
- if i == -1:
- src.append(c)
- merged += arvados.CollectionReader(c).manifest_text()
- else:
- src.append(c[0:i])
- cr = arvados.CollectionReader(c[0:i])
- j = c.rfind('/')
- stream = c[i+1:j]
- if stream == "":
- stream = "."
- fn = c[(j+1):]
- for s in cr.all_streams():
- if s.name() == stream:
- if fn in s.files():
- merged += s.files()[fn].as_manifest()
-
-arvados.current_task().set_output(merged)
+++ /dev/null
-#!/bin/sh
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-if test -n "$JOB_PARAMETER_CRUNCHRUNNER" ; then
- exec $TASK_KEEPMOUNT/$JOB_PARAMETER_CRUNCHRUNNER
-else
- exec /usr/local/bin/crunchrunner
-fi
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import arvados.commands.put as put
-import os
-import logging
-import time
-
-def machine_progress(bytes_written, bytes_expected):
- return "upload wrote {} total {}\n".format(
- bytes_written, -1 if (bytes_expected is None) else bytes_expected)
-
-class Args(object):
- def __init__(self, fn):
- self.filename = None
- self.paths = [fn]
- self.max_manifest_depth = 0
-
-# Upload to Keep with error recovery.
-# Return a uuid or raise an exception if there are too many failures.
-def upload(source_dir, logger=None):
- if logger is None:
- logger = logging.getLogger("arvados")
-
- source_dir = os.path.abspath(source_dir)
- done = False
- if 'TASK_WORK' in os.environ:
- resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
- else:
- resume_cache = put.ResumeCache(put.ResumeCache.make_path(Args(source_dir)))
- reporter = put.progress_writer(machine_progress)
- bytes_expected = put.expected_bytes_for([source_dir])
- backoff = 1
- outuuid = None
- while not done:
- try:
- out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
- out.do_queued_work()
- out.write_directory_tree(source_dir, max_manifest_depth=0)
- outuuid = out.finish()
- done = True
- except KeyboardInterrupt as e:
- logger.critical("caught interrupt signal 2")
- raise e
- except Exception as e:
- logger.exception("caught exception:")
- backoff *= 2
- if backoff > 256:
- logger.critical("Too many upload failures, giving up")
- raise e
- else:
- logger.warning("Sleeping for %s seconds before trying again" % backoff)
- time.sleep(backoff)
- return outuuid
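-
-# Example (illustrative sketch): crunch scripts such as decompress-all.py save
-# a task's output directory with
-#
-#   import os
-#   import arvados
-#   import crunchutil.robust_put as robust_put
-#   outdir = os.path.join(arvados.current_task().tmpdir, "output")
-#   arvados.current_task().set_output(robust_put.upload(outdir))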
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import glob
-import os
-import re
-import stat
-
-BACKSLASH_ESCAPE_RE = re.compile(r'\\(.)')
-
-class SubstitutionError(Exception):
- pass
-
-def search(c):
- DEFAULT = 0
- DOLLAR = 1
-
- i = 0
- state = DEFAULT
- start = None
- depth = 0
- while i < len(c):
- if c[i] == '\\':
- i += 1
- elif state == DEFAULT:
- if c[i] == '$':
- state = DOLLAR
- if depth == 0:
- start = i
- elif c[i] == ')':
- if depth == 1:
- return [start, i]
- if depth > 0:
- depth -= 1
- elif state == DOLLAR:
- if c[i] == '(':
- depth += 1
- state = DEFAULT
- i += 1
- if depth != 0:
- raise SubstitutionError("Substitution error, mismatched parentheses {}".format(c))
- return None
-
-def sub_file(v):
- path = os.path.join(os.environ['TASK_KEEPMOUNT'], v)
- st = os.stat(path)
- if st and stat.S_ISREG(st.st_mode):
- return path
- else:
- raise SubstitutionError("$(file {}) is not accessible or is not a regular file".format(path))
-
-def sub_dir(v):
- d = os.path.dirname(v)
- if d == '':
- d = v
- path = os.path.join(os.environ['TASK_KEEPMOUNT'], d)
- st = os.stat(path)
- if st and stat.S_ISDIR(st.st_mode):
- return path
- else:
- raise SubstitutionError("$(dir {}) is not accessible or is not a directory".format(path))
-
-def sub_basename(v):
- return os.path.splitext(os.path.basename(v))[0]
-
-def sub_glob(v):
- l = glob.glob(v)
- if len(l) == 0:
- raise SubstitutionError("$(glob {}) no match found".format(v))
- else:
- return l[0]
-
-default_subs = {"file ": sub_file,
- "dir ": sub_dir,
- "basename ": sub_basename,
- "glob ": sub_glob}
-
-def do_substitution(p, c, subs=default_subs):
- while True:
- m = search(c)
- if m is None:
- return BACKSLASH_ESCAPE_RE.sub(r'\1', c)
-
- v = do_substitution(p, c[m[0]+2 : m[1]])
- var = True
- for sub in subs:
- if v.startswith(sub):
- r = subs[sub](v[len(sub):])
- var = False
- break
- if var:
- if v in p:
- r = p[v]
- else:
- raise SubstitutionError("Unknown variable or function '%s' while performing substitution on '%s'" % (v, c))
- if r is None:
- raise SubstitutionError("Substitution for '%s' is null while performing substitution on '%s'" % (v, c))
- if not isinstance(r, basestring):
- raise SubstitutionError("Substitution for '%s' must be a string while performing substitution on '%s'" % (v, c))
-
- c = c[:m[0]] + r + c[m[1]+1:]
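-
-# Example (illustrative sketch; the collection name is hypothetical): with
-# script parameters such as p = {"bwa_index": "<collection>/ref.fasta.bwt"},
-# a template like
-#
-#   do_substitution(p, "$(file $(bwa_index))")
-#
-# first expands the inner $(bwa_index) from p, then resolves the outer
-# $(file ...) to a path under $TASK_KEEPMOUNT, raising SubstitutionError
-# if the result is not an accessible regular file.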
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import stat
-import arvados.commands.run
-import logging
-
-# Implements "Virtual Working Directory"
-# Provides a way of emulating a shared writable directory in Keep based
-# on a "check out, edit, check in, merge" model.
-# At the moment, this only permits adding new files; applications
-# cannot modify or delete existing files.
-
-# Create a symlink tree rooted at target_dir mirroring arv-mounted
-# source_collection. target_dir must be empty, and will be created if it
-# doesn't exist.
-def checkout(source_collection, target_dir, keepmount=None):
- # create symlinks
- if keepmount is None:
- keepmount = os.environ['TASK_KEEPMOUNT']
-
- if not os.path.exists(target_dir):
- os.makedirs(target_dir)
-
- l = os.listdir(target_dir)
- if len(l) > 0:
- raise Exception("target_dir must be empty before checkout, contains %s" % l)
-
- stem = os.path.join(keepmount, source_collection)
- for root, dirs, files in os.walk(os.path.join(keepmount, source_collection), topdown=True):
- rel = root[len(stem)+1:]
- for d in dirs:
- os.mkdir(os.path.join(target_dir, rel, d))
- for f in files:
- os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
-
-def checkin(target_dir):
- """Write files in `target_dir` to Keep.
-
- Regular files or symlinks to files outside the keep mount are written to
- Keep as normal files (Keep does not support symlinks).
-
- Symlinks to files in the keep mount will result in files in the new
- collection which reference existing Keep blocks, no data copying necessary.
-
- Returns a new Collection object, with data flushed but the collection record
- not saved to the API.
-
- """
-
- outputcollection = arvados.collection.Collection(num_retries=5)
-
- if target_dir[-1:] != '/':
- target_dir += '/'
-
- collections = {}
-
- logger = logging.getLogger("arvados")
-
- last_error = None
- for root, dirs, files in os.walk(target_dir):
- for f in files:
- try:
- s = os.lstat(os.path.join(root, f))
-
- writeIt = False
-
- if stat.S_ISREG(s.st_mode):
- writeIt = True
- elif stat.S_ISLNK(s.st_mode):
- # 1. check if it is a link into a collection
- real = os.path.split(os.path.realpath(os.path.join(root, f)))
- (pdh, branch) = arvados.commands.run.is_in_collection(real[0], real[1])
- if pdh is not None:
- # 2. load collection
- if pdh not in collections:
- # 2.1 make sure it is flushed (see #5787 note 11)
- fd = os.open(real[0], os.O_RDONLY)
- os.fsync(fd)
- os.close(fd)
-
- # 2.2 get collection from API server
- collections[pdh] = arvados.collection.CollectionReader(pdh,
- api_client=outputcollection._my_api(),
- keep_client=outputcollection._my_keep(),
- num_retries=5)
- # 3. copy arvfile to new collection
- outputcollection.copy(branch, os.path.join(root[len(target_dir):], f), source_collection=collections[pdh])
- else:
- writeIt = True
-
- if writeIt:
- reldir = root[len(target_dir):]
- with outputcollection.open(os.path.join(reldir, f), "wb") as writer:
- with open(os.path.join(root, f), "rb") as reader:
- dat = reader.read(64*1024)
- while dat:
- writer.write(dat)
- dat = reader.read(64*1024)
- except (IOError, OSError) as e:
- logger.error(e)
- last_error = e
-
- return (outputcollection, last_error)
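-
-# Example (illustrative sketch; the import path and collection hash are
-# assumptions): a task that adds files to a shared directory could do
-#
-#   import arvados
-#   import crunchutil.vwd as vwd
-#   vwd.checkout("0123456789abcdef0123456789abcdef+100", "/tmp/work")
-#   # ... create new files under /tmp/work ...
-#   (coll, err) = vwd.checkin("/tmp/work")
-#   if err is None:
-#       arvados.current_task().set_output(coll.manifest_text())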
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-# Crunch script integration for running arvados-cwl-runner inside a crunch job.
-
-import arvados_cwl
-import sys
-
-try:
- # Use the crunch script defined in the arvados_cwl package. This helps
- # prevent the crunch script from going out of sync with the rest of the
- # arvados_cwl package.
- import arvados_cwl.crunch_script
- arvados_cwl.crunch_script.run()
- sys.exit()
-except ImportError:
- pass
-
-# When running against an older arvados-cwl-runner package without
-# arvados_cwl.crunch_script, fall back to the old code.
-
-
-# This gets the job record, transforms the script parameters into a valid CWL
-# input object, then executes the CWL runner to run the underlying workflow or
-# tool. When the workflow completes, the output object is recorded in an
-# output collection for this runner job.
-
-import arvados
-import arvados.collection
-import arvados.util
-import cwltool.main
-import logging
-import os
-import json
-import argparse
-import re
-import functools
-
-from arvados.api import OrderedJsonModel
-from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
-from cwltool.load_tool import load_tool
-
-# Print package versions
-logging.info(cwltool.main.versionstring())
-
-api = arvados.api("v1")
-
-try:
- job_order_object = arvados.current_job()['script_parameters']
-
- pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')
-
- def keeppath(v):
- if pdh_path.match(v):
- return "keep:%s" % v
- else:
- return v
-
- def keeppathObj(v):
- v["location"] = keeppath(v["location"])
-
- job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])
-
- for k,v in job_order_object.items():
- if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
- job_order_object[k] = {
- "class": "File",
- "location": "keep:%s" % v
- }
-
- adjustFileObjs(job_order_object, keeppathObj)
- adjustDirObjs(job_order_object, keeppathObj)
- normalizeFilesDirs(job_order_object)
- adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
-
- output_name = None
- if "arv:output_name" in job_order_object:
- output_name = job_order_object["arv:output_name"]
- del job_order_object["arv:output_name"]
-
- runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
- output_name=output_name)
-
- t = load_tool(job_order_object, runner.arv_make_tool)
-
- args = argparse.Namespace()
- args.project_uuid = arvados.current_job()["owner_uuid"]
- args.enable_reuse = True
- args.submit = False
- args.debug = True
- args.quiet = False
- args.ignore_docker_for_reuse = False
- args.basedir = os.getcwd()
- args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
- outputObj = runner.arv_executor(t, job_order_object, **vars(args))
-
- if runner.final_output_collection:
- outputCollection = runner.final_output_collection.portable_data_hash()
- else:
- outputCollection = None
-
- api.job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={
- 'output': outputCollection,
- 'success': True,
- 'progress':1.0
- }).execute()
-except Exception as e:
- logging.exception("Unhandled exception")
- api.job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={
- 'output': None,
- 'success': False,
- 'progress':1.0
- }).execute()
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-#
-# decompress-all.py
-#
-# Decompress all compressed files in the collection using the "dtrx" tool and
-# produce a new collection with the contents. Uncompressed files
-# are passed through.
-#
-# input:
-# A collection at script_parameters["input"]
-#
-# output:
-# A manifest of the uncompressed contents of the input collection.
-
-import arvados
-import re
-import subprocess
-import os
-import sys
-import crunchutil.robust_put as robust_put
-
-arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True,
- input_as_path=True)
-
-task = arvados.current_task()
-
-input_file = task['parameters']['input']
-
-infile_parts = re.match(r"(^[a-f0-9]{32}\+\d+)(\+\S+)*(/.*)?(/[^/]+)$", input_file)
-
-outdir = os.path.join(task.tmpdir, "output")
-os.makedirs(outdir)
-os.chdir(outdir)
-
-if infile_parts is None:
- print >>sys.stderr, "Failed to parse input filename '%s' as a Keep file\n" % input_file
- sys.exit(1)
-
-cr = arvados.CollectionReader(infile_parts.group(1))
-streamname = infile_parts.group(3)[1:] if infile_parts.group(3) else None
-filename = infile_parts.group(4)[1:]
-
-if streamname is not None:
- subprocess.call(["mkdir", "-p", streamname])
- os.chdir(streamname)
-else:
- streamname = '.'
-
-m = re.match(r'.*\.(gz|Z|bz2|tgz|tbz|zip|rar|7z|cab|deb|rpm|cpio|gem)$', arvados.get_task_param_mount('input'), re.IGNORECASE)
-
-if m is not None:
- rc = subprocess.call(["dtrx", "-r", "-n", "-q", arvados.get_task_param_mount('input')])
- if rc == 0:
- task.set_output(robust_put.upload(outdir))
- else:
- sys.exit(rc)
-else:
- streamreader = filter(lambda s: s.name() == streamname, cr.all_streams())[0]
- filereader = streamreader.files()[filename]
- task.set_output(streamname + filereader.as_manifest()[1:])
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-this_job_input = this_job['script_parameters']['input']
-manifest_text = ""
-for f in arvados.CollectionReader(this_job_input).all_files():
- if f.name() in this_job['script_parameters']['names']:
- manifest_text += f.as_manifest()
-
-this_task.set_output(arvados.Keep.put(manifest_text))
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-
-arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-this_task_input = this_task['parameters']['input']
-pattern = re.compile(this_job['script_parameters']['pattern'])
-
-input_file = list(arvados.CollectionReader(this_task_input).all_files())[0]
-out = arvados.CollectionWriter()
-out.set_current_file_name(input_file.decompressed_name())
-out.set_current_stream_name(input_file.stream_name())
-for line in input_file.readlines():
- if pattern.search(line):
- out.write(line)
-
-this_task.set_output(out.finish())
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import hashlib
-import os
-
-arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-
-if 'algorithm' in this_job['script_parameters']:
- alg = this_job['script_parameters']['algorithm']
-else:
- alg = 'md5'
-digestor = hashlib.new(alg)
-
-input_file = arvados.get_task_param_mount('input')
-
-with open(input_file, 'rb') as f:
- while True:
- buf = f.read(2**20)
- if len(buf) == 0:
- break
- digestor.update(buf)
-
-hexdigest = digestor.hexdigest()
-
-file_name = '/'.join(this_task['parameters']['input'].split('/')[1:])
-
-out = arvados.CollectionWriter()
-out.set_current_file_name("md5sum.txt")
-out.write("%s %s\n" % (hexdigest, file_name))
-this_task.set_output(out.finish())
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import string
-import json
-import sys
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-this_job_input = this_job['script_parameters']['input']
-
-out = arvados.CollectionWriter()
-out.set_current_file_name("arvados_objects.json")
-out.write("[\n")
-separator = ""
-
-traits = {}
-done_bytes = 0
-done_ratio = 0
-for input_file in arvados.CollectionReader(this_job_input).all_files():
- for line_number, line in enumerate(input_file.readlines()):
-
- done_bytes += len(line)
- new_done_ratio = 1.0 * done_bytes / input_file.size()
- if line_number == 2 or new_done_ratio - done_ratio > 0.05:
- sys.stderr.write("progress: %d%% after %d lines\n" % (int(done_ratio * 100), line_number+1))
- done_ratio = new_done_ratio
-
- words = string.split(string.strip(line), "\t")
- if line_number == 0:
- headings = words
- for t in arvados.api('v1').traits().list(
- where={'name':words},
- limit=1000
- ).execute()['items']:
- traits[t['name']] = t
- for i, trait_name in enumerate(words[3:], start=3):
- # find or create trait
- if trait_name not in traits:
- traits_match = arvados.api('v1').traits().list(
- where={'name':trait_name}
- ).execute()['items']
- if len(traits_match) > 0:
- traits[trait_name] = traits_match[0]
- else:
- traits[trait_name] = arvados.api('v1').traits().create(
- trait={'name':trait_name}).execute()
- out.write(separator)
- out.write(json.dumps(traits[trait_name]))
- separator = ",\n"
- else:
- huID_links_match = arvados.api('v1').links().list(
- where={'link_class':'identifier','name':words[0]}
- ).execute()['items']
- if len(huID_links_match) > 0:
- human_uuid = huID_links_match[0]['head_uuid']
- else:
- human = arvados.api('v1').humans().create(
- body={}
- ).execute()
- huID_link = arvados.api('v1').links().create(
- body={
- 'link_class':'identifier',
- 'name':words[0],
- 'head_kind':'arvados#human',
- 'head_uuid':human['uuid']
- }
- ).execute()
- human_uuid = human['uuid']
- human_trait = {}
- for t in arvados.api('v1').links().list(
- limit=10000,
- where={
- 'tail_uuid':human_uuid,
- 'tail_kind':'arvados#human',
- 'head_kind':'arvados#trait',
- 'link_class':'human_trait',
- 'name':'pgp-survey-response'
- }
- ).execute()['items']:
- human_trait[t['head_uuid']] = t
- for i, trait_value in enumerate(words[3:], start=3):
- trait_uuid = traits[headings[i]]['uuid']
- if trait_uuid in human_trait:
- trait_link = human_trait[trait_uuid]
- if trait_link['properties']['value'] != trait_value:
- # update database value to match survey response
- trait_link['properties']['value'] = trait_value
- arvados.api('v1').links().update(
- uuid=trait_link['uuid'],
- body={'properties':trait_link['properties']}
- ).execute()
- out.write(",\n")
- out.write(json.dumps(trait_link))
- elif trait_value == '':
- # nothing in database, nothing in input
- pass
- else:
- trait_link = {
- 'tail_uuid':human_uuid,
- 'tail_kind':'arvados#human',
- 'head_uuid':traits[headings[i]]['uuid'],
- 'head_kind':'arvados#trait',
- 'link_class':'human_trait',
- 'name':'pgp-survey-response',
- 'properties': { 'value': trait_value }
- }
- arvados.api('v1').links().create(
- body=trait_link
- ).execute()
- out.write(",\n")
- out.write(json.dumps(trait_link))
-
-out.write("\n]\n")
-this_task.set_output(out.finish())
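For reference, a minimal sketch of the tab-separated survey layout this script assumes, inferred from its column handling (words[0] is the participant identifier, the header row names traits from the fourth column onward, and later rows carry the matching trait values); the identifiers and trait names below are hypothetical.

# Hypothetical example only; the second and third columns are ignored
# by the script, which reads words[0] and words[3:].
example_survey_tsv = (
    "huID\tcol2\tcol3\tAsthma\tLactose intolerance\n"
    "hu000001\t-\t-\tYes\t\n"
)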
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-parser_path = arvados.util.git_checkout(
- url = this_job['script_parameters']['parser_url'],
- version = this_job['script_parameters']['parser_version'],
- path = 'parser')
-
-stdoutdata, stderrdata = arvados.util.run_command(
- ["python", "demo.py"],
- cwd=parser_path)
-
-out = arvados.CollectionWriter()
-out.write(stdoutdata)
-out.set_current_file_name('participant_traits.tsv')
-this_task.set_output(out.finish())
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-import sys
-import subprocess
-import arvados_picard
-from arvados_ipc import *
-
-arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-ref_dir = arvados.util.collection_extract(
- collection = this_job['script_parameters']['reference'],
- path = 'reference',
- decompress = True)
-ref_fasta_files = [os.path.join(ref_dir, f)
- for f in os.listdir(ref_dir)
- if re.search(r'\.fasta(\.gz)?$', f)]
-input_collection = this_task['parameters']['input']
-
-for s in arvados.CollectionReader(input_collection).all_streams():
- for f in s.all_files():
- input_stream_name = s.name()
- input_file_name = f.name()
- break
-
-# Unfortunately, picard FixMateInformation cannot read from a pipe. We
-# must copy the input to a temporary file before running picard.
-input_bam_path = os.path.join(this_task.tmpdir, input_file_name)
-with open(input_bam_path, 'wb') as bam:
- for s in arvados.CollectionReader(input_collection).all_streams():
- for f in s.all_files():
- for s in f.readall():
- bam.write(s)
-
-children = {}
-pipes = {}
-
-pipe_setup(pipes, 'fixmate')
-if 0==named_fork(children, 'fixmate'):
- pipe_closeallbut(pipes, ('fixmate', 'w'))
- arvados_picard.run(
- 'FixMateInformation',
- params={
- 'i': input_bam_path,
- 'o': '/dev/stdout',
- 'quiet': 'true',
- 'so': 'coordinate',
- 'validation_stringency': 'LENIENT',
- 'compression_level': 0
- },
- stdout=os.fdopen(pipes['fixmate','w'], 'wb', 2**20))
- os._exit(0)
-os.close(pipes.pop(('fixmate','w'), None))
-
-pipe_setup(pipes, 'sortsam')
-if 0==named_fork(children, 'sortsam'):
- pipe_closeallbut(pipes, ('fixmate', 'r'), ('sortsam', 'w'))
- arvados_picard.run(
- 'SortSam',
- params={
- 'i': '/dev/stdin',
- 'o': '/dev/stdout',
- 'quiet': 'true',
- 'so': 'coordinate',
- 'validation_stringency': 'LENIENT',
- 'compression_level': 0
- },
- stdin=os.fdopen(pipes['fixmate','r'], 'rb', 2**20),
- stdout=os.fdopen(pipes['sortsam','w'], 'wb', 2**20))
- os._exit(0)
-
-pipe_setup(pipes, 'reordersam')
-if 0==named_fork(children, 'reordersam'):
- pipe_closeallbut(pipes, ('sortsam', 'r'), ('reordersam', 'w'))
- arvados_picard.run(
- 'ReorderSam',
- params={
- 'i': '/dev/stdin',
- 'o': '/dev/stdout',
- 'reference': ref_fasta_files[0],
- 'quiet': 'true',
- 'validation_stringency': 'LENIENT',
- 'compression_level': 0
- },
- stdin=os.fdopen(pipes['sortsam','r'], 'rb', 2**20),
- stdout=os.fdopen(pipes['reordersam','w'], 'wb', 2**20))
- os._exit(0)
-
-pipe_setup(pipes, 'addrg')
-if 0==named_fork(children, 'addrg'):
- pipe_closeallbut(pipes, ('reordersam', 'r'), ('addrg', 'w'))
- arvados_picard.run(
- 'AddOrReplaceReadGroups',
- params={
- 'i': '/dev/stdin',
- 'o': '/dev/stdout',
- 'quiet': 'true',
- 'rglb': this_job['script_parameters'].get('rglb', 0),
- 'rgpl': this_job['script_parameters'].get('rgpl', 'illumina'),
- 'rgpu': this_job['script_parameters'].get('rgpu', 0),
- 'rgsm': this_job['script_parameters'].get('rgsm', 0),
- 'validation_stringency': 'LENIENT'
- },
- stdin=os.fdopen(pipes['reordersam','r'], 'rb', 2**20),
- stdout=os.fdopen(pipes['addrg','w'], 'wb', 2**20))
- os._exit(0)
-
-pipe_setup(pipes, 'bammanifest')
-pipe_setup(pipes, 'bam')
-pipe_setup(pipes, 'casm_in')
-if 0==named_fork(children, 'bammanifest'):
- pipe_closeallbut(pipes,
- ('addrg', 'r'),
- ('bammanifest', 'w'),
- ('bam', 'w'),
- ('casm_in', 'w'))
- out = arvados.CollectionWriter()
- out.start_new_stream(input_stream_name)
- out.start_new_file(input_file_name)
- while True:
- buf = os.read(pipes['addrg','r'], 2**20)
- if len(buf) == 0:
- break
- os.write(pipes['bam','w'], buf)
- os.write(pipes['casm_in','w'], buf)
- out.write(buf)
- os.write(pipes['bammanifest','w'], out.manifest_text())
- os.close(pipes['bammanifest','w'])
- os._exit(0)
-
-pipe_setup(pipes, 'casm')
-if 0 == named_fork(children, 'casm'):
- pipe_closeallbut(pipes, ('casm_in', 'r'), ('casm', 'w'))
- arvados_picard.run(
- 'CollectAlignmentSummaryMetrics',
- params={
- 'input': '/dev/fd/' + str(pipes['casm_in','r']),
- 'output': '/dev/fd/' + str(pipes['casm','w']),
- 'reference_sequence': ref_fasta_files[0],
- 'validation_stringency': 'LENIENT',
- },
- close_fds=False)
- os._exit(0)
-
-pipe_setup(pipes, 'index')
-if 0==named_fork(children, 'index'):
- pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w'))
- arvados_picard.run(
- 'BuildBamIndex',
- params={
- 'i': '/dev/stdin',
- 'o': '/dev/stdout',
- 'quiet': 'true',
- 'validation_stringency': 'LENIENT'
- },
- stdin=os.fdopen(pipes['bam','r'], 'rb', 2**20),
- stdout=os.fdopen(pipes['index','w'], 'wb', 2**20))
- os._exit(0)
-
-pipe_setup(pipes, 'indexmanifest')
-if 0==named_fork(children, 'indexmanifest'):
- pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w'))
- out = arvados.CollectionWriter()
- out.start_new_stream(input_stream_name)
- out.start_new_file(re.sub('\.bam$', '.bai', input_file_name))
- while True:
- buf = os.read(pipes['index','r'], 2**20)
- if len(buf) == 0:
- break
- out.write(buf)
- os.write(pipes['indexmanifest','w'], out.manifest_text())
- os.close(pipes['indexmanifest','w'])
- os._exit(0)
-
-pipe_closeallbut(pipes,
- ('bammanifest', 'r'),
- ('indexmanifest', 'r'),
- ('casm', 'r'))
-
-outmanifest = ''
-
-for which in ['bammanifest', 'indexmanifest']:
- with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f:
- while True:
- buf = f.read()
- if buf == '':
- break
- outmanifest += buf
-
-casm_out = arvados.CollectionWriter()
-casm_out.start_new_stream(input_stream_name)
-casm_out.start_new_file(input_file_name + '.casm.tsv')
-with os.fdopen(pipes.pop(('casm','r')), 'rb', 2**20) as f:
- while True:
- buf = f.read(2**20)
- if len(buf) == 0:
- break
- casm_out.write(buf)
-
-outmanifest += casm_out.manifest_text()
-
-all_ok = True
-for (childname, pid) in children.items():
- all_ok = all_ok and waitpid_and_check_exit(pid, childname)
-
-if all_ok:
- this_task.set_output(outmanifest)
-else:
- sys.exit(1)
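A minimal sketch of the fork-and-pipe fan-out pattern used above, written with plain os.pipe/os.fork instead of the arvados_ipc helpers (pipe_setup, named_fork, pipe_closeallbut), which are not reproduced here; the pipe names and data are illustrative only.

import os

def fan_out(source_fd, sink_fds, bufsize=2**20):
    # Copy everything readable from source_fd to every fd in sink_fds.
    while True:
        buf = os.read(source_fd, bufsize)
        if len(buf) == 0:
            break
        for fd in sink_fds:
            os.write(fd, buf)

# One producer pipe feeds a fan-out child that copies the stream to two
# consumers, mirroring how the 'bammanifest' child above copies the
# 'addrg' stream to both 'bam' and 'casm_in'.
produce_r, produce_w = os.pipe()
out1_r, out1_w = os.pipe()
out2_r, out2_w = os.pipe()

if os.fork() == 0:
    # fan-out child: keep only the fds it needs, like pipe_closeallbut()
    os.close(produce_w)
    os.close(out1_r)
    os.close(out2_r)
    fan_out(produce_r, [out1_w, out2_w])
    for fd in (produce_r, out1_w, out2_w):
        os.close(fd)
    os._exit(0)

# Parent: act as the producer, then drain both consumer pipes.
os.close(produce_r)
os.close(out1_w)
os.close(out2_w)
os.write(produce_w, b"example data\n")
os.close(produce_w)
for fd in (out1_r, out2_r):
    data = b""
    while True:
        buf = os.read(fd, 2**20)
        if len(buf) == 0:
            break
        data += buf
    os.close(fd)
    assert data == b"example data\n"
os.wait()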
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import os
-import sys
-
-rtg_install_path = None
-
-def setup():
- global rtg_install_path
- if rtg_install_path:
- return rtg_install_path
- rtg_path = arvados.util.zipball_extract(
- zipball = arvados.current_job()['script_parameters']['rtg_binary_zip'],
- path = 'rtg')
- rtg_license_path = arvados.util.collection_extract(
- collection = arvados.current_job()['script_parameters']['rtg_license'],
- path = 'license',
- decompress = False)
-
- # symlink to rtg-license.txt
- license_txt_path = os.path.join(rtg_license_path, 'rtg-license.txt')
- try:
- os.symlink(license_txt_path, os.path.join(rtg_path,'rtg-license.txt'))
- except OSError:
- if not os.path.exists(os.path.join(rtg_path,'rtg-license.txt')):
- os.symlink(license_txt_path, os.path.join(rtg_path,'rtg-license.txt'))
-
- rtg_install_path = rtg_path
- return rtg_path
-
-def run_rtg(command, output_dir, command_args, **kwargs):
- global rtg_install_path
- execargs = [os.path.join(rtg_install_path, 'rtg'),
- command,
- '-o', output_dir]
- execargs += command_args
- sys.stderr.write("run_rtg: exec %s\n" % str(execargs))
- arvados.util.run_command(
- execargs,
- cwd=arvados.current_task().tmpdir,
- stderr=sys.stderr,
- stdout=sys.stderr)
-
- # Exit status cannot be trusted in rtg 1.1.1.
- assert_done(output_dir)
-
- # Copy log files to stderr and delete them to avoid storing them
- # in Keep with the output data.
- for dirent in arvados.util.listdir_recursive(output_dir):
- if is_log_file(dirent):
- log_file = os.path.join(output_dir, dirent)
- sys.stderr.write(' '.join(['==>', dirent, '<==\n']))
- with open(log_file, 'rb') as f:
- while True:
- buf = f.read(2**20)
- if len(buf) == 0:
- break
- sys.stderr.write(buf)
- sys.stderr.write('\n') # in case log does not end in newline
- os.unlink(log_file)
-
-def assert_done(output_dir):
- # Sanity-check exit code.
- done_file = os.path.join(output_dir, 'done')
- if not os.path.exists(done_file):
- raise Exception("rtg exited 0 but %s does not exist. abort.\n" % done_file)
-
-def is_log_file(filename):
- return re.search(r'^(.*/)?(progress|done|\S+\.log)$', filename)
-
-setup()
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-import sys
-import pyrtg
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-fasta_path = arvados.util.collection_extract(
- collection = this_job['script_parameters']['input'],
- path = 'fasta',
- decompress = False)
-fasta_files = filter(lambda f: f != '.locator', os.listdir(fasta_path))
-out_dir = os.path.join(arvados.current_task().tmpdir, 'ref-sdf')
-arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
-
-pyrtg.run_rtg('format', out_dir,
- map(lambda f: os.path.join(fasta_path, f), fasta_files))
-
-out = arvados.CollectionWriter()
-out.write_directory_tree(out_dir, max_manifest_depth=0)
-this_task.set_output(out.finish())
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-import sys
-import pyrtg
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-fastq_path = arvados.util.collection_extract(
- collection = this_job['script_parameters']['input'],
- path = 'fastq')
-fastq_files = filter(lambda f: f != '.locator', os.listdir(fastq_path))
-tmp_dir_base = os.path.join(arvados.current_task().tmpdir, 'tmp')
-out_dir = os.path.join(arvados.current_task().tmpdir, 'reads')
-
-arvados.util.run_command(['rm', '-rf', tmp_dir_base], stderr=sys.stderr)
-arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
-os.mkdir(tmp_dir_base)
-
-# convert fastq to sdf
-tmp_dirs = []
-for leftarm in fastq_files:
- if re.search(r'_1\.f(ast)?q(\.gz)?$', leftarm):
- rightarm = re.sub(r'_1(\.f(ast)?q(\.gz)?)$', '_2\\1', leftarm)
- if rightarm in fastq_files:
- tmp_dirs += ['%s/%08d' % (tmp_dir_base, len(tmp_dirs))]
- pyrtg.run_rtg('format', tmp_dirs[-1],
- ['-f', 'fastq',
- '-q', 'sanger',
- '-l', os.path.join(fastq_path, leftarm),
- '-r', os.path.join(fastq_path, rightarm)])
-
-# split sdf
-pyrtg.run_rtg('sdfsplit', out_dir,
- ['-n', '1500000'] + tmp_dirs)
-
-# store output
-out = arvados.CollectionWriter()
-out.write_directory_tree(out_dir, max_manifest_depth=1)
-this_task.set_output(out.finish())
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-import sys
-import pyrtg
-
-arvados.job_setup.one_task_per_input_stream(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-in_dir = os.path.join(this_task.tmpdir, 'input')
-arvados.util.run_command(['rm', '-rf', in_dir], stderr=sys.stderr)
-in_dir = arvados.util.stream_extract(
- stream = arvados.StreamReader(this_task['parameters']['input']),
- path = in_dir,
- decompress = False)
-ref_dir = arvados.util.collection_extract(
- collection = this_job['script_parameters']['reference'],
- path = 'reference',
- decompress = False)
-
-out_dir = os.path.join(arvados.current_task().tmpdir, 'out')
-arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
-
-# map reads
-pyrtg.run_rtg('map', out_dir,
- ['-i', in_dir,
- '-t', ref_dir,
- '-a', '2',
- '-b', '1',
- '--sam-rg', '@RG\\tID:NA\\tSM:NA\\tPL:ILLUMINA'])
-
-# store output
-out = arvados.CollectionWriter()
-out.write_directory_tree(out_dir, this_task['parameters']['input'][0], 0)
-this_task.set_output(out.finish())
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-import sys
-import pyrtg
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-ref_dir = arvados.util.collection_extract(
- collection = this_job['script_parameters']['reference'],
- path = 'reference',
- decompress = False)
-input_dir = arvados.util.collection_extract(
- collection = this_job['script_parameters']['input'],
- path = 'input')
-bam_files = map(lambda f: os.path.join(input_dir, f),
- filter(lambda f: re.search(r'^(.*/)?alignments.bam$', f),
- arvados.util.listdir_recursive(input_dir)))
-out_dir = os.path.join(arvados.current_task().tmpdir, 'out')
-arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr)
-
-# call sequence variants
-pyrtg.run_rtg('snp', out_dir,
- ['-t', ref_dir] + bam_files)
-
-# store output
-out = arvados.CollectionWriter()
-out.write_directory_tree(out_dir, max_manifest_depth=0)
-this_task.set_output(out.finish())
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import logging
-
-logger = logging.getLogger('run-command')
-log_handler = logging.StreamHandler()
-log_handler.setFormatter(logging.Formatter("run-command: %(message)s"))
-logger.addHandler(log_handler)
-logger.setLevel(logging.INFO)
-
-import arvados
-import re
-import os
-import subprocess
-import sys
-import shutil
-import crunchutil.subst as subst
-import time
-import arvados.commands.put as put
-import signal
-import stat
-import copy
-import traceback
-import pprint
-import multiprocessing
-import crunchutil.robust_put as robust_put
-import crunchutil.vwd as vwd
-import argparse
-import json
-import tempfile
-import errno
-
-parser = argparse.ArgumentParser()
-parser.add_argument('--dry-run', action='store_true')
-parser.add_argument('--script-parameters', type=str, default="{}")
-args = parser.parse_args()
-
-os.umask(0077)
-
-if not args.dry_run:
- api = arvados.api('v1')
- t = arvados.current_task().tmpdir
- os.chdir(arvados.current_task().tmpdir)
- os.mkdir("tmpdir")
- os.mkdir("output")
-
- os.chdir("output")
-
- outdir = os.getcwd()
-
- taskp = None
- jobp = arvados.current_job()['script_parameters']
- if len(arvados.current_task()['parameters']) > 0:
- taskp = arvados.current_task()['parameters']
-else:
- outdir = "/tmp"
- jobp = json.loads(args.script_parameters)
- os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde'
- os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde'
- os.environ['CRUNCH_SRC'] = '/tmp/crunch-src'
- if 'TASK_KEEPMOUNT' not in os.environ:
- os.environ['TASK_KEEPMOUNT'] = '/keep'
-
-def sub_tmpdir(v):
- return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
-
-def sub_outdir(v):
- return outdir
-
-def sub_cores(v):
- return str(multiprocessing.cpu_count())
-
-def sub_jobid(v):
- return os.environ['JOB_UUID']
-
-def sub_taskid(v):
- return os.environ['TASK_UUID']
-
-def sub_jobsrc(v):
- return os.environ['CRUNCH_SRC']
-
-subst.default_subs["task.tmpdir"] = sub_tmpdir
-subst.default_subs["task.outdir"] = sub_outdir
-subst.default_subs["job.srcdir"] = sub_jobsrc
-subst.default_subs["node.cores"] = sub_cores
-subst.default_subs["job.uuid"] = sub_jobid
-subst.default_subs["task.uuid"] = sub_taskid
-
-class SigHandler(object):
- def __init__(self):
- self.sig = None
-
- def send_signal(self, subprocesses, signum):
- for sp in subprocesses:
- sp.send_signal(signum)
- self.sig = signum
-
-# http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html
-def flatten(l, ltypes=(list, tuple)):
- ltype = type(l)
- l = list(l)
- i = 0
- while i < len(l):
- while isinstance(l[i], ltypes):
- if not l[i]:
- l.pop(i)
- i -= 1
- break
- else:
- l[i:i + 1] = l[i]
- i += 1
- return ltype(l)
-
-def add_to_group(gr, match):
- m = match.groups()
- if m not in gr:
- gr[m] = []
- gr[m].append(match.group(0))
-
-class EvaluationError(Exception):
- pass
-
-# Return the name of the variable ('var') that will take on each value in
-# 'items' when performing an inner substitution, along with the list of items.
-def var_items(p, c, key):
- if key not in c:
- raise EvaluationError("'%s' was expected in 'c' but is missing" % key)
-
- if "var" in c:
- if not isinstance(c["var"], basestring):
- raise EvaluationError("Value of 'var' must be a string")
- # Var specifies the variable name for inner parameter substitution
- return (c["var"], get_items(p, c[key]))
- else:
- # The component function ('key') value is a list, so return the list
- # directly with no parameter selected.
- if isinstance(c[key], list):
- return (None, get_items(p, c[key]))
- elif isinstance(c[key], basestring):
- # check if c[key] is a string that looks like a parameter
- m = re.match("^\$\((.*)\)$", c[key])
- if m and m.group(1) in p:
- return (m.group(1), get_items(p, c[key]))
- else:
- # backwards compatible, foreach specifies bare parameter name to use
- return (c[key], get_items(p, p[c[key]]))
- else:
- raise EvaluationError("Value of '%s' must be a string or list" % key)
-
-# "p" is the parameter scope, "c" is the item to be expanded.
-# If "c" is a dict, apply function expansion.
-# If "c" is a list, recursively expand each item and return a new list.
-# If "c" is a string, apply parameter substitution
-def expand_item(p, c):
- if isinstance(c, dict):
- if "foreach" in c and "command" in c:
- # Expand a command template for each item in the specified user
- # parameter
- var, items = var_items(p, c, "foreach")
- if var is None:
- raise EvaluationError("Must specify 'var' in foreach")
- r = []
- for i in items:
- params = copy.copy(p)
- params[var] = i
- r.append(expand_item(params, c["command"]))
- return r
- elif "list" in c and "index" in c and "command" in c:
- # extract a single item from a list
- var, items = var_items(p, c, "list")
- if var is None:
- raise EvaluationError("Must specify 'var' in list")
- params = copy.copy(p)
- params[var] = items[int(c["index"])]
- return expand_item(params, c["command"])
- elif "regex" in c:
- pattern = re.compile(c["regex"])
- if "filter" in c:
- # filter list so that it only includes items that match a
- # regular expression
- _, items = var_items(p, c, "filter")
- return [i for i in items if pattern.match(i)]
- elif "group" in c:
- # generate a list of lists, where items are grouped on common
- # subexpression match
- _, items = var_items(p, c, "group")
- groups = {}
- for i in items:
- match = pattern.match(i)
- if match:
- add_to_group(groups, match)
- return [groups[k] for k in groups]
- elif "extract" in c:
- # generate a list of lists, where items are split by
- # subexpression match
- _, items = var_items(p, c, "extract")
- r = []
- for i in items:
- match = pattern.match(i)
- if match:
- r.append(list(match.groups()))
- return r
- elif "batch" in c and "size" in c:
- # generate a list of lists, where items are split into a batch size
- _, items = var_items(p, c, "batch")
- sz = int(c["size"])
- r = []
- for j in xrange(0, len(items), sz):
- r.append(items[j:j+sz])
- return r
- raise EvaluationError("Missing valid list context function")
- elif isinstance(c, list):
- return [expand_item(p, arg) for arg in c]
- elif isinstance(c, basestring):
- m = re.match("^\$\((.*)\)$", c)
- if m and m.group(1) in p:
- return expand_item(p, p[m.group(1)])
- else:
- return subst.do_substitution(p, c)
- else:
- raise EvaluationError("expand_item() unexpected parameter type %s" % type(c))
-
-# Evaluate in a list context
-# "p" is the parameter scope, "value" will be evaluated
-# if "value" is a list after expansion, return that
-# if "value" is a path to a directory, return a list consisting of each entry in the directory
-# if "value" is a path to a file, return a list consisting of each line of the file
-def get_items(p, value):
- value = expand_item(p, value)
- if isinstance(value, list):
- return value
- elif isinstance(value, basestring):
- mode = os.stat(value).st_mode
- prefix = value[len(os.environ['TASK_KEEPMOUNT'])+1:]
- if mode is not None:
- if stat.S_ISDIR(mode):
- items = [os.path.join(value, l) for l in os.listdir(value)]
- elif stat.S_ISREG(mode):
- with open(value) as f:
- items = [line.rstrip("\r\n") for line in f]
- return items
- raise EvaluationError("get_items did not yield a list")
-
-stdoutname = None
-stdoutfile = None
-stdinname = None
-stdinfile = None
-
-# Construct the cross product of all values of each variable listed in fvars
-def recursive_foreach(params, fvars):
- var = fvars[0]
- fvars = fvars[1:]
- items = get_items(params, params[var])
- logger.info("parallelizing on %s with items %s" % (var, items))
- if items is not None:
- for i in items:
- params = copy.copy(params)
- params[var] = i
- if len(fvars) > 0:
- recursive_foreach(params, fvars)
- else:
- if not args.dry_run:
- arvados.api().job_tasks().create(body={
- 'job_uuid': arvados.current_job()['uuid'],
- 'created_by_job_task_uuid': arvados.current_task()['uuid'],
- 'sequence': 1,
- 'parameters': params
- }).execute()
- else:
- if isinstance(params["command"][0], list):
- for c in params["command"]:
- logger.info(flatten(expand_item(params, c)))
- else:
- logger.info(flatten(expand_item(params, params["command"])))
- else:
- logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var]))
- sys.exit(1)
-
-try:
- if "task.foreach" in jobp:
- if args.dry_run or arvados.current_task()['sequence'] == 0:
- # This is the first task to start the other tasks and exit
- fvars = jobp["task.foreach"]
- if isinstance(fvars, basestring):
- fvars = [fvars]
- if not isinstance(fvars, list) or len(fvars) == 0:
- logger.error("value of task.foreach must be a string or non-empty list")
- sys.exit(1)
- recursive_foreach(jobp, jobp["task.foreach"])
- if not args.dry_run:
- if "task.vwd" in jobp:
- # Set output of the first task to the base vwd collection so it
- # will be merged with output fragments from the other tasks by
- # crunch.
- arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
- else:
- arvados.current_task().set_output(None)
- sys.exit(0)
- else:
- # This is the only task so taskp/jobp are the same
- taskp = jobp
-except Exception as e:
- logger.exception("caught exception")
- logger.error("job parameters were:")
- logger.error(pprint.pformat(jobp))
- sys.exit(1)
-
-try:
- if not args.dry_run:
- if "task.vwd" in taskp:
- # Populate output directory with symlinks to files in collection
- vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
-
- if "task.cwd" in taskp:
- os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
-
- cmd = []
- if isinstance(taskp["command"][0], list):
- for c in taskp["command"]:
- cmd.append(flatten(expand_item(taskp, c)))
- else:
- cmd.append(flatten(expand_item(taskp, taskp["command"])))
-
- if "task.stdin" in taskp:
- stdinname = subst.do_substitution(taskp, taskp["task.stdin"])
- if not args.dry_run:
- stdinfile = open(stdinname, "rb")
-
- if "task.stdout" in taskp:
- stdoutname = subst.do_substitution(taskp, taskp["task.stdout"])
- if not args.dry_run:
- stdoutfile = open(stdoutname, "wb")
-
- if "task.env" in taskp:
- env = copy.copy(os.environ)
- for k,v in taskp["task.env"].items():
- env[k] = subst.do_substitution(taskp, v)
- else:
- env = None
-
- logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else ""))
-
- if args.dry_run:
- sys.exit(0)
-except subst.SubstitutionError as e:
- logger.error(str(e))
- logger.error("task parameters were:")
- logger.error(pprint.pformat(taskp))
- sys.exit(1)
-except Exception as e:
- logger.exception("caught exception")
- logger.error("task parameters were:")
- logger.error(pprint.pformat(taskp))
- sys.exit(1)
-
-# rcode holds the return codes produced by each subprocess
-rcode = {}
-try:
- subprocesses = []
- close_streams = []
- if stdinfile:
- close_streams.append(stdinfile)
- next_stdin = stdinfile
-
- for i in xrange(len(cmd)):
- if i == len(cmd)-1:
- # this is the last command in the pipeline, so its stdout should go to stdoutfile
- next_stdout = stdoutfile
- else:
- # this is an intermediate command in the pipeline, so its stdout should go to a pipe
- next_stdout = subprocess.PIPE
-
- sp = subprocess.Popen(cmd[i], shell=False, stdin=next_stdin, stdout=next_stdout, env=env)
-
- # Need to close the FDs on our side so that subcommands will get SIGPIPE if the
- # consuming process ends prematurely.
- if sp.stdout:
- close_streams.append(sp.stdout)
-
- # Send this process's stdout to the next process's stdin
- next_stdin = sp.stdout
-
- subprocesses.append(sp)
-
- # File descriptors have been handed off to the subprocesses, so close them here.
- for s in close_streams:
- s.close()
-
- # Set up signal handling
- sig = SigHandler()
-
- # Forward terminate signals to the subprocesses.
- signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum))
- signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum))
- signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum))
-
- active = 1
- pids = set([s.pid for s in subprocesses])
- while len(pids) > 0:
- try:
- (pid, status) = os.wait()
- except OSError as e:
- if e.errno == errno.EINTR:
- pass
- else:
- raise
- else:
- pids.discard(pid)
- if not taskp.get("task.ignore_rcode"):
- rcode[pid] = (status >> 8)
- else:
- rcode[pid] = 0
-
- if sig.sig is not None:
- logger.critical("terminating on signal %s" % sig.sig)
- sys.exit(2)
- else:
- for i in xrange(len(cmd)):
- r = rcode[subprocesses[i].pid]
- logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed"))
-
-except Exception as e:
- logger.exception("caught exception")
-
-# restore default signal handlers.
-signal.signal(signal.SIGINT, signal.SIG_DFL)
-signal.signal(signal.SIGTERM, signal.SIG_DFL)
-signal.signal(signal.SIGQUIT, signal.SIG_DFL)
-
-logger.info("the following output files will be saved to keep:")
-
-subprocess.call(["find", "-L", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
-
-logger.info("start writing output to keep")
-
-if "task.vwd" in taskp and "task.foreach" in jobp:
- for root, dirs, files in os.walk(outdir):
- for f in files:
- s = os.lstat(os.path.join(root, f))
- if stat.S_ISLNK(s.st_mode):
- os.unlink(os.path.join(root, f))
-
-(outcollection, checkin_error) = vwd.checkin(outdir)
-
-# Success if we ran any subprocess, and they all exited 0.
-success = rcode and all(status == 0 for status in rcode.itervalues()) and not checkin_error
-
-api.job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={
- 'output': outcollection.manifest_text(),
- 'success': success,
- 'progress':1.0
- }).execute()
-
-sys.exit(0 if success else 1)
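A worked example of the $(var) / task.foreach expansion implemented by expand_item() and recursive_foreach() above; the parameter names and file names are hypothetical, and the expected result follows directly from the code as written.

# Hypothetical job parameters for a run-command job.
jobp_example = {
    "command": ["echo", "$(sample)"],
    "sample": ["a.fastq", "b.fastq"],
    "task.foreach": "sample",
}
# recursive_foreach(jobp_example, ["sample"]) queues one new task per
# item, each with parameters["sample"] bound to a single file name.
# When such a task runs, flatten(expand_item(taskp, taskp["command"]))
# produces its command line, so the two tasks execute:
expected_commands = [
    ["echo", "a.fastq"],
    ["echo", "b.fastq"],
]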
+++ /dev/null
-#!/usr/bin/python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import re
-import hashlib
-import string
-import sys
-
-api = arvados.api('v1')
-
-piece = 0
-manifest_text = ""
-
-# Look for paired reads
-
-inp = arvados.CollectionReader(arvados.getjobparam('reads'))
-
-manifest_list = []
-
-def nextline(reader, start):
- n = -1
- while True:
- r = reader.readfrom(start, 128)
- if r == '':
- break
- n = string.find(r, "\n")
- if n > -1:
- break
- else:
- start += 128
- return n
-
-prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
-
-# Look for fastq files
-for s in inp.all_streams():
- for f in s.all_files():
- name_pieces = prog.match(f.name())
- if name_pieces is not None:
- if s.name() != ".":
- # The downstream tool (run-command) only iterates over the top
- # level of directories, so if there are fastq files in
- # subdirectories of the input, the choice is either to forget
- # there are directories (which might lead to name conflicts) or
- # just fail.
- print >>sys.stderr, "fastq must be at the root of the collection"
- sys.exit(1)
-
- p = None
- if name_pieces.group(2) is not None:
- if name_pieces.group(2) == "_1":
- p = [{}, {}]
- p[0]["reader"] = s.files()[name_pieces.group(0)]
- p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq" + (name_pieces.group(3) if name_pieces.group(3) else '')]
- else:
- p = [{}]
- p[0]["reader"] = s.files()[name_pieces.group(0)]
-
- if p is not None:
- for i in xrange(0, len(p)):
- m = p[i]["reader"].as_manifest().split()
- m[0] = "./_" + str(piece)
- manifest_list.append(m)
- piece += 1
-
-manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n"
-
-arvados.current_task().set_output(manifest_text)
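A small illustration of how the file-name pattern above identifies paired reads; the sample file name is hypothetical.

import re

prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
m = prog.match("sample_1.fastq.gz")    # hypothetical file name
assert m.group(1) == "sample"          # base name
assert m.group(2) == "_1"              # read-arm suffix
assert m.group(3) == ".gz"             # optional compression suffix
# The script pairs this file with group(1) + "_2.fastq" + group(3):
assert m.group(1) + "_2.fastq" + (m.group(3) or '') == "sample_2.fastq.gz"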
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import arvados.crunch
-import hashlib
-import os
-
-out = arvados.crunch.TaskOutputDir()
-
-string = open(__file__).read()
-with open(os.path.join(out.path, 'example.out'), 'w') as f:
- f.write(string)
-with open(os.path.join(out.path, 'example.out.SHA1'), 'w') as f:
- f.write(hashlib.sha1(string).hexdigest() + "\n")
-
-arvados.current_task().set_output(out.manifest_text())
class Arvados::V1::JobTasksController < ApplicationController
accept_attribute_as_json :parameters, Hash
+
+ def create
+ return send_error("Unsupported legacy jobs API",
+ status: 400)
+ end
end
include DbCurrentTime
def create
- [:repository, :script, :script_version, :script_parameters].each do |r|
- if !resource_attrs[r]
- return send_error("#{r} attribute must be specified",
- status: :unprocessable_entity)
- end
- end
-
- # We used to ask for the minimum_, exclude_, and no_reuse params
- # in the job resource. Now we advertise them as flags that alter
- # the behavior of the create action.
- [:minimum_script_version, :exclude_script_versions].each do |attr|
- if resource_attrs.has_key? attr
- params[attr] = resource_attrs.delete attr
- end
- end
- if resource_attrs.has_key? :no_reuse
- params[:find_or_create] = !resource_attrs.delete(:no_reuse)
- end
-
- return super if !params[:find_or_create]
- return if !load_filters_param
-
- begin
- @object = Job.find_reusable(resource_attrs, params, @filters, @read_users)
- rescue ArgumentError => error
- return send_error(error.message)
- end
-
- if @object
- show
- else
- super
- end
+ return send_error("Unsupported legacy jobs API",
+ status: 400)
end
def cancel
- reload_object_before_update
- @object.cancel cascade: params[:cascade]
- show
+ return send_error("Unsupported legacy jobs API",
+ status: 400)
end
def lock
- @object.lock current_user.uuid
- show
- end
-
- class LogStreamer
- Q_UPDATE_INTERVAL = 12
- def initialize(job, opts={})
- @job = job
- @opts = opts
- end
- def each
- if @job.finished_at
- yield "#{@job.uuid} finished at #{@job.finished_at}\n"
- return
- end
- while not @job.started_at
- # send a summary (job queue + available nodes) to the client
- # every few seconds while waiting for the job to start
- current_time = db_current_time
- last_ack_at ||= current_time - Q_UPDATE_INTERVAL - 1
- if current_time - last_ack_at >= Q_UPDATE_INTERVAL
- nodes_in_state = {idle: 0, alloc: 0}
- ActiveRecord::Base.uncached do
- Node.where('hostname is not ?', nil).collect do |n|
- if n.info[:slurm_state]
- nodes_in_state[n.info[:slurm_state]] ||= 0
- nodes_in_state[n.info[:slurm_state]] += 1
- end
- end
- end
- job_queue = Job.queue.select(:uuid)
- n_queued_before_me = 0
- job_queue.each do |j|
- break if j.uuid == @job.uuid
- n_queued_before_me += 1
- end
- yield "#{db_current_time}" \
- " job #{@job.uuid}" \
- " queue_position #{n_queued_before_me}" \
- " queue_size #{job_queue.count}" \
- " nodes_idle #{nodes_in_state[:idle]}" \
- " nodes_alloc #{nodes_in_state[:alloc]}\n"
- last_ack_at = db_current_time
- end
- sleep 3
- ActiveRecord::Base.uncached do
- @job.reload
- end
- end
- end
+ return send_error("Unsupported legacy jobs API",
+ status: 400)
end
def queue
- params[:order] ||= ['priority desc', 'created_at']
- load_limit_offset_order_params
- load_where_param
- @where.merge!({state: Job::Queued})
- return if !load_filters_param
- find_objects_for_index
- index
+ return send_error("Unsupported legacy jobs API",
+ status: 400)
end
def queue_size
- # Users may not be allowed to see all the jobs in the queue, so provide a
- # method to get just the queue size in order to get a gist of how busy the
- # cluster is.
- render :json => {:queue_size => Job.queue.size}
+ return send_error("Unsupported legacy jobs API",
+ status: 400)
end
def self._create_requires_parameters
accept_attribute_as_json :properties, Hash
accept_attribute_as_json :components_summary, Hash
+ def create
+ return send_error("Unsupported legacy jobs API",
+ status: 400)
+ end
+
def cancel
- reload_object_before_update
- @object.cancel cascade: params[:cascade]
- show
+ return send_error("Unsupported legacy jobs API",
+ status: 400)
end
end
class Arvados::V1::PipelineTemplatesController < ApplicationController
accept_attribute_as_json :components, Hash
+
+ def create
+ return send_error("Unsupported legacy jobs API",
+ status: 400)
+ end
end
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-require 'open3'
-require 'shellwords'
-
-class CrunchDispatch
- extend DbCurrentTime
- include ApplicationHelper
- include Process
-
- EXIT_TEMPFAIL = 75
- EXIT_RETRY_UNLOCKED = 93
- RETRY_UNLOCKED_LIMIT = 3
-
- class LogTime < Time
- def to_s
- self.utc.strftime "%Y-%m-%d_%H:%M:%S"
- end
- end
-
- def initialize
- @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
- if @crunch_job_bin.empty?
- raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
- end
-
- @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN']
- @docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS']
- @cgroup_root = ENV['CRUNCH_CGROUP_ROOT']
- @srun_sync_timeout = ENV['CRUNCH_SRUN_SYNC_TIMEOUT']
-
- @arvados_internal = Rails.configuration.Containers.JobsAPI.GitInternalDir
- if not File.exist? @arvados_internal
- $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
- raise "No internal git repository available" unless ($? == 0)
- end
-
- @repo_root = Rails.configuration.Git.Repositories
- @arvados_repo_path = Repository.where(name: "arvados").first.server_path
- @authorizations = {}
- @did_recently = {}
- @fetched_commits = {}
- @git_tags = {}
- @node_state = {}
- @pipe_auth_tokens = {}
- @running = {}
- @todo = []
- @todo_job_retries = {}
- @job_retry_counts = Hash.new(0)
- @todo_pipelines = []
- end
-
- def sysuser
- return act_as_system_user
- end
-
- def refresh_todo
- if @runoptions[:jobs]
- @todo = @todo_job_retries.values + Job.queue.select(&:repository)
- end
- if @runoptions[:pipelines]
- @todo_pipelines = PipelineInstance.queue
- end
- end
-
- def each_slurm_line(cmd, outfmt, max_fields=nil)
- max_fields ||= outfmt.split(":").size
- max_fields += 1 # To accommodate the node field we add
- @@slurm_version ||= Gem::Version.new(`sinfo --version`.match(/\b[\d\.]+\b/)[0])
- if Gem::Version.new('2.3') <= @@slurm_version
- `#{cmd} --noheader -o '%n:#{outfmt}'`.each_line do |line|
- yield line.chomp.split(":", max_fields)
- end
- else
- # Expand rows with hostname ranges (like "foo[1-3,5,9-12]:idle")
- # into multiple rows with one hostname each.
- `#{cmd} --noheader -o '%N:#{outfmt}'`.each_line do |line|
- tokens = line.chomp.split(":", max_fields)
- if (re = tokens[0].match(/^(.*?)\[([-,\d]+)\]$/))
- tokens.shift
- re[2].split(",").each do |range|
- range = range.split("-").collect(&:to_i)
- (range[0]..range[-1]).each do |n|
- yield [re[1] + n.to_s] + tokens
- end
- end
- else
- yield tokens
- end
- end
- end
- end
-
- def slurm_status
- slurm_nodes = {}
- each_slurm_line("sinfo", "%t") do |hostname, state|
- # Treat nodes in idle* state as down, because the * means that slurm
- # hasn't been able to communicate with it recently.
- state.sub!(/^idle\*/, "down")
- state.sub!(/\W+$/, "")
- state = "down" unless %w(idle alloc comp mix drng down).include?(state)
- slurm_nodes[hostname] = {state: state, job: nil}
- end
- each_slurm_line("squeue", "%j") do |hostname, job_uuid|
- slurm_nodes[hostname][:job] = job_uuid if slurm_nodes[hostname]
- end
- slurm_nodes
- end
-
- def update_node_status
- return unless Rails.configuration.Containers.JobsAPI.CrunchJobWrapper.to_s.match(/^slurm/)
- slurm_status.each_pair do |hostname, slurmdata|
- next if @node_state[hostname] == slurmdata
- begin
- node = Node.where('hostname=?', hostname).order(:last_ping_at).last
- if node
- $stderr.puts "dispatch: update #{hostname} state to #{slurmdata}"
- node.info["slurm_state"] = slurmdata[:state]
- node.job_uuid = slurmdata[:job]
- if node.save
- @node_state[hostname] = slurmdata
- else
- $stderr.puts "dispatch: failed to update #{node.uuid}: #{node.errors.messages}"
- end
- elsif slurmdata[:state] != 'down'
- $stderr.puts "dispatch: SLURM reports '#{hostname}' is not down, but no node has that name"
- end
- rescue => error
- $stderr.puts "dispatch: error updating #{hostname} node status: #{error}"
- end
- end
- end
-
- def positive_int(raw_value, default=nil)
- value = begin raw_value.to_i rescue 0 end
- if value > 0
- value
- else
- default
- end
- end
-
- NODE_CONSTRAINT_MAP = {
- # Map Job runtime_constraints keys to the corresponding Node info key.
- 'min_ram_mb_per_node' => 'total_ram_mb',
- 'min_scratch_mb_per_node' => 'total_scratch_mb',
- 'min_cores_per_node' => 'total_cpu_cores',
- }
-
- def nodes_available_for_job_now(job)
- # Find Nodes that satisfy a Job's runtime constraints (by building
- # a list of Procs and using them to test each Node). If there are
- # enough to run the Job, return an array of their names.
- # Otherwise, return nil.
- need_procs = NODE_CONSTRAINT_MAP.each_pair.map do |job_key, node_key|
- Proc.new do |node|
- positive_int(node.properties[node_key], 0) >=
- positive_int(job.runtime_constraints[job_key], 0)
- end
- end
- min_node_count = positive_int(job.runtime_constraints['min_nodes'], 1)
- usable_nodes = []
- Node.all.select do |node|
- node.info['slurm_state'] == 'idle'
- end.sort_by do |node|
- # Prefer nodes with no price, then cheap nodes, then expensive nodes
- node.properties['cloud_node']['price'].to_f rescue 0
- end.each do |node|
- if need_procs.select { |need_proc| not need_proc.call(node) }.any?
- # At least one runtime constraint is not satisfied by this node
- next
- end
- usable_nodes << node
- if usable_nodes.count >= min_node_count
- hostnames = usable_nodes.map(&:hostname)
- log_nodes = usable_nodes.map do |n|
- "#{n.hostname} #{n.uuid} #{n.properties.to_json}"
- end
- log_job = "#{job.uuid} #{job.runtime_constraints}"
- log_text = "dispatching job #{log_job} to #{log_nodes.join(", ")}"
- $stderr.puts log_text
- begin
- act_as_system_user do
- Log.new(object_uuid: job.uuid,
- event_type: 'dispatch',
- owner_uuid: system_user_uuid,
- summary: "dispatching to #{hostnames.join(", ")}",
- properties: {'text' => log_text}).save!
- end
- rescue => e
- $stderr.puts "dispatch: log.create failed: #{e}"
- end
- return hostnames
- end
- end
- nil
- end
-
- def nodes_available_for_job(job)
- # Check if there are enough idle nodes with the Job's minimum
- # hardware requirements to run it. If so, return an array of
- # their names. If not, up to once per hour, signal start_jobs to
- # hold off launching Jobs. This delay is meant to give the Node
- # Manager an opportunity to make new resources available for new
- # Jobs.
- #
- # The exact timing parameters here might need to be adjusted for
- # the best balance between helping the longest-waiting Jobs run,
- # and making efficient use of immediately available resources.
- # These are all just first efforts until we have more data to work
- # with.
- nodelist = nodes_available_for_job_now(job)
- if nodelist.nil? and not did_recently(:wait_for_available_nodes, 3600)
- $stderr.puts "dispatch: waiting for nodes for #{job.uuid}"
- @node_wait_deadline = Time.now + 5.minutes
- end
- nodelist
- end
-
- def fail_job job, message, skip_lock: false
- $stderr.puts "dispatch: #{job.uuid}: #{message}"
- begin
- Log.new(object_uuid: job.uuid,
- event_type: 'dispatch',
- owner_uuid: job.owner_uuid,
- summary: message,
- properties: {"text" => message}).save!
- rescue => e
- $stderr.puts "dispatch: log.create failed: #{e}"
- end
-
- if not skip_lock and not have_job_lock?(job)
- begin
- job.lock @authorizations[job.uuid].user.uuid
- rescue ArvadosModel::AlreadyLockedError
- $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else"
- return
- end
- end
-
- job.state = "Failed"
- if not job.save
- $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed"
- end
- end
-
- def stdout_s(cmd_a, opts={})
- IO.popen(cmd_a, "r", opts) do |pipe|
- return pipe.read.chomp
- end
- end
-
- def git_cmd(*cmd_a)
- ["git", "--git-dir=#{@arvados_internal}"] + cmd_a
- end
-
- def get_authorization(job)
- if @authorizations[job.uuid] and
- @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
- # We already made a token for this job, but we need a new one
- # because modified_by_user_uuid has changed (the job will run
- # as a different user).
- @authorizations[job.uuid].update_attributes expires_at: Time.now
- @authorizations[job.uuid] = nil
- end
- if not @authorizations[job.uuid]
- auth = ApiClientAuthorization.
- new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
- api_client_id: 0)
- if not auth.save
- $stderr.puts "dispatch: auth.save failed for #{job.uuid}"
- else
- @authorizations[job.uuid] = auth
- end
- end
- @authorizations[job.uuid]
- end
-
- def internal_repo_has_commit? sha1
- if (not @fetched_commits[sha1] and
- sha1 == stdout_s(git_cmd("rev-list", "-n1", sha1), err: "/dev/null") and
- $? == 0)
- @fetched_commits[sha1] = true
- end
- return @fetched_commits[sha1]
- end
-
- def get_commit src_repo, sha1
- return true if internal_repo_has_commit? sha1
-
- # commit does not exist in internal repository, so import the
- # source repository using git fetch-pack
- cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
- $stderr.puts "dispatch: #{cmd}"
- $stderr.puts(stdout_s(cmd))
- @fetched_commits[sha1] = ($? == 0)
- end
-
- def tag_commit(job, commit_hash, tag_name)
- # @git_tags[T]==V if we know commit V has been tagged T in the
- # arvados_internal repository.
- if not @git_tags[tag_name]
- cmd = git_cmd("tag", tag_name, commit_hash)
- $stderr.puts "dispatch: #{cmd}"
- $stderr.puts(stdout_s(cmd, err: "/dev/null"))
- unless $? == 0
- # git tag failed. This may be because the tag already exists, so check for that.
- tag_rev = stdout_s(git_cmd("rev-list", "-n1", tag_name))
- if $? == 0
- # We got a revision back
- if tag_rev != commit_hash
- # Uh oh, the tag doesn't point to the revision we were expecting.
- # Someone has been monkeying with the job record and/or git.
- fail_job job, "Existing tag #{tag_name} points to commit #{tag_rev} but expected commit #{commit_hash}"
- return nil
- end
- # we're okay (fall through to setting @git_tags below)
- else
- # git rev-list failed for some reason.
- fail_job job, "'git tag' for #{tag_name} failed but did not find any existing tag using 'git rev-list'"
- return nil
- end
- end
- # 'git tag' was successful, or there is an existing tag that points to the same revision.
- @git_tags[tag_name] = commit_hash
- elsif @git_tags[tag_name] != commit_hash
- fail_job job, "Existing tag #{tag_name} points to commit #{@git_tags[tag_name]} but this job uses commit #{commit_hash}"
- return nil
- end
- @git_tags[tag_name]
- end
-
- def start_jobs
- @todo.each do |job|
- next if @running[job.uuid]
-
- cmd_args = nil
- case Rails.configuration.Containers.JobsAPI.CrunchJobWrapper
- when "none"
- if @running.size > 0
- # Don't run more than one at a time.
- return
- end
- cmd_args = []
- when "slurm_immediate"
- nodelist = nodes_available_for_job(job)
- if nodelist.nil?
- if Time.now < @node_wait_deadline
- break
- else
- next
- end
- end
- cmd_args = ["salloc",
- "--chdir=/",
- "--immediate",
- "--exclusive",
- "--no-kill",
- "--job-name=#{job.uuid}",
- "--nodelist=#{nodelist.join(',')}"]
- else
- raise "Unknown crunch_job_wrapper: #{Rails.configuration.Containers.JobsAPI.CrunchJobWrapper}"
- end
-
- cmd_args = sudo_preface + cmd_args
-
- next unless get_authorization job
-
- ready = internal_repo_has_commit? job.script_version
-
- if not ready
- # Import the commit from the specified repository into the
- # internal repository. This should have been done already when
- # the job was created/updated; this code is obsolete except to
- # avoid deployment races. Failing the job would be a
- # reasonable thing to do at this point.
- repo = Repository.where(name: job.repository).first
- if repo.nil? or repo.server_path.nil?
- fail_job job, "Repository #{job.repository} not found under #{@repo_root}"
- next
- end
- ready &&= get_commit repo.server_path, job.script_version
- ready &&= tag_commit job, job.script_version, job.uuid
- end
-
- # This should be unnecessary, because the API server does it during
- # job create/update, but it's still not a bad idea to verify the
- # tag is correct before starting the job:
- ready &&= tag_commit job, job.script_version, job.uuid
-
- # The arvados_sdk_version doesn't support use of arbitrary
- # remote URLs, so the requested version isn't necessarily copied
- # into the internal repository yet.
- if job.arvados_sdk_version
- ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
- ready &&= tag_commit job, job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
- end
-
- if not ready
- fail_job job, "commit not present in internal repository"
- next
- end
-
- cmd_args += [@crunch_job_bin,
- '--job-api-token', @authorizations[job.uuid].api_token,
- '--job', job.uuid,
- '--git-dir', @arvados_internal]
-
- if @cgroup_root
- cmd_args += ['--cgroup-root', @cgroup_root]
- end
-
- if @docker_bin
- cmd_args += ['--docker-bin', @docker_bin]
- end
-
- if @docker_run_args
- cmd_args += ['--docker-run-args', @docker_run_args]
- end
-
- if @srun_sync_timeout
- cmd_args += ['--srun-sync-timeout', @srun_sync_timeout]
- end
-
- if have_job_lock?(job)
- cmd_args << "--force-unlock"
- end
-
- $stderr.puts "dispatch: #{cmd_args.join ' '}"
-
- begin
- i, o, e, t = Open3.popen3(*cmd_args)
- rescue
- $stderr.puts "dispatch: popen3: #{$!}"
- # This is a dispatch problem like "Too many open files";
- # retrying another job right away would be futile. Just return
- # and hope things are better next time, after (at least) a
- # did_recently() delay.
- return
- end
-
- $stderr.puts "dispatch: job #{job.uuid}"
- start_banner = "dispatch: child #{t.pid} start #{LogTime.now}"
- $stderr.puts start_banner
-
- @running[job.uuid] = {
- stdin: i,
- stdout: o,
- stderr: e,
- wait_thr: t,
- job: job,
- buf: {stderr: '', stdout: ''},
- started: false,
- sent_int: 0,
- job_auth: @authorizations[job.uuid],
- stderr_buf_to_flush: '',
- stderr_flushed_at: Time.new(0),
- bytes_logged: 0,
- events_logged: 0,
- log_throttle_is_open: true,
- log_throttle_reset_time: Time.now + Rails.configuration.Containers.Logging.LogThrottlePeriod,
- log_throttle_bytes_so_far: 0,
- log_throttle_lines_so_far: 0,
- log_throttle_bytes_skipped: 0,
- log_throttle_partial_line_last_at: Time.new(0),
- log_throttle_first_partial_line: true,
- }
- i.close
- @todo_job_retries.delete(job.uuid)
- update_node_status
- end
- end
-
- # Test for hard cap on total output and for log throttling. Returns whether
- # the log line should go to output or not. Modifies "line" in place to
- # replace it with an error if a logging limit is tripped.
- def rate_limit running_job, line
- message = false
- linesize = line.size
- if running_job[:log_throttle_is_open]
- partial_line = false
- skip_counts = false
- matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/)
- if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]')
- partial_line = true
- if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.Containers.Logging.LogPartialLineThrottlePeriod
- running_job[:log_throttle_partial_line_last_at] = Time.now
- else
- skip_counts = true
- end
- end
-
- if !skip_counts
- running_job[:log_throttle_lines_so_far] += 1
- running_job[:log_throttle_bytes_so_far] += linesize
- running_job[:bytes_logged] += linesize
- end
-
- if (running_job[:bytes_logged] >
- Rails.configuration.Containers.Logging.LimitLogBytesPerJob)
- message = "Exceeded log limit #{Rails.configuration.Containers.Logging.LimitLogBytesPerJob} bytes (LimitLogBytesPerJob). Log will be truncated."
- running_job[:log_throttle_reset_time] = Time.now + 100.years
- running_job[:log_throttle_is_open] = false
-
- elsif (running_job[:log_throttle_bytes_so_far] >
- Rails.configuration.Containers.Logging.LogThrottleBytes)
- remaining_time = running_job[:log_throttle_reset_time] - Time.now
- message = "Exceeded rate #{Rails.configuration.Containers.Logging.LogThrottleBytes} bytes per #{Rails.configuration.Containers.Logging.LogThrottlePeriod} seconds (LogThrottleBytes). Logging will be silenced for the next #{remaining_time.round} seconds."
- running_job[:log_throttle_is_open] = false
-
- elsif (running_job[:log_throttle_lines_so_far] >
- Rails.configuration.Containers.Logging.LogThrottleLines)
- remaining_time = running_job[:log_throttle_reset_time] - Time.now
- message = "Exceeded rate #{Rails.configuration.Containers.Logging.LogThrottleLines} lines per #{Rails.configuration.Containers.Logging.LogThrottlePeriod} seconds (LogThrottleLines), logging will be silenced for the next #{remaining_time.round} seconds."
- running_job[:log_throttle_is_open] = false
-
- elsif partial_line and running_job[:log_throttle_first_partial_line]
- running_job[:log_throttle_first_partial_line] = false
- message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.Containers.Logging.LogPartialLineThrottlePeriod} seconds."
- end
- end
-
- if not running_job[:log_throttle_is_open]
- # Don't log anything if any limit has been exceeded. Just count lossage.
- running_job[:log_throttle_bytes_skipped] += linesize
- end
-
- if message
- # Yes, write to logs, but use our "rate exceeded" message
- # instead of the log message that exceeded the limit.
- message += " A complete log is still being written to Keep, and will be available when the job finishes.\n"
- line.replace message
- true
- elsif partial_line
- false
- else
- running_job[:log_throttle_is_open]
- end
- end
-
- def read_pipes
- @running.each do |job_uuid, j|
- now = Time.now
- if now > j[:log_throttle_reset_time]
- # It has been more than throttle_period seconds since the last
- # checkpoint so reset the throttle
- if j[:log_throttle_bytes_skipped] > 0
- message = "#{job_uuid} ! Skipped #{j[:log_throttle_bytes_skipped]} bytes of log"
- $stderr.puts message
- j[:stderr_buf_to_flush] << "#{LogTime.now} #{message}\n"
- end
-
- j[:log_throttle_reset_time] = now + Rails.configuration.Containers.Logging.LogThrottlePeriod
- j[:log_throttle_bytes_so_far] = 0
- j[:log_throttle_lines_so_far] = 0
- j[:log_throttle_bytes_skipped] = 0
- j[:log_throttle_is_open] = true
- j[:log_throttle_partial_line_last_at] = Time.new(0)
- j[:log_throttle_first_partial_line] = true
- end
-
- j[:buf].each do |stream, streambuf|
- # Read some data from the child stream
- buf = ''
- begin
- # It's important to use a big enough buffer here. When we're
- # being flooded with logs, we must read and discard many
- # bytes at once. Otherwise, we can easily peg a CPU with
- # time-checking and other loop overhead. (Quick tests show a
- # 1MiB buffer working 2.5x as fast as a 64 KiB buffer.)
- #
- # So don't reduce this buffer size!
- buf = j[stream].read_nonblock(2**20)
- rescue Errno::EAGAIN, EOFError
- end
-
- # Short circuit the counting code if we're just going to throw
- # away the data anyway.
- if not j[:log_throttle_is_open]
- j[:log_throttle_bytes_skipped] += streambuf.size + buf.size
- streambuf.replace ''
- next
- elsif buf == ''
- next
- end
-
- # Append to incomplete line from previous read, if any
- streambuf << buf
-
- bufend = ''
- streambuf.each_line do |line|
- if not line.end_with? $/
- if line.size > Rails.configuration.Containers.Logging.LogThrottleBytes
- # Without a limit here, we'll use 2x an arbitrary amount
- # of memory, and waste a lot of time copying strings
- # around, all without providing any feedback to anyone
- # about what's going on _or_ hitting any of our throttle
- # limits.
- #
- # Here we leave "line" alone, knowing it will never be
- # sent anywhere: rate_limit() will hit the LogThrottleBytes
- # limit immediately. However, we'll
- # leave [...] in bufend: if the trailing end of the long
- # line does end up getting sent anywhere, it will have
- # some indication that it is incomplete.
- bufend = "[...]"
- else
- # If line length is sane, we'll wait for the rest of the
- # line to appear in the next read_pipes() call.
- bufend = line
- break
- end
- end
- # rate_limit returns true or false as to whether to actually log
- # the line or not. It also modifies "line" in place to replace
- # it with an error if a logging limit is tripped.
- if rate_limit j, line
- $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
- $stderr.puts line
- pub_msg = "#{LogTime.now} #{line.strip}\n"
- j[:stderr_buf_to_flush] << pub_msg
- end
- end
-
- # Leave the trailing incomplete line (if any) in streambuf for
- # next time.
- streambuf.replace bufend
- end
- # Flush buffered logs to the logs table, if appropriate. We have
- # to do this even if we didn't collect any new logs this time:
- # otherwise, buffered data older than seconds_between_events
- # won't get flushed until new data arrives.
- write_log j
- end
- end
-
- def reap_children
- return if 0 == @running.size
- pid_done = nil
- j_done = nil
-
- @running.each do |uuid, j|
- if !j[:wait_thr].status
- pid_done = j[:wait_thr].pid
- j_done = j
- break
- end
- end
-
- return if !pid_done
-
- job_done = j_done[:job]
-
- # Ensure every last drop of stdout and stderr is consumed.
- read_pipes
- # Reset flush timestamp to make sure log gets written.
- j_done[:stderr_flushed_at] = Time.new(0)
- # Write any remaining logs.
- write_log j_done
-
- j_done[:buf].each do |stream, streambuf|
- if streambuf != ''
- $stderr.puts streambuf + "\n"
- end
- end
-
- # Wait for the thread to finish (its value is a Process::Status)
- exit_status = j_done[:wait_thr].value.exitstatus
- exit_tempfail = exit_status == EXIT_TEMPFAIL
-
- $stderr.puts "dispatch: child #{pid_done} exit #{exit_status}"
- $stderr.puts "dispatch: job #{job_done.uuid} end"
-
- jobrecord = Job.find_by_uuid(job_done.uuid)
-
- if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
- $stderr.puts("dispatch: job #{jobrecord.uuid} was interrupted by node failure")
- # Only this crunch-dispatch process can retry the job:
- # it's already locked, and there's no way to put it back in the
- # Queued state. Put it in our internal todo list unless the job
- # has failed this way excessively.
- @job_retry_counts[jobrecord.uuid] += 1
- exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT
- do_what_next = "give up now"
- if exit_tempfail
- @todo_job_retries[jobrecord.uuid] = jobrecord
- do_what_next = "re-attempt"
- end
- $stderr.puts("dispatch: job #{jobrecord.uuid} has been interrupted " +
- "#{@job_retry_counts[jobrecord.uuid]}x, will #{do_what_next}")
- end
-
- if !exit_tempfail
- @job_retry_counts.delete(jobrecord.uuid)
- if jobrecord.state == "Running"
- # Apparently there was an unhandled error. That could potentially
- # include "all allocated nodes failed" when we don't to retry
- # because the job has already been retried RETRY_UNLOCKED_LIMIT
- # times. Fail the job.
- jobrecord.state = "Failed"
- if not jobrecord.save
- $stderr.puts "dispatch: jobrecord.save failed"
- end
- end
- else
- # If the job failed to run due to an infrastructure
- # issue with crunch-job or slurm, we want the job to stay in the
- # queue. If crunch-job exited after losing a race to another
- # crunch-job process, it exits 75 and we should leave the job
- # record alone so the winner of the race can do its thing.
- # If crunch-job exited after all of its allocated nodes failed,
- # it exits 93, and we want to retry it later (see the
- # EXIT_RETRY_UNLOCKED `if` block).
- #
- # There is still an unhandled race condition: If our crunch-job
- # process is about to lose a race with another crunch-job
- # process, but crashes before getting to its "exit 75" (for
- # example, "cannot fork" or "cannot reach API server") then we
- # will incorrectly conclude that the failure was our process's
- # fault (because jobrecord.started_at is non-nil), and mark the job
- # as failed even though the winner of the race is probably still
- # doing fine.
- end
-
- # Invalidate the per-job auth token, unless the job is still queued and we
- # might want to try it again.
- if jobrecord.state != "Queued" and !@todo_job_retries.include?(jobrecord.uuid)
- j_done[:job_auth].update_attributes expires_at: Time.now
- end
-
- @running.delete job_done.uuid
- end
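-
- # Editor's sketch (illustrative only; nothing calls this): how the
- # non-blocking "is this child done yet?" test above works with the
- # wait_thr handle returned by Open3.popen3. Thread#status is false (or
- # nil) once the child has exited; wait_thr.value is its Process::Status.
- # Log flushing and EXIT_TEMPFAIL handling are omitted here.
- def example_child_exit_status(cmd)
-   require 'open3'
-   Open3.popen3(*cmd) do |stdin, stdout, stderr, wait_thr|
-     stdin.close
-     out_reader = Thread.new { stdout.read }
-     err_reader = Thread.new { stderr.read }
-     sleep 0.1 while wait_thr.status
-     [out_reader.value, err_reader.value, wait_thr.value.exitstatus]
-   end
- end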
-
- def update_pipelines
- expire_tokens = @pipe_auth_tokens.dup
- @todo_pipelines.each do |p|
- pipe_auth = (@pipe_auth_tokens[p.uuid] ||= ApiClientAuthorization.
- create(user: User.where('uuid=?', p.modified_by_user_uuid).first,
- api_client_id: 0))
- puts `export ARVADOS_API_TOKEN=#{pipe_auth.api_token} && arv-run-pipeline-instance --run-pipeline-here --no-wait --instance #{p.uuid}`
- expire_tokens.delete p.uuid
- end
-
- expire_tokens.each do |k, v|
- v.update_attributes expires_at: Time.now
- @pipe_auth_tokens.delete k
- end
- end
-
- def parse_argv argv
- @runoptions = {}
- (argv.any? ? argv : ['--jobs', '--pipelines']).each do |arg|
- case arg
- when '--jobs'
- @runoptions[:jobs] = true
- when '--pipelines'
- @runoptions[:pipelines] = true
- else
- abort "Unrecognized command line option '#{arg}'"
- end
- end
- if not (@runoptions[:jobs] or @runoptions[:pipelines])
- abort "Nothing to do. Please specify at least one of: --jobs, --pipelines."
- end
- end
-
- def run argv
- parse_argv argv
-
- # We want files written by crunch-dispatch to be writable by other
- # processes with the same GID (see bug #7228).
- File.umask(0002)
-
- # This is how crunch-job child procs know where the "refresh"
- # trigger file is
- ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.Containers.JobsAPI.CrunchRefreshTrigger
-
- # If salloc can't allocate resources immediately, make it use our
- # temporary failure exit code. This ensures crunch-dispatch won't
- # mark a job failed because of an issue with node allocation.
- # This often happens when another dispatcher wins the race to
- # allocate nodes.
- ENV["SLURM_EXIT_IMMEDIATE"] = CrunchDispatch::EXIT_TEMPFAIL.to_s
-
- if ENV["CRUNCH_DISPATCH_LOCKFILE"]
- lockfilename = ENV.delete "CRUNCH_DISPATCH_LOCKFILE"
- lockfile = File.open(lockfilename, File::RDWR|File::CREAT, 0644)
- unless lockfile.flock File::LOCK_EX|File::LOCK_NB
- abort "Lock unavailable on #{lockfilename} - exit"
- end
- end
-
- @signal = {}
- %w{TERM INT}.each do |sig|
- signame = sig
- Signal.trap(sig) do
- $stderr.puts "Received #{signame} signal"
- @signal[:term] = true
- end
- end
-
- act_as_system_user
- # Compute one user's permission graph up front (apparently to warm the
- # permission cache before entering the main loop); the result is unused.
- User.first.group_permissions
- $stderr.puts "dispatch: ready"
- while !@signal[:term] or @running.size > 0
- read_pipes
- if @signal[:term]
- @running.each do |uuid, j|
- if !j[:started] and j[:sent_int] < 2
- begin
- Process.kill 'INT', j[:wait_thr].pid
- rescue Errno::ESRCH
- # The process is already gone: we lost a harmless race, and
- # the desired result has already been achieved.
- end
- j[:sent_int] += 1
- end
- end
- else
- refresh_todo unless did_recently(:refresh_todo, 1.0)
- update_node_status unless did_recently(:update_node_status, 1.0)
- unless @todo.empty? or did_recently(:start_jobs, 1.0) or @signal[:term]
- start_jobs
- end
- unless (@todo_pipelines.empty? and @pipe_auth_tokens.empty?) or did_recently(:update_pipelines, 5.0)
- update_pipelines
- end
- unless did_recently(:check_orphaned_slurm_jobs, 60)
- check_orphaned_slurm_jobs
- end
- end
- reap_children
- select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
- [], [], 1)
- end
- # If there are jobs we wanted to retry, we have to mark them as failed now.
- # Other dispatchers can't pick them up because we hold their lock.
- @todo_job_retries.each_key do |job_uuid|
- job = Job.find_by_uuid(job_uuid)
- if job.state == "Running"
- fail_job(job, "crunch-dispatch was stopped during job's tempfail retry loop")
- end
- end
- end
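-
- # Editor's sketch (illustrative only; nothing calls this): the shape of
- # the main loop above. Signals only set a flag; the loop polls child
- # pipes with a one-second select timeout so it notices both new output
- # and the shutdown flag promptly. handle_readable is a hypothetical
- # stand-in for read_pipes/reap_children.
- def example_event_loop(pipes)
-   shutdown = false
-   %w{TERM INT}.each { |sig| Signal.trap(sig) { shutdown = true } }
-   until shutdown
-     ready, = IO.select(pipes, [], [], 1)
-     (ready || []).each { |io| handle_readable(io) }
-   end
- end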
-
- def fail_jobs before: nil
- act_as_system_user do
- threshold = nil
- if before == 'reboot'
- boottime = nil
- open('/proc/stat').map(&:split).each do |stat, t|
- if stat == 'btime'
- boottime = t
- end
- end
- if not boottime
- raise "Could not find btime in /proc/stat"
- end
- threshold = Time.at(boottime.to_i)
- elsif before
- threshold = Time.parse(before, Time.now)
- else
- threshold = db_current_time
- end
- Rails.logger.info "fail_jobs: threshold is #{threshold}"
-
- squeue = squeue_jobs
- Job.where('state = ? and started_at < ?', Job::Running, threshold).
- each do |job|
- Rails.logger.debug "fail_jobs: #{job.uuid} started #{job.started_at}"
- squeue.each do |slurm_name|
- if slurm_name == job.uuid
- Rails.logger.info "fail_jobs: scancel #{job.uuid}"
- scancel slurm_name
- end
- end
- fail_job(job, "cleaned up stale job: started before #{threshold}",
- skip_lock: true)
- end
- end
- end
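-
- # Editor's sketch (illustrative only; nothing calls this): the
- # before=='reboot' branch above boils down to reading the system boot
- # time, in seconds since the epoch, from the btime field of /proc/stat.
- def example_boot_time
-   btime = File.read('/proc/stat')[/^btime (\d+)/, 1]
-   raise "Could not find btime in /proc/stat" if btime.nil?
-   Time.at(btime.to_i)
- end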
-
- def check_orphaned_slurm_jobs
- act_as_system_user do
- squeue_uuids = squeue_jobs.select{|uuid| uuid.match(/^[0-9a-z]{5}-8i9sb-[0-9a-z]{15}$/)}.
- select{|uuid| !@running.has_key?(uuid)}
-
- return if squeue_uuids.size == 0
-
- scancel_uuids = squeue_uuids - Job.where('uuid in (?) and (state in (?) or modified_at>?)',
- squeue_uuids,
- ['Running', 'Queued'],
- (Time.now - 60)).
- collect(&:uuid)
- scancel_uuids.each do |uuid|
- Rails.logger.info "orphaned job: scancel #{uuid}"
- scancel uuid
- end
- end
- end
-
- def sudo_preface
- return [] if not Rails.configuration.Containers.JobsAPI.CrunchJobUser
- ["sudo", "-E", "-u",
- Rails.configuration.Containers.JobsAPI.CrunchJobUser,
- "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
- "PATH=#{ENV['PATH']}",
- "PERLLIB=#{ENV['PERLLIB']}",
- "PYTHONPATH=#{ENV['PYTHONPATH']}",
- "RUBYLIB=#{ENV['RUBYLIB']}",
- "GEM_PATH=#{ENV['GEM_PATH']}"]
- end
-
- protected
-
- def have_job_lock?(job)
- # Return true if the given job is locked by this crunch-dispatch, normally
- # because we've run crunch-job for it.
- @todo_job_retries.include?(job.uuid)
- end
-
- def did_recently(thing, min_interval)
- if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
- @did_recently[thing] = Time.now
- false
- else
- true
- end
- end
-
- # Send buffered stderr to the logs table. We want these records to be transient.
- def write_log running_job
- return if running_job[:stderr_buf_to_flush] == ''
-
- # Send a log event if the buffer size exceeds LogBytesPerEvent, or if
- # at least LogSecondsBetweenEvents seconds have passed since the last
- # flush.
- if running_job[:stderr_buf_to_flush].size > Rails.configuration.Containers.Logging.LogBytesPerEvent or
- (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.Containers.Logging.LogSecondsBetweenEvents
- begin
- log = Log.new(object_uuid: running_job[:job].uuid,
- event_type: 'stderr',
- owner_uuid: running_job[:job].owner_uuid,
- properties: {"text" => running_job[:stderr_buf_to_flush]})
- log.save!
- running_job[:events_logged] += 1
- rescue => exception
- $stderr.puts "Failed to write logs"
- $stderr.puts exception.backtrace
- end
- running_job[:stderr_buf_to_flush] = ''
- running_job[:stderr_flushed_at] = Time.now
- end
- end
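-
- # Editor's sketch (illustrative only; nothing calls this): the flush
- # rule used by write_log, as a standalone predicate. Flush when the
- # buffer is big enough or old enough, whichever happens first.
- def example_flush_now?(buf, last_flushed_at, max_bytes, max_seconds)
-   buf.size > max_bytes || (Time.now - last_flushed_at) >= max_seconds
- end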
-
- # Return an array of the job names (Arvados job UUIDs) currently in squeue.
- def squeue_jobs
- if Rails.configuration.Containers.JobsAPI.CrunchJobWrapper == "slurm_immediate"
- p = IO.popen(['squeue', '-a', '-h', '-o', '%j'])
- begin
- p.readlines.map {|line| line.strip}
- ensure
- p.close
- end
- else
- []
- end
- end
-
- def scancel slurm_name
- cmd = sudo_preface + ['scancel', '-n', slurm_name]
- IO.popen(cmd) do |scancel_pipe|
- puts scancel_pipe.read
- end
- if not $?.success?
- Rails.logger.error "scancel #{slurm_name.shellescape}: $?"
- end
- end
-end
+++ /dev/null
-#!/usr/bin/env ruby
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-# Options beginning with "--" are passed to CrunchDispatch#run; everything
-# else stays in ARGV for Rails (e.g. the environment name used below).
-# Array#push returns a truthy value, so reject! also removes each captured
-# option from ARGV.
-dispatch_argv = []
-ARGV.reject! do |arg|
- dispatch_argv.push(arg) if /^--/ =~ arg
-end
-
-ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
-require File.dirname(__FILE__) + '/../config/boot'
-require File.dirname(__FILE__) + '/../config/environment'
-require './lib/crunch_dispatch.rb'
-
-CrunchDispatch.new.run dispatch_argv
+++ /dev/null
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-import argparse
-import datetime
-import json
-import re
-import sys
-
-import arvados
-
-# Useful configuration variables:
-
-# Number of log lines to use as context in diagnosing failure.
-LOG_CONTEXT_LINES = 10
-
-# Regex matching a log line that reports a permanently failed task.
-FAILED_TASK_REGEX = re.compile(r' \d+ failure (.*permanent)')
-
-# Regular expressions used to classify failure types.
-JOB_FAILURE_TYPES = {
- 'sys/docker': 'Cannot destroy container',
- 'crunch/node': 'User not found on host',
- 'slurm/comm': 'Communication connection failure'
-}
-
-def parse_arguments(arguments):
- arg_parser = argparse.ArgumentParser(
- description='Produce a report of Crunch failures within a specified time range')
-
- arg_parser.add_argument(
- '--start',
- help='Start of the time range (UTC, format YYYY-MM-DDTHH:MM:SSZ); defaults to 24 hours ago')
- arg_parser.add_argument(
- '--end',
- help='End of the time range (UTC, format YYYY-MM-DDTHH:MM:SSZ); defaults to now')
-
- args = arg_parser.parse_args(arguments)
-
- if args.start and not is_valid_timestamp(args.start):
- raise ValueError("invalid --start timestamp: {}".format(args.start))
- if args.end and not is_valid_timestamp(args.end):
- raise ValueError("invalid --end timestamp: {}".format(args.end))
-
- return args
-
-
-def api_timestamp(when=None):
- """Returns a string representing the timestamp 'when' in a format
- suitable for delivering to the API server. Defaults to the
- current time.
- """
- if when is None:
- when = datetime.datetime.utcnow()
- return when.strftime("%Y-%m-%dT%H:%M:%SZ")
-
-
-def is_valid_timestamp(ts):
- return re.match(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z', ts)
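-
-
-# Editor's note: worked examples for the two helpers above (not part of the
-# original script):
-#
-#     >>> api_timestamp(datetime.datetime(2015, 1, 2, 3, 4, 5))
-#     '2015-01-02T03:04:05Z'
-#     >>> bool(is_valid_timestamp('2015-01-02T03:04:05Z'))
-#     True
-#     >>> bool(is_valid_timestamp('01/02/2015'))
-#     False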
-
-
-def jobs_created_between_dates(api, start, end):
- return arvados.util.list_all(
- api.jobs().list,
- filters=json.dumps([ ['created_at', '>=', start],
- ['created_at', '<=', end] ]))
-
-
-def job_logs(api, job):
- """Return the contents of this job's log as a list of lines."""
- if job['log']:
- log_collection = arvados.CollectionReader(job['log'], api)
- log_filename = "{}.log.txt".format(job['uuid'])
- return log_collection.open(log_filename).readlines()
- return []
-
-
-user_names = {}
-def job_user_name(api, user_uuid):
- def _lookup_user_name(api, user_uuid):
- try:
- return api.users().get(uuid=user_uuid).execute()['full_name']
- except arvados.errors.ApiError:
- return user_uuid
-
- if user_uuid not in user_names:
- user_names[user_uuid] = _lookup_user_name(api, user_uuid)
- return user_names[user_uuid]
-
-
-job_pipeline_names = {}
-def job_pipeline_name(api, job_uuid):
- def _lookup_pipeline_name(api, job_uuid):
- try:
- pipelines = api.pipeline_instances().list(
- filters='[["components", "like", "%{}%"]]'.format(job_uuid)).execute()
- pi = pipelines['items'][0]
- if pi['name']:
- return pi['name']
- else:
- # Use the pipeline template name
- pt = api.pipeline_templates().get(uuid=pi['pipeline_template_uuid']).execute()
- return pt['name']
- except (TypeError, ValueError, IndexError):
- return ""
-
- if job_uuid not in job_pipeline_names:
- job_pipeline_names[job_uuid] = _lookup_pipeline_name(api, job_uuid)
- return job_pipeline_names[job_uuid]
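-
-
-# Editor's sketch (illustrative only; not called anywhere): the memoized
-# lookup pattern shared by job_user_name and job_pipeline_name, factored
-# out. `lookup` is a hypothetical stand-in for the API call.
-def memoized_lookup(cache, key, lookup):
-    if key not in cache:
-        cache[key] = lookup(key)
-    return cache[key]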
-
-
-def is_failed_task(logline):
- return FAILED_TASK_REGEX.search(logline) is not None
-
-
-def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
- args = parse_arguments(arguments)
-
- api = arvados.api('v1')
-
- now = datetime.datetime.utcnow()
- start_time = args.start or api_timestamp(now - datetime.timedelta(days=1))
- end_time = args.end or api_timestamp(now)
-
- # Find all jobs created within the specified window,
- # and their corresponding job logs.
- jobs_created = jobs_created_between_dates(api, start_time, end_time)
- jobs_by_state = {}
- for job in jobs_created:
- jobs_by_state.setdefault(job['state'], [])
- jobs_by_state[job['state']].append(job)
-
- # Find failed jobs and record the job failure text.
-
- # failure_stats maps failure types (e.g. "sys/docker") to
- # a set of job UUIDs that failed for that reason.
- failure_stats = {}
- for job in jobs_by_state.get('Failed', []):
- job_uuid = job['uuid']
- logs = job_logs(api, job)
- # Find the first permanent task failure, and collect the
- # preceding log lines.
- failure_type = None
- for i, lg in enumerate(logs):
- if is_failed_task(lg):
- # Get preceding log record to provide context.
- log_start = i - LOG_CONTEXT_LINES if i >= LOG_CONTEXT_LINES else 0
- log_end = i + 1
- lastlogs = ''.join(logs[log_start:log_end])
- # try to identify the type of failure.
- for key, rgx in JOB_FAILURE_TYPES.iteritems():
- if re.search(rgx, lastlogs):
- failure_type = key
- break
- if failure_type is not None:
- break
- if failure_type is None:
- failure_type = 'unknown'
- failure_stats.setdefault(failure_type, set())
- failure_stats[failure_type].add(job_uuid)
-
- # Report percentages of successful, failed and unfinished jobs.
- print "Start: {:20s}".format(start_time)
- print "End: {:20s}".format(end_time)
- print ""
-
- print "Overview"
- print ""
-
- job_start_count = len(jobs_created)
- print " {: <25s} {:4d}".format('Started', job_start_count)
- for state in ['Complete', 'Failed', 'Queued', 'Cancelled', 'Running']:
- if state in jobs_by_state:
- job_count = len(jobs_by_state[state])
- job_percentage = job_count / float(job_start_count)
- print " {: <25s} {:4d} ({: >4.0%})".format(state,
- job_count,
- job_percentage)
- print ""
-
- # Report failure types.
- failure_summary = ""
- failure_detail = ""
-
- # Generate a mapping from failed job uuids to job records, to assist
- # in generating detailed statistics for job failures.
- jobs_failed_map = { job['uuid']: job for job in jobs_by_state.get('Failed', []) }
-
- # sort the failure stats in descending order by occurrence.
- sorted_failures = sorted(failure_stats,
- reverse=True,
- key=lambda failure_type: len(failure_stats[failure_type]))
- for failtype in sorted_failures:
- job_uuids = failure_stats[failtype]
- failstat = " {: <25s} {:4d} ({: >4.0%})\n".format(
- failtype,
- len(job_uuids),
- len(job_uuids) / float(len(jobs_by_state['Failed'])))
- failure_summary = failure_summary + failstat
- failure_detail = failure_detail + failstat
- for j in job_uuids:
- job_info = jobs_failed_map[j]
- job_owner = job_user_name(api, job_info['modified_by_user_uuid'])
- job_name = job_pipeline_name(api, job_info['uuid'])
- failure_detail = failure_detail + " {} {: <15.15s} {:29.29s}\n".format(j, job_owner, job_name)
- failure_detail = failure_detail + "\n"
-
- print "Failures by class"
- print ""
- print failure_summary
-
- print "Failures by class (detail)"
- print ""
- print failure_detail
-
- return 0
-
-
-if __name__ == "__main__":
- sys.exit(main())
+++ /dev/null
-#!/usr/bin/env ruby
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-require 'optimist'
-
-opts = Optimist::options do
- banner 'Fail jobs that have state=="Running".'
- banner 'Options:'
- opt(:before,
- 'fail only jobs that started before the given time (or "reboot")',
- type: :string)
-end
-
-ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
-require File.dirname(__FILE__) + '/../config/boot'
-require File.dirname(__FILE__) + '/../config/environment'
-require Rails.root.join('lib/crunch_dispatch.rb')
-
-CrunchDispatch.new.fail_jobs before: opts[:before]