From 4d56f9b913fcf41fbf89bf5016463b5353fa3a9f Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Tue, 6 Aug 2019 15:20:47 -0400
Subject: [PATCH] 15133: Delete crunch_scripts, start clearing out API server

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
 crunch_scripts/GATK2-VariantFiltration        |  64 --
 crunch_scripts/GATK2-bqsr                     | 103 --
 crunch_scripts/GATK2-merge-call               | 242 -----
 crunch_scripts/GATK2-realign                  | 163 ---
 crunch_scripts/arvados-bcbio-nextgen.py       | 145 ---
 crunch_scripts/arvados_bwa.py                 | 115 --
 crunch_scripts/arvados_gatk2.py               |  52 -
 crunch_scripts/arvados_ipc.py                 |  51 -
 crunch_scripts/arvados_picard.py              |  42 -
 crunch_scripts/arvados_samtools.py            | 110 --
 crunch_scripts/bwa-aln                        | 127 ---
 crunch_scripts/bwa-index                      |  41 -
 crunch_scripts/collection-merge               |  49 -
 crunch_scripts/crunchrunner                   |  10 -
 crunch_scripts/crunchutil/__init__.py         |   0
 crunch_scripts/crunchutil/robust_put.py       |  56 -
 crunch_scripts/crunchutil/subst.py            | 102 --
 crunch_scripts/crunchutil/vwd.py              | 107 --
 crunch_scripts/cwl-runner                     | 117 ---
 crunch_scripts/decompress-all.py              |  64 --
 crunch_scripts/file-select                    |  18 -
 crunch_scripts/grep                           |  24 -
 crunch_scripts/hash                           |  37 -
 crunch_scripts/pgp-survey-import              | 119 ---
 crunch_scripts/pgp-survey-parse               |  22 -
 crunch_scripts/picard-gatk2-prep              | 211 ----
 crunch_scripts/pyrtg.py                       |  75 --
 crunch_scripts/rtg-fasta2sdf                  |  27 -
 crunch_scripts/rtg-fastq2sdf                  |  45 -
 crunch_scripts/rtg-map                        |  41 -
 crunch_scripts/rtg-snp                        |  34 -
 crunch_scripts/run-command                    | 458 --------
 crunch_scripts/split-fastq.py                 |  70 --
 crunch_scripts/test/task_output_dir           |  19 -
 .../arvados/v1/job_tasks_controller.rb        |   5 +
 .../controllers/arvados/v1/jobs_controller.rb | 107 +-
 .../v1/pipeline_instances_controller.rb       |  10 +-
 .../v1/pipeline_templates_controller.rb       |   5 +
 services/api/lib/crunch_dispatch.rb           | 981 ------------------
 services/api/script/crunch-dispatch.rb        |  16 -
 services/api/script/crunch_failure_report.py  | 222 ----
 services/api/script/fail-jobs.rb              |  21 -
 42 files changed, 27 insertions(+), 4300 deletions(-)
 delete mode 100755 crunch_scripts/GATK2-VariantFiltration
 delete mode 100755 crunch_scripts/GATK2-bqsr
 delete mode 100755 crunch_scripts/GATK2-merge-call
 delete mode 100755 crunch_scripts/GATK2-realign
 delete mode 100755 crunch_scripts/arvados-bcbio-nextgen.py
 delete mode 100644 crunch_scripts/arvados_bwa.py
 delete mode 100644 crunch_scripts/arvados_gatk2.py
 delete mode 100644 crunch_scripts/arvados_ipc.py
 delete mode 100644 crunch_scripts/arvados_picard.py
 delete mode 100644 crunch_scripts/arvados_samtools.py
 delete mode 100755 crunch_scripts/bwa-aln
 delete mode 100755 crunch_scripts/bwa-index
 delete mode 100755 crunch_scripts/collection-merge
 delete mode 100755 crunch_scripts/crunchrunner
 delete mode 100644 crunch_scripts/crunchutil/__init__.py
 delete mode 100644 crunch_scripts/crunchutil/robust_put.py
 delete mode 100644 crunch_scripts/crunchutil/subst.py
 delete mode 100644 crunch_scripts/crunchutil/vwd.py
 delete mode 100755 crunch_scripts/cwl-runner
 delete mode 100755 crunch_scripts/decompress-all.py
 delete mode 100755 crunch_scripts/file-select
 delete mode 100755 crunch_scripts/grep
 delete mode 100755 crunch_scripts/hash
 delete mode 100755 crunch_scripts/pgp-survey-import
 delete mode 100755 crunch_scripts/pgp-survey-parse
 delete mode 100755 crunch_scripts/picard-gatk2-prep
 delete mode 100644 crunch_scripts/pyrtg.py
 delete mode 100755 crunch_scripts/rtg-fasta2sdf
 delete mode 100755 crunch_scripts/rtg-fastq2sdf
 delete mode 100755 crunch_scripts/rtg-map
 delete mode 100755 crunch_scripts/rtg-snp
 delete mode 100755 crunch_scripts/run-command
 delete mode 100755 crunch_scripts/split-fastq.py
 delete mode 100755 crunch_scripts/test/task_output_dir
 delete mode 100644 services/api/lib/crunch_dispatch.rb
 delete mode 100755 services/api/script/crunch-dispatch.rb
 delete mode 100755 services/api/script/crunch_failure_report.py
 delete mode 100755 services/api/script/fail-jobs.rb

diff --git a/crunch_scripts/GATK2-VariantFiltration b/crunch_scripts/GATK2-VariantFiltration
deleted file mode 100755
index 0ef4a74738..0000000000
--- a/crunch_scripts/GATK2-VariantFiltration
+++ /dev/null
@@ -1,64 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import arvados
-import os
-import re
-
-arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
-
-this_job = arvados.current_job()
-this_task = arvados.current_task()
-gatk_path = arvados.util.tarball_extract(
-    tarball = this_job['script_parameters']['gatk_binary_tarball'],
-    path = 'gatk')
-bundle_path = arvados.util.collection_extract(
-    collection = this_job['script_parameters']['gatk_bundle'],
-    path = 'gatk-bundle',
-    files = ['human_g1k_v37.dict', 'human_g1k_v37.fasta', 'human_g1k_v37.fasta.fai'])
-this_task_input = this_task['parameters']['input']
-
-input_file = list(arvados.CollectionReader(this_task_input).all_files())[0]
-
-# choose vcf temporary file names
-vcf_in = os.path.join(arvados.current_task().tmpdir,
-                      os.path.basename(input_file.name()))
-vcf_out = re.sub('(.*)\\.vcf', '\\1-filtered.vcf', vcf_in)
-
-# fetch the unfiltered data
-vcf_in_file = open(vcf_in, 'w')
-for buf in input_file.readall():
-    vcf_in_file.write(buf)
-vcf_in_file.close()
-
-stdoutdata, stderrdata = arvados.util.run_command(
-    ['java', '-Xmx1g',
-     '-jar', os.path.join(gatk_path,'GenomeAnalysisTK.jar'),
-     '-T', 'VariantFiltration', '--variant', vcf_in,
-     '--out', vcf_out,
-     '--filterExpression', 'QD < 2.0',
-     '--filterName', 'GATK_QD',
-     '--filterExpression', 'MQ < 40.0',
-     '--filterName', 'GATK_MQ',
-     '--filterExpression', 'FS > 60.0',
-     '--filterName', 'GATK_FS',
-     '--filterExpression', 'MQRankSum < -12.5',
-     '--filterName', 'GATK_MQRankSum',
-     '--filterExpression', 'ReadPosRankSum < -8.0',
-     '--filterName', 'GATK_ReadPosRankSum',
-     '-R', os.path.join(bundle_path, 'human_g1k_v37.fasta')],
-    cwd=arvados.current_task().tmpdir)
-
-# store the filtered data
-with open(vcf_out, 'rb') as f:
-    out = arvados.CollectionWriter()
-    while True:
-        buf = f.read()
-        if len(buf) == 0:
-            break
-        out.write(buf)
-out.set_current_file_name(os.path.basename(vcf_out))
-
-this_task.set_output(out.finish())

diff --git a/crunch_scripts/GATK2-bqsr b/crunch_scripts/GATK2-bqsr
deleted file mode 100755
index ab78226025..0000000000
--- a/crunch_scripts/GATK2-bqsr
+++ /dev/null
@@ -1,103 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-# -# SPDX-License-Identifier: Apache-2.0 - -import os -import re -import arvados -import arvados_gatk2 -import arvados_samtools -from arvados_ipc import * - -class InvalidArgumentError(Exception): - pass - -arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True) - -this_job = arvados.current_job() -this_task = arvados.current_task() -tmpdir = arvados.current_task().tmpdir -arvados.util.clear_tmpdir() - -known_sites_files = arvados.getjobparam( - 'known_sites', - ['dbsnp_137.b37.vcf', - 'Mills_and_1000G_gold_standard.indels.b37.vcf', - ]) -bundle_dir = arvados.util.collection_extract( - collection = this_job['script_parameters']['gatk_bundle'], - files = [ - 'human_g1k_v37.dict', - 'human_g1k_v37.fasta', - 'human_g1k_v37.fasta.fai' - ] + known_sites_files + [v + '.idx' for v in known_sites_files], - path = 'gatk_bundle') -ref_fasta_files = [os.path.join(bundle_dir, f) - for f in os.listdir(bundle_dir) - if re.search(r'\.fasta(\.gz)?$', f)] - -input_collection = this_task['parameters']['input'] -input_dir = arvados.util.collection_extract( - collection = input_collection, - path = os.path.join(this_task.tmpdir, 'input')) -input_bam_files = [] -for f in arvados.util.listdir_recursive(input_dir): - if re.search(r'\.bam$', f): - input_stream_name, input_file_name = os.path.split(f) - input_bam_files += [os.path.join(input_dir, f)] -if len(input_bam_files) != 1: - raise InvalidArgumentError("Expected exactly one bam file per task.") - -known_sites_args = [] -for f in known_sites_files: - known_sites_args += ['-knownSites', os.path.join(bundle_dir, f)] - -recal_file = os.path.join(tmpdir, 'recal.csv') - -children = {} -pipes = {} - -arvados_gatk2.run( - args=[ - '-nct', arvados_gatk2.cpus_on_this_node(), - '-T', 'BaseRecalibrator', - '-R', ref_fasta_files[0], - '-I', input_bam_files[0], - '-o', recal_file, - ] + known_sites_args) - -pipe_setup(pipes, 'BQSR') -if 0 == named_fork(children, 'BQSR'): - pipe_closeallbut(pipes, ('BQSR', 'w')) - arvados_gatk2.run( - args=[ - '-T', 'PrintReads', - '-R', ref_fasta_files[0], - '-I', input_bam_files[0], - '-o', '/dev/fd/' + str(pipes['BQSR','w']), - '-BQSR', recal_file, - '--disable_bam_indexing', - ], - close_fds=False) - os._exit(0) -os.close(pipes.pop(('BQSR','w'), None)) - -out = arvados.CollectionWriter() -out.start_new_stream(input_stream_name) - -out.start_new_file(input_file_name + '.recal.csv') -out.write(open(recal_file, 'rb')) - -out.start_new_file(input_file_name) -while True: - buf = os.read(pipes['BQSR','r'], 2**20) - if len(buf) == 0: - break - out.write(buf) -pipe_closeallbut(pipes) - -if waitpid_and_check_children(children): - this_task.set_output(out.finish()) -else: - sys.exit(1) diff --git a/crunch_scripts/GATK2-merge-call b/crunch_scripts/GATK2-merge-call deleted file mode 100755 index 6d175172e4..0000000000 --- a/crunch_scripts/GATK2-merge-call +++ /dev/null @@ -1,242 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. 
-# -# SPDX-License-Identifier: Apache-2.0 - -import os -import re -import string -import threading -import arvados -import arvados_gatk2 -import arvados_picard -from arvados_ipc import * - -class InvalidArgumentError(Exception): - pass - -this_job = arvados.current_job() -this_task = arvados.current_task() -tmpdir = arvados.current_task().tmpdir -arvados.util.clear_tmpdir() - -bundle_dir = arvados.util.collection_extract( - collection = this_job['script_parameters']['gatk_bundle'], - files = [ - 'human_g1k_v37.dict', - 'human_g1k_v37.fasta', - 'human_g1k_v37.fasta.fai', - 'dbsnp_137.b37.vcf', - 'dbsnp_137.b37.vcf.idx', - ], - path = 'gatk_bundle') -ref_fasta_files = [os.path.join(bundle_dir, f) - for f in os.listdir(bundle_dir) - if re.search(r'\.fasta(\.gz)?$', f)] -regions_args = [] -if 'regions' in this_job['script_parameters']: - regions_dir = arvados.util.collection_extract( - collection = this_job['script_parameters']['regions'], - path = 'regions') - region_padding = int(this_job['script_parameters']['region_padding']) - for f in os.listdir(regions_dir): - if re.search(r'\.bed$', f): - regions_args += [ - '--intervals', os.path.join(regions_dir, f), - '--interval_padding', str(region_padding) - ] - - -# Start a child process for each input file, feeding data to picard. - -input_child_names = [] -children = {} -pipes = {} - -input_collection = this_job['script_parameters']['input'] -input_index = 0 -for s in arvados.CollectionReader(input_collection).all_streams(): - for f in s.all_files(): - if not re.search(r'\.bam$', f.name()): - continue - input_index += 1 - childname = 'input-' + str(input_index) - input_child_names += [childname] - pipe_setup(pipes, childname) - childpid = named_fork(children, childname) - if childpid == 0: - pipe_closeallbut(pipes, (childname, 'w')) - for s in f.readall(): - os.write(pipes[childname, 'w'], s) - os.close(pipes[childname, 'w']) - os._exit(0) - sys.stderr.write("pid %d writing %s to fd %d->%d\n" % - (childpid, - s.name()+'/'+f.name(), - pipes[childname, 'w'], - pipes[childname, 'r'])) - pipe_closeallbut(pipes, *[(childname, 'r') - for childname in input_child_names]) - - -# Merge-sort the input files to merge.bam - -arvados_picard.run( - 'MergeSamFiles', - args=[ - 'I=/dev/fd/' + str(pipes[childname, 'r']) - for childname in input_child_names - ], - params={ - 'o': 'merge.bam', - 'quiet': 'true', - 'so': 'coordinate', - 'use_threading': 'true', - 'create_index': 'true', - 'validation_stringency': 'LENIENT', - }, - close_fds=False, - ) -pipe_closeallbut(pipes) - - -# Run CoverageBySample on merge.bam - -pipe_setup(pipes, 'stats_log') -pipe_setup(pipes, 'stats_out') -if 0 == named_fork(children, 'GATK'): - pipe_closeallbut(pipes, - ('stats_log', 'w'), - ('stats_out', 'w')) - arvados_gatk2.run( - args=[ - '-T', 'CoverageBySample', - '-R', ref_fasta_files[0], - '-I', 'merge.bam', - '-o', '/dev/fd/' + str(pipes['stats_out', 'w']), - '--log_to_file', '/dev/fd/' + str(pipes['stats_log', 'w']), - ] - + regions_args, - close_fds=False) - pipe_closeallbut(pipes) - os._exit(0) -pipe_closeallbut(pipes, ('stats_log', 'r'), ('stats_out', 'r')) - - -# Start two threads to read from CoverageBySample pipes - -class ExceptionPropagatingThread(threading.Thread): - """ - If a subclassed thread calls _raise(e) in run(), running join() on - the thread will raise e in the thread that calls join(). 
- """ - def __init__(self, *args, **kwargs): - super(ExceptionPropagatingThread, self).__init__(*args, **kwargs) - self.__exception = None - def join(self, *args, **kwargs): - ret = super(ExceptionPropagatingThread, self).join(*args, **kwargs) - if self.__exception: - raise self.__exception - return ret - def _raise(self, exception): - self.__exception = exception - -class StatsLogReader(ExceptionPropagatingThread): - def __init__(self, **kwargs): - super(StatsLogReader, self).__init__() - self.args = kwargs - def run(self): - try: - for logline in self.args['infile']: - x = re.search('Processing (\d+) bp from intervals', logline) - if x: - self._total_bp = int(x.group(1)) - except Exception as e: - self._raise(e) - def total_bp(self): - self.join() - return self._total_bp -stats_log_thr = StatsLogReader(infile=os.fdopen(pipes.pop(('stats_log', 'r')))) -stats_log_thr.start() - -class StatsOutReader(ExceptionPropagatingThread): - """ - Read output of CoverageBySample and collect a histogram of - coverage (last column) -> number of loci (number of rows). - """ - def __init__(self, **kwargs): - super(StatsOutReader, self).__init__() - self.args = kwargs - def run(self): - try: - hist = [0] - histtot = 0 - for line in self.args['infile']: - try: - i = int(string.split(line)[-1]) - except ValueError: - continue - if i >= 1: - if len(hist) <= i: - hist.extend([0 for x in range(1+i-len(hist))]) - hist[i] += 1 - histtot += 1 - hist[0] = stats_log_thr.total_bp() - histtot - self._histogram = hist - except Exception as e: - self._raise(e) - def histogram(self): - self.join() - return self._histogram -stats_out_thr = StatsOutReader(infile=os.fdopen(pipes.pop(('stats_out', 'r')))) -stats_out_thr.start() - - -# Run UnifiedGenotyper on merge.bam - -arvados_gatk2.run( - args=[ - '-nt', arvados_gatk2.cpus_on_this_node(), - '-T', 'UnifiedGenotyper', - '-R', ref_fasta_files[0], - '-I', 'merge.bam', - '-o', os.path.join(tmpdir, 'out.vcf'), - '--dbsnp', os.path.join(bundle_dir, 'dbsnp_137.b37.vcf'), - '-metrics', 'UniGenMetrics', - '-A', 'DepthOfCoverage', - '-A', 'AlleleBalance', - '-A', 'QualByDepth', - '-A', 'HaplotypeScore', - '-A', 'MappingQualityRankSumTest', - '-A', 'ReadPosRankSumTest', - '-A', 'FisherStrand', - '-glm', 'both', - ] - + regions_args - + arvados.getjobparam('GATK2_UnifiedGenotyper_args',[])) - -# Copy the output VCF file to Keep - -out = arvados.CollectionWriter() -out.start_new_stream() -out.start_new_file('out.vcf') -out.write(open(os.path.join(tmpdir, 'out.vcf'), 'rb')) - - -# Write statistics to Keep - -out.start_new_file('mincoverage_nlocus.csv') -sofar = 0 -hist = stats_out_thr.histogram() -total_bp = stats_log_thr.total_bp() -for i in range(len(hist)): - out.write("%d,%d,%f\n" % - (i, - total_bp - sofar, - 100.0 * (total_bp - sofar) / total_bp)) - sofar += hist[i] - -if waitpid_and_check_children(children): - this_task.set_output(out.finish()) -else: - sys.exit(1) diff --git a/crunch_scripts/GATK2-realign b/crunch_scripts/GATK2-realign deleted file mode 100755 index 2787dffd5b..0000000000 --- a/crunch_scripts/GATK2-realign +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. 
-# -# SPDX-License-Identifier: Apache-2.0 - -import os -import re -import arvados -import arvados_gatk2 -import arvados_picard -import arvados_samtools -from arvados_ipc import * - -class InvalidArgumentError(Exception): - pass - -arvados_samtools.one_task_per_bam_file(if_sequence=0, and_end_task=True) - -this_job = arvados.current_job() -this_task = arvados.current_task() -tmpdir = arvados.current_task().tmpdir -arvados.util.clear_tmpdir() - -known_sites_files = arvados.getjobparam( - 'known_sites', - ['dbsnp_137.b37.vcf', - 'Mills_and_1000G_gold_standard.indels.b37.vcf', - ]) -bundle_dir = arvados.util.collection_extract( - collection = this_job['script_parameters']['gatk_bundle'], - files = [ - 'human_g1k_v37.dict', - 'human_g1k_v37.fasta', - 'human_g1k_v37.fasta.fai' - ] + known_sites_files + [v + '.idx' for v in known_sites_files], - path = 'gatk_bundle') -ref_fasta_files = [os.path.join(bundle_dir, f) - for f in os.listdir(bundle_dir) - if re.search(r'\.fasta(\.gz)?$', f)] -regions_args = [] -if 'regions' in this_job['script_parameters']: - regions_dir = arvados.util.collection_extract( - collection = this_job['script_parameters']['regions'], - path = 'regions') - region_padding = int(this_job['script_parameters']['region_padding']) - for f in os.listdir(regions_dir): - if re.search(r'\.bed$', f): - regions_args += [ - '--intervals', os.path.join(regions_dir, f), - '--interval_padding', str(region_padding) - ] - -input_collection = this_task['parameters']['input'] -input_dir = arvados.util.collection_extract( - collection = input_collection, - path = os.path.join(this_task.tmpdir, 'input')) -input_bam_files = [] -for f in arvados.util.listdir_recursive(input_dir): - if re.search(r'\.bam$', f): - input_stream_name, input_file_name = os.path.split(f) - input_bam_files += [os.path.join(input_dir, f)] -if len(input_bam_files) != 1: - raise InvalidArgumentError("Expected exactly one bam file per task.") - -known_sites_args = [] -for f in known_sites_files: - known_sites_args += ['-known', os.path.join(bundle_dir, f)] - -children = {} -pipes = {} - -arvados_gatk2.run( - args=[ - '-nt', arvados_gatk2.cpus_per_task(), - '-T', 'RealignerTargetCreator', - '-R', ref_fasta_files[0], - '-I', input_bam_files[0], - '-o', os.path.join(tmpdir, 'intervals.list') - ] + known_sites_args + regions_args) - -pipe_setup(pipes, 'IndelRealigner') -if 0 == named_fork(children, 'IndelRealigner'): - pipe_closeallbut(pipes, ('IndelRealigner', 'w')) - arvados_gatk2.run( - args=[ - '-T', 'IndelRealigner', - '-R', ref_fasta_files[0], - '-targetIntervals', os.path.join(tmpdir, 'intervals.list'), - '-I', input_bam_files[0], - '-o', '/dev/fd/' + str(pipes['IndelRealigner','w']), - '--disable_bam_indexing', - ] + known_sites_args + regions_args, - close_fds=False) - os._exit(0) -os.close(pipes.pop(('IndelRealigner','w'), None)) - -pipe_setup(pipes, 'bammanifest') -pipe_setup(pipes, 'bam') -if 0==named_fork(children, 'bammanifest'): - pipe_closeallbut(pipes, - ('IndelRealigner', 'r'), - ('bammanifest', 'w'), - ('bam', 'w')) - out = arvados.CollectionWriter() - out.start_new_stream(input_stream_name) - out.start_new_file(input_file_name) - while True: - buf = os.read(pipes['IndelRealigner','r'], 2**20) - if len(buf) == 0: - break - os.write(pipes['bam','w'], buf) - out.write(buf) - os.write(pipes['bammanifest','w'], out.manifest_text()) - os.close(pipes['bammanifest','w']) - os._exit(0) - -pipe_setup(pipes, 'index') -if 0==named_fork(children, 'index'): - pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w')) - 
arvados_picard.run( - 'BuildBamIndex', - params={ - 'i': '/dev/fd/' + str(pipes['bam','r']), - 'o': '/dev/fd/' + str(pipes['index','w']), - 'quiet': 'true', - 'validation_stringency': 'LENIENT' - }, - close_fds=False) - os._exit(0) - -pipe_setup(pipes, 'indexmanifest') -if 0==named_fork(children, 'indexmanifest'): - pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w')) - out = arvados.CollectionWriter() - out.start_new_stream(input_stream_name) - out.start_new_file(re.sub('\.bam$', '.bai', input_file_name)) - while True: - buf = os.read(pipes['index','r'], 2**20) - if len(buf) == 0: - break - out.write(buf) - os.write(pipes['indexmanifest','w'], out.manifest_text()) - os.close(pipes['indexmanifest','w']) - os._exit(0) - -pipe_closeallbut(pipes, ('bammanifest', 'r'), ('indexmanifest', 'r')) -outmanifest = '' -for which in ['bammanifest', 'indexmanifest']: - with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f: - while True: - buf = f.read() - if buf == '': - break - outmanifest += buf - -all_ok = True -for (childname, pid) in children.items(): - all_ok = all_ok and waitpid_and_check_exit(pid, childname) - -if all_ok: - this_task.set_output(outmanifest) -else: - sys.exit(1) diff --git a/crunch_scripts/arvados-bcbio-nextgen.py b/crunch_scripts/arvados-bcbio-nextgen.py deleted file mode 100755 index b7e19ecddb..0000000000 --- a/crunch_scripts/arvados-bcbio-nextgen.py +++ /dev/null @@ -1,145 +0,0 @@ -#!/usr/bin/python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import subprocess -import crunchutil.subst as subst -import shutil -import os -import sys -import time - -if len(arvados.current_task()['parameters']) > 0: - p = arvados.current_task()['parameters'] -else: - p = arvados.current_job()['script_parameters'] - -t = arvados.current_task().tmpdir - -os.unlink("/usr/local/share/bcbio-nextgen/galaxy") -os.mkdir("/usr/local/share/bcbio-nextgen/galaxy") -shutil.copy("/usr/local/share/bcbio-nextgen/config/bcbio_system.yaml", "/usr/local/share/bcbio-nextgen/galaxy") - -with open("/usr/local/share/bcbio-nextgen/galaxy/tool_data_table_conf.xml", "w") as f: - f.write(''' - - - value, dbkey, name, path - -
-        value, dbkey, name, path
-        value, dbkey, name, path
-        index, value, path
-        value, dbkey, name, path
-        value, dbkey, name, path
-
-''') - -os.mkdir("/usr/local/share/bcbio-nextgen/galaxy/tool-data") - -with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/bowtie2_indices.loc", "w") as f: - f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(dir $(bowtie2_indices))\n")) - -with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/bwa_index.loc", "w") as f: - f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(file $(bwa_index))\n")) - -with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/gatk_sorted_picard_index.loc", "w") as f: - f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(file $(gatk_sorted_picard_index))\n")) - -with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/picard_index.loc", "w") as f: - f.write(subst.do_substitution(p, "GRCh37\tGRCh37\tHuman (GRCh37)\t$(file $(picard_index))\n")) - -with open("/usr/local/share/bcbio-nextgen/galaxy/tool-data/sam_fa_indices.loc", "w") as f: - f.write(subst.do_substitution(p, "index\tGRCh37\t$(file $(sam_fa_indices))\n")) - -with open("/tmp/crunch-job/freebayes-variant.yaml", "w") as f: - f.write(''' -# Template for whole genome Illumina variant calling with FreeBayes -# This is a GATK-free pipeline without post-alignment BAM pre-processing -# (recalibration and realignment) ---- -details: - - analysis: variant2 - genome_build: GRCh37 - # to do multi-sample variant calling, assign samples the same metadata / batch - # metadata: - # batch: your-arbitrary-batch-name - algorithm: - aligner: bwa - mark_duplicates: true - recalibrate: false - realign: false - variantcaller: freebayes - platform: illumina - quality_format: Standard - # for targetted projects, set the region - # variant_regions: /path/to/your.bed -''') - -os.unlink("/usr/local/share/bcbio-nextgen/gemini_data") -os.symlink(arvados.get_job_param_mount("gemini_data"), "/usr/local/share/bcbio-nextgen/gemini_data") - -os.chdir(arvados.current_task().tmpdir) - -rcode = subprocess.call(["bcbio_nextgen.py", "--workflow", "template", "/tmp/crunch-job/freebayes-variant.yaml", "project1", - subst.do_substitution(p, "$(file $(R1))"), - subst.do_substitution(p, "$(file $(R2))")]) - -os.chdir("project1/work") - -os.symlink("/usr/local/share/bcbio-nextgen/galaxy/tool-data", "tool-data") - -rcode = subprocess.call(["bcbio_nextgen.py", "../config/project1.yaml", "-n", os.environ['CRUNCH_NODE_SLOTS']]) - -print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed")) - -if rcode == 0: - os.chdir("../final") - - print("arvados-bcbio-nextgen: the follow output files will be saved to keep:") - - subprocess.call(["find", ".", "-type", "f", "-printf", "arvados-bcbio-nextgen: %12.12s %h/%f\\n"]) - - print("arvados-bcbio-nextgen: start writing output to keep") - - done = False - api = arvados.api('v1') - while not done: - try: - out = arvados.CollectionWriter() - out.write_directory_tree(".", max_manifest_depth=0) - outuuid = out.finish() - api.job_tasks().update(uuid=arvados.current_task()['uuid'], - body={ - 'output':outuuid, - 'success': (rcode == 0), - 'progress':1.0 - }).execute() - done = True - except Exception as e: - print("arvados-bcbio-nextgen: caught exception: {}".format(e)) - time.sleep(5) - -sys.exit(rcode) diff --git a/crunch_scripts/arvados_bwa.py b/crunch_scripts/arvados_bwa.py deleted file mode 100644 index aefc1f064b..0000000000 --- a/crunch_scripts/arvados_bwa.py +++ /dev/null @@ -1,115 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. 
-# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import re -import os -import sys -import fcntl -import subprocess - -bwa_install_path = None - -def install_path(): - """ - Extract the bwa source tree, build the bwa binary, and return the - path to the source tree. - """ - global bwa_install_path - if bwa_install_path: - return bwa_install_path - - bwa_install_path = arvados.util.tarball_extract( - tarball = arvados.current_job()['script_parameters']['bwa_tbz'], - path = 'bwa') - - # build "bwa" binary - lockfile = open(os.path.split(bwa_install_path)[0] + '.bwa-make.lock', - 'w') - fcntl.flock(lockfile, fcntl.LOCK_EX) - arvados.util.run_command(['make', '-j16'], cwd=bwa_install_path) - lockfile.close() - - return bwa_install_path - -def bwa_binary(): - """ - Return the path to the bwa executable. - """ - return os.path.join(install_path(), 'bwa') - -def run(command, command_args, **kwargs): - """ - Build and run the bwa binary. - - command is the bwa module, e.g., "index" or "aln". - - command_args is a list of additional command line arguments, e.g., - ['-a', 'bwtsw', 'ref.fasta'] - - It is assumed that we are running in a Crunch job environment, and - the job's "bwa_tbz" parameter is a collection containing the bwa - source tree in a .tbz file. - """ - execargs = [bwa_binary(), - command] - execargs += command_args - sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs))) - arvados.util.run_command( - execargs, - cwd=arvados.current_task().tmpdir, - stderr=sys.stderr, - stdin=kwargs.get('stdin', subprocess.PIPE), - stdout=kwargs.get('stdout', sys.stderr)) - -def one_task_per_pair_input_file(if_sequence=0, and_end_task=True): - """ - Queue one task for each pair of fastq files in this job's input - collection. - - Each new task will have two parameters, named "input_1" and - "input_2", each being a manifest containing a single fastq file. - - A matching pair of files in the input collection is assumed to - have names "x_1.y" and "x_2.y". - - Files in the input collection that are not part of a matched pair - are silently ignored. - - if_sequence and and_end_task arguments have the same significance - as in arvados.job_setup.one_task_per_input_file(). - """ - if if_sequence != arvados.current_task()['sequence']: - return - job_input = arvados.current_job()['script_parameters']['input'] - cr = arvados.CollectionReader(job_input) - all_files = [] - for s in cr.all_streams(): - all_files += list(s.all_files()) - for s in cr.all_streams(): - for left_file in s.all_files(): - left_name = left_file.name() - right_file = None - right_name = re.sub(r'(.*_)1\.', '\g<1>2.', left_name) - if right_name == left_name: - continue - for f2 in s.all_files(): - if right_name == f2.name(): - right_file = f2 - if right_file != None: - new_task_attrs = { - 'job_uuid': arvados.current_job()['uuid'], - 'created_by_job_task_uuid': arvados.current_task()['uuid'], - 'sequence': if_sequence + 1, - 'parameters': { - 'input_1':left_file.as_manifest(), - 'input_2':right_file.as_manifest() - } - } - arvados.api().job_tasks().create(body=new_task_attrs).execute() - if and_end_task: - arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'], - body={'success':True} - ).execute() - exit(0) diff --git a/crunch_scripts/arvados_gatk2.py b/crunch_scripts/arvados_gatk2.py deleted file mode 100644 index fa00b44d84..0000000000 --- a/crunch_scripts/arvados_gatk2.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. 
-# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import re -import os -import sys -import fcntl -import subprocess - -gatk2_install_path = None - -def install_path(): - global gatk2_install_path - if gatk2_install_path: - return gatk2_install_path - gatk2_install_path = arvados.util.tarball_extract( - tarball = arvados.current_job()['script_parameters']['gatk_tbz'], - path = 'gatk2') - return gatk2_install_path - -def memory_limit(): - taskspernode = int(os.environ.get('CRUNCH_NODE_SLOTS', '1')) - with open('/proc/meminfo', 'r') as f: - ram = int(re.search(r'MemTotal:\s*(\d+)', f.read()).group(1)) / 1024 - if taskspernode > 1: - ram = ram / taskspernode - return max(ram-700, 500) - -def cpus_on_this_node(): - with open('/proc/cpuinfo', 'r') as cpuinfo: - return max(int(os.environ.get('SLURM_CPUS_ON_NODE', 1)), - len(re.findall(r'^processor\s*:\s*\d', - cpuinfo.read(), - re.MULTILINE))) - -def cpus_per_task(): - return max(1, (cpus_on_this_node() - / int(os.environ.get('CRUNCH_NODE_SLOTS', 1)))) - -def run(**kwargs): - kwargs.setdefault('cwd', arvados.current_task().tmpdir) - kwargs.setdefault('stdout', sys.stderr) - execargs = ['java', - '-Xmx%dm' % memory_limit(), - '-Djava.io.tmpdir=' + arvados.current_task().tmpdir, - '-jar', os.path.join(install_path(), 'GenomeAnalysisTK.jar')] - execargs += [str(arg) for arg in kwargs.pop('args', [])] - sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs))) - return arvados.util.run_command(execargs, **kwargs) - diff --git a/crunch_scripts/arvados_ipc.py b/crunch_scripts/arvados_ipc.py deleted file mode 100644 index 97871627bf..0000000000 --- a/crunch_scripts/arvados_ipc.py +++ /dev/null @@ -1,51 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import os -import re -import sys -import subprocess - -def pipe_setup(pipes, name): - pipes[name,'r'], pipes[name,'w'] = os.pipe() - -def pipe_closeallbut(pipes, *keepus): - for n,m in pipes.keys(): - if (n,m) not in keepus: - os.close(pipes.pop((n,m), None)) - -def named_fork(children, name): - children[name] = os.fork() - return children[name] - -def waitpid_and_check_children(children): - """ - Given a dict of childname->pid, wait for each child process to - finish, and report non-zero exit status on stderr. Return True if - all children exited 0. - """ - all_ok = True - for (childname, pid) in children.items(): - # all_ok must be on RHS here -- we need to call waitpid() on - # every child, even if all_ok is already False. - all_ok = waitpid_and_check_exit(pid, childname) and all_ok - return all_ok - -def waitpid_and_check_exit(pid, childname=''): - """ - Wait for a child process to finish. If it exits non-zero, report - exit status on stderr (mentioning the given childname) and return - False. If it exits zero, return True. - """ - _, childstatus = os.waitpid(pid, 0) - exitvalue = childstatus >> 8 - signal = childstatus & 127 - dumpedcore = childstatus & 128 - if childstatus != 0: - sys.stderr.write("%s child %d failed: exit %d signal %d core %s\n" - % (childname, pid, exitvalue, signal, - ('y' if dumpedcore else 'n'))) - return False - return True - diff --git a/crunch_scripts/arvados_picard.py b/crunch_scripts/arvados_picard.py deleted file mode 100644 index 3d830dbcac..0000000000 --- a/crunch_scripts/arvados_picard.py +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. 
-# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import re -import os -import sys -import fcntl -import subprocess - -picard_install_path = None - -def install_path(): - global picard_install_path - if picard_install_path: - return picard_install_path - zipball = arvados.current_job()['script_parameters']['picard_zip'] - extracted = arvados.util.zipball_extract( - zipball = zipball, - path = 'picard') - for f in os.listdir(extracted): - if (re.search(r'^picard-tools-[\d\.]+$', f) and - os.path.exists(os.path.join(extracted, f, '.'))): - picard_install_path = os.path.join(extracted, f) - break - if not picard_install_path: - raise Exception("picard-tools-{version} directory not found in %s" % - zipball) - return picard_install_path - -def run(module, **kwargs): - kwargs.setdefault('cwd', arvados.current_task().tmpdir) - execargs = ['java', - '-Xmx1500m', - '-Djava.io.tmpdir=' + arvados.current_task().tmpdir, - '-jar', os.path.join(install_path(), module + '.jar')] - execargs += [str(arg) for arg in kwargs.pop('args', [])] - for key, value in kwargs.pop('params', {}).items(): - execargs += [key.upper() + '=' + str(value)] - sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs))) - return arvados.util.run_command(execargs, **kwargs) diff --git a/crunch_scripts/arvados_samtools.py b/crunch_scripts/arvados_samtools.py deleted file mode 100644 index 09992f6f21..0000000000 --- a/crunch_scripts/arvados_samtools.py +++ /dev/null @@ -1,110 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import re -import os -import sys -import fcntl -import subprocess - -samtools_path = None - -def samtools_install_path(): - """ - Extract the samtools source tree, build the samtools binary, and - return the path to the source tree. - """ - global samtools_path - if samtools_path: - return samtools_path - samtools_path = arvados.util.tarball_extract( - tarball = arvados.current_job()['script_parameters']['samtools_tgz'], - path = 'samtools') - - # build "samtools" binary - lockfile = open(os.path.split(samtools_path)[0] + '.samtools-make.lock', - 'w') - fcntl.flock(lockfile, fcntl.LOCK_EX) - arvados.util.run_command(['make', '-j16'], cwd=samtools_path) - lockfile.close() - - return samtools_path - -def samtools_binary(): - """ - Return the path to the samtools executable. - """ - return os.path.join(samtools_install_path(), 'samtools') - -def run(command, command_args, **kwargs): - """ - Build and run the samtools binary. - - command is the samtools subcommand, e.g., "view" or "sort". - - command_args is a list of additional command line arguments, e.g., - ['-bt', 'ref_list.txt', '-o', 'aln.bam', 'aln.sam.gz'] - - It is assumed that we are running in a Crunch job environment, and - the job's "samtools_tgz" parameter is a collection containing the - samtools source tree in a .tgz file. - """ - execargs = [samtools_binary(), - command] - execargs += command_args - sys.stderr.write("%s.run: exec %s\n" % (__name__, str(execargs))) - arvados.util.run_command( - execargs, - cwd=arvados.current_task().tmpdir, - stdin=kwargs.get('stdin', subprocess.PIPE), - stderr=kwargs.get('stderr', sys.stderr), - stdout=kwargs.get('stdout', sys.stderr)) - -def one_task_per_bam_file(if_sequence=0, and_end_task=True): - """ - Queue one task for each bam file in this job's input collection. - - Each new task will have an "input" parameter: a manifest - containing one .bam file and (if available) the corresponding .bai - index file. 
- - Files in the input collection that are not named *.bam or *.bai - (as well as *.bai files that do not match any .bam file present) - are silently ignored. - - if_sequence and and_end_task arguments have the same significance - as in arvados.job_setup.one_task_per_input_file(). - """ - if if_sequence != arvados.current_task()['sequence']: - return - job_input = arvados.current_job()['script_parameters']['input'] - cr = arvados.CollectionReader(job_input) - bam = {} - bai = {} - for s in cr.all_streams(): - for f in s.all_files(): - if re.search(r'\.bam$', f.name()): - bam[s.name(), f.name()] = f - elif re.search(r'\.bai$', f.name()): - bai[s.name(), f.name()] = f - for ((s_name, f_name), bam_f) in bam.items(): - bai_f = bai.get((s_name, re.sub(r'bam$', 'bai', f_name)), None) - task_input = bam_f.as_manifest() - if bai_f: - task_input += bai_f.as_manifest() - new_task_attrs = { - 'job_uuid': arvados.current_job()['uuid'], - 'created_by_job_task_uuid': arvados.current_task()['uuid'], - 'sequence': if_sequence + 1, - 'parameters': { - 'input': task_input - } - } - arvados.api().job_tasks().create(body=new_task_attrs).execute() - if and_end_task: - arvados.api().job_tasks().update(uuid=arvados.current_task()['uuid'], - body={'success':True} - ).execute() - exit(0) diff --git a/crunch_scripts/bwa-aln b/crunch_scripts/bwa-aln deleted file mode 100755 index e3d85a7c3d..0000000000 --- a/crunch_scripts/bwa-aln +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import arvados_bwa -import arvados_samtools -import os -import re -import sys -import subprocess - -arvados_bwa.one_task_per_pair_input_file(if_sequence=0, and_end_task=True) - -this_job = arvados.current_job() -this_task = arvados.current_task() -ref_dir = arvados.util.collection_extract( - collection = this_job['script_parameters']['reference_index'], - path = 'reference', - decompress = False) - -ref_basename = None -for f in os.listdir(ref_dir): - basename = re.sub(r'\.bwt$', '', f) - if basename != f: - ref_basename = os.path.join(ref_dir, basename) -if ref_basename == None: - raise Exception("Could not find *.bwt in reference collection.") - -tmp_dir = arvados.current_task().tmpdir - -class Aligner: - def input_filename(self): - for s in arvados.CollectionReader(self.collection).all_streams(): - for f in s.all_files(): - return f.decompressed_name() - def generate_input(self): - for s in arvados.CollectionReader(self.collection).all_streams(): - for f in s.all_files(): - for s in f.readall_decompressed(): - yield s - def aln(self, input_param): - self.collection = this_task['parameters'][input_param] - reads_filename = os.path.join(tmp_dir, self.input_filename()) - aln_filename = os.path.join(tmp_dir, self.input_filename() + '.sai') - reads_pipe_r, reads_pipe_w = os.pipe() - if os.fork() == 0: - os.close(reads_pipe_r) - reads_file = open(reads_filename, 'wb') - for s in self.generate_input(): - if len(s) != os.write(reads_pipe_w, s): - raise Exception("short write") - reads_file.write(s) - reads_file.close() - os.close(reads_pipe_w) - sys.exit(0) - os.close(reads_pipe_w) - - aln_file = open(aln_filename, 'wb') - bwa_proc = subprocess.Popen( - [arvados_bwa.bwa_binary(), - 'aln', '-t', '16', - ref_basename, - '-'], - stdin=os.fdopen(reads_pipe_r, 'rb', 2**20), - stdout=aln_file) - aln_file.close() - return reads_filename, aln_filename - -reads_1, alignments_1 = Aligner().aln('input_1') -reads_2, alignments_2 = 
Aligner().aln('input_2') -pid1, exit1 = os.wait() -pid2, exit2 = os.wait() -if exit1 != 0 or exit2 != 0: - raise Exception("bwa aln exited non-zero (0x%x, 0x%x)" % (exit1, exit2)) - -# output alignments in sam format to pipe -sam_pipe_r, sam_pipe_w = os.pipe() -sam_pid = os.fork() -if sam_pid != 0: - # parent - os.close(sam_pipe_w) -else: - # child - os.close(sam_pipe_r) - arvados_bwa.run('sampe', - [ref_basename, - alignments_1, alignments_2, - reads_1, reads_2], - stdout=os.fdopen(sam_pipe_w, 'wb', 2**20)) - sys.exit(0) - -# convert sam (sam_pipe_r) to bam (bam_pipe_w) -bam_pipe_r, bam_pipe_w = os.pipe() -bam_pid = os.fork() -if bam_pid != 0: - # parent - os.close(bam_pipe_w) - os.close(sam_pipe_r) -else: - # child - os.close(bam_pipe_r) - arvados_samtools.run('view', - ['-S', '-b', - '-'], - stdin=os.fdopen(sam_pipe_r, 'rb', 2**20), - stdout=os.fdopen(bam_pipe_w, 'wb', 2**20)) - sys.exit(0) - -# copy bam (bam_pipe_r) to Keep -out_bam_filename = os.path.split(reads_1)[-1] + '.bam' -out = arvados.CollectionWriter() -out.start_new_stream() -out.start_new_file(out_bam_filename) -out.write(os.fdopen(bam_pipe_r, 'rb', 2**20)) - -# make sure everyone exited nicely -pid3, exit3 = os.waitpid(sam_pid, 0) -if exit3 != 0: - raise Exception("bwa sampe exited non-zero (0x%x)" % exit3) -pid4, exit4 = os.waitpid(bam_pid, 0) -if exit4 != 0: - raise Exception("samtools view exited non-zero (0x%x)" % exit4) - -# proclaim success -this_task.set_output(out.finish()) diff --git a/crunch_scripts/bwa-index b/crunch_scripts/bwa-index deleted file mode 100755 index f5b7030c0a..0000000000 --- a/crunch_scripts/bwa-index +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import arvados_bwa -import os -import re -import sys - -this_job = arvados.current_job() -this_task = arvados.current_task() -ref_dir = arvados.util.collection_extract( - collection = this_job['script_parameters']['input'], - path = 'reference', - decompress = False) - -ref_fasta_files = (os.path.join(ref_dir, f) - for f in os.listdir(ref_dir) - if re.search(r'\.fasta(\.gz)?$', f)) - -# build reference index -arvados_bwa.run('index', - ['-a', 'bwtsw'] + list(ref_fasta_files)) - -# move output files to new empty directory -out_dir = os.path.join(arvados.current_task().tmpdir, 'out') -arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr) -os.mkdir(out_dir) -for f in os.listdir(ref_dir): - if re.search(r'\.(amb|ann|bwt|pac|rbwt|rpac|rsa|sa)$', f): - sys.stderr.write("bwa output: %s (%d)\n" % - (f, os.stat(os.path.join(ref_dir, f)).st_size)) - os.rename(os.path.join(ref_dir, f), - os.path.join(out_dir, f)) - -# store output -out = arvados.CollectionWriter() -out.write_directory_tree(out_dir, max_manifest_depth=0) -this_task.set_output(out.finish()) diff --git a/crunch_scripts/collection-merge b/crunch_scripts/collection-merge deleted file mode 100755 index f3aa5ce9cf..0000000000 --- a/crunch_scripts/collection-merge +++ /dev/null @@ -1,49 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -# collection-merge -# -# Merge two or more collections together. Can also be used to extract specific -# files from a collection to produce a new collection. -# -# input: -# An array of collections or collection/file paths in script_parameter["input"] -# -# output: -# A manifest with the collections merged. 
Duplicate file names will -# have their contents concatenated in the order that they appear in the input -# array. - -import arvados -import md5 -import crunchutil.subst as subst -import subprocess -import os -import hashlib - -p = arvados.current_job()['script_parameters'] - -merged = "" -src = [] -for c in p["input"]: - c = subst.do_substitution(p, c) - i = c.find('/') - if i == -1: - src.append(c) - merged += arvados.CollectionReader(c).manifest_text() - else: - src.append(c[0:i]) - cr = arvados.CollectionReader(c[0:i]) - j = c.rfind('/') - stream = c[i+1:j] - if stream == "": - stream = "." - fn = c[(j+1):] - for s in cr.all_streams(): - if s.name() == stream: - if fn in s.files(): - merged += s.files()[fn].as_manifest() - -arvados.current_task().set_output(merged) diff --git a/crunch_scripts/crunchrunner b/crunch_scripts/crunchrunner deleted file mode 100755 index 25d3ba524c..0000000000 --- a/crunch_scripts/crunchrunner +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/sh -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -if test -n "$JOB_PARAMETER_CRUNCHRUNNER" ; then - exec $TASK_KEEPMOUNT/$JOB_PARAMETER_CRUNCHRUNNER -else - exec /usr/local/bin/crunchrunner -fi diff --git a/crunch_scripts/crunchutil/__init__.py b/crunch_scripts/crunchutil/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/crunch_scripts/crunchutil/robust_put.py b/crunch_scripts/crunchutil/robust_put.py deleted file mode 100644 index 27b0bf3456..0000000000 --- a/crunch_scripts/crunchutil/robust_put.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import arvados.commands.put as put -import os -import logging -import time - -def machine_progress(bytes_written, bytes_expected): - return "upload wrote {} total {}\n".format( - bytes_written, -1 if (bytes_expected is None) else bytes_expected) - -class Args(object): - def __init__(self, fn): - self.filename = None - self.paths = [fn] - self.max_manifest_depth = 0 - -# Upload to Keep with error recovery. -# Return a uuid or raise an exception if there are too many failures. 
-def upload(source_dir, logger=None): - if logger is None: - logger = logging.getLogger("arvados") - - source_dir = os.path.abspath(source_dir) - done = False - if 'TASK_WORK' in os.environ: - resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint")) - else: - resume_cache = put.ResumeCache(put.ResumeCache.make_path(Args(source_dir))) - reporter = put.progress_writer(machine_progress) - bytes_expected = put.expected_bytes_for([source_dir]) - backoff = 1 - outuuid = None - while not done: - try: - out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected) - out.do_queued_work() - out.write_directory_tree(source_dir, max_manifest_depth=0) - outuuid = out.finish() - done = True - except KeyboardInterrupt as e: - logger.critical("caught interrupt signal 2") - raise e - except Exception as e: - logger.exception("caught exception:") - backoff *= 2 - if backoff > 256: - logger.critical("Too many upload failures, giving up") - raise e - else: - logger.warning("Sleeping for %s seconds before trying again" % backoff) - time.sleep(backoff) - return outuuid diff --git a/crunch_scripts/crunchutil/subst.py b/crunch_scripts/crunchutil/subst.py deleted file mode 100644 index 53def97f96..0000000000 --- a/crunch_scripts/crunchutil/subst.py +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import glob -import os -import re -import stat - -BACKSLASH_ESCAPE_RE = re.compile(r'\\(.)') - -class SubstitutionError(Exception): - pass - -def search(c): - DEFAULT = 0 - DOLLAR = 1 - - i = 0 - state = DEFAULT - start = None - depth = 0 - while i < len(c): - if c[i] == '\\': - i += 1 - elif state == DEFAULT: - if c[i] == '$': - state = DOLLAR - if depth == 0: - start = i - elif c[i] == ')': - if depth == 1: - return [start, i] - if depth > 0: - depth -= 1 - elif state == DOLLAR: - if c[i] == '(': - depth += 1 - state = DEFAULT - i += 1 - if depth != 0: - raise SubstitutionError("Substitution error, mismatched parentheses {}".format(c)) - return None - -def sub_file(v): - path = os.path.join(os.environ['TASK_KEEPMOUNT'], v) - st = os.stat(path) - if st and stat.S_ISREG(st.st_mode): - return path - else: - raise SubstitutionError("$(file {}) is not accessible or is not a regular file".format(path)) - -def sub_dir(v): - d = os.path.dirname(v) - if d == '': - d = v - path = os.path.join(os.environ['TASK_KEEPMOUNT'], d) - st = os.stat(path) - if st and stat.S_ISDIR(st.st_mode): - return path - else: - raise SubstitutionError("$(dir {}) is not accessible or is not a directory".format(path)) - -def sub_basename(v): - return os.path.splitext(os.path.basename(v))[0] - -def sub_glob(v): - l = glob.glob(v) - if len(l) == 0: - raise SubstitutionError("$(glob {}) no match found".format(v)) - else: - return l[0] - -default_subs = {"file ": sub_file, - "dir ": sub_dir, - "basename ": sub_basename, - "glob ": sub_glob} - -def do_substitution(p, c, subs=default_subs): - while True: - m = search(c) - if m is None: - return BACKSLASH_ESCAPE_RE.sub(r'\1', c) - - v = do_substitution(p, c[m[0]+2 : m[1]]) - var = True - for sub in subs: - if v.startswith(sub): - r = subs[sub](v[len(sub):]) - var = False - break - if var: - if v in p: - r = p[v] - else: - raise SubstitutionError("Unknown variable or function '%s' while performing substitution on '%s'" % (v, c)) - if r is None: - raise SubstitutionError("Substitution for '%s' is null while performing substitution on '%s'" % (v, c)) - if not 
isinstance(r, basestring): - raise SubstitutionError("Substitution for '%s' must be a string while performing substitution on '%s'" % (v, c)) - - c = c[:m[0]] + r + c[m[1]+1:] diff --git a/crunch_scripts/crunchutil/vwd.py b/crunch_scripts/crunchutil/vwd.py deleted file mode 100644 index 3245da14b3..0000000000 --- a/crunch_scripts/crunchutil/vwd.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import os -import stat -import arvados.commands.run -import logging - -# Implements "Virtual Working Directory" -# Provides a way of emulating a shared writable directory in Keep based -# on a "check out, edit, check in, merge" model. -# At the moment, this only permits adding new files, applications -# cannot modify or delete existing files. - -# Create a symlink tree rooted at target_dir mirroring arv-mounted -# source_collection. target_dir must be empty, and will be created if it -# doesn't exist. -def checkout(source_collection, target_dir, keepmount=None): - # create symlinks - if keepmount is None: - keepmount = os.environ['TASK_KEEPMOUNT'] - - if not os.path.exists(target_dir): - os.makedirs(target_dir) - - l = os.listdir(target_dir) - if len(l) > 0: - raise Exception("target_dir must be empty before checkout, contains %s" % l) - - stem = os.path.join(keepmount, source_collection) - for root, dirs, files in os.walk(os.path.join(keepmount, source_collection), topdown=True): - rel = root[len(stem)+1:] - for d in dirs: - os.mkdir(os.path.join(target_dir, rel, d)) - for f in files: - os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f)) - -def checkin(target_dir): - """Write files in `target_dir` to Keep. - - Regular files or symlinks to files outside the keep mount are written to - Keep as normal files (Keep does not support symlinks). - - Symlinks to files in the keep mount will result in files in the new - collection which reference existing Keep blocks, no data copying necessary. - - Returns a new Collection object, with data flushed but the collection record - not saved to the API. - - """ - - outputcollection = arvados.collection.Collection(num_retries=5) - - if target_dir[-1:] != '/': - target_dir += '/' - - collections = {} - - logger = logging.getLogger("arvados") - - last_error = None - for root, dirs, files in os.walk(target_dir): - for f in files: - try: - s = os.lstat(os.path.join(root, f)) - - writeIt = False - - if stat.S_ISREG(s.st_mode): - writeIt = True - elif stat.S_ISLNK(s.st_mode): - # 1. check if it is a link into a collection - real = os.path.split(os.path.realpath(os.path.join(root, f))) - (pdh, branch) = arvados.commands.run.is_in_collection(real[0], real[1]) - if pdh is not None: - # 2. load collection - if pdh not in collections: - # 2.1 make sure it is flushed (see #5787 note 11) - fd = os.open(real[0], os.O_RDONLY) - os.fsync(fd) - os.close(fd) - - # 2.2 get collection from API server - collections[pdh] = arvados.collection.CollectionReader(pdh, - api_client=outputcollection._my_api(), - keep_client=outputcollection._my_keep(), - num_retries=5) - # 3. 
copy arvfile to new collection - outputcollection.copy(branch, os.path.join(root[len(target_dir):], f), source_collection=collections[pdh]) - else: - writeIt = True - - if writeIt: - reldir = root[len(target_dir):] - with outputcollection.open(os.path.join(reldir, f), "wb") as writer: - with open(os.path.join(root, f), "rb") as reader: - dat = reader.read(64*1024) - while dat: - writer.write(dat) - dat = reader.read(64*1024) - except (IOError, OSError) as e: - logger.error(e) - last_error = e - - return (outputcollection, last_error) diff --git a/crunch_scripts/cwl-runner b/crunch_scripts/cwl-runner deleted file mode 100755 index 0c79844d5f..0000000000 --- a/crunch_scripts/cwl-runner +++ /dev/null @@ -1,117 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -# Crunch script integration for running arvados-cwl-runner inside a crunch job. - -import arvados_cwl -import sys - -try: - # Use the crunch script defined in the arvados_cwl package. This helps - # prevent the crunch script from going out of sync with the rest of the - # arvados_cwl package. - import arvados_cwl.crunch_script - arvados_cwl.crunch_script.run() - sys.exit() -except ImportError: - pass - -# When running against an older arvados-cwl-runner package without -# arvados_cwl.crunch_script, fall back to the old code. - - -# This gets the job record, transforms the script parameters into a valid CWL -# input object, then executes the CWL runner to run the underlying workflow or -# tool. When the workflow completes, record the output object in an output -# collection for this runner job. - -import arvados -import arvados.collection -import arvados.util -import cwltool.main -import logging -import os -import json -import argparse -import re -import functools - -from arvados.api import OrderedJsonModel -from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs -from cwltool.load_tool import load_tool - -# Print package versions -logging.info(cwltool.main.versionstring()) - -api = arvados.api("v1") - -try: - job_order_object = arvados.current_job()['script_parameters'] - - pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$') - - def keeppath(v): - if pdh_path.match(v): - return "keep:%s" % v - else: - return v - - def keeppathObj(v): - v["location"] = keeppath(v["location"]) - - job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"]) - - for k,v in job_order_object.items(): - if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v): - job_order_object[k] = { - "class": "File", - "location": "keep:%s" % v - } - - adjustFileObjs(job_order_object, keeppathObj) - adjustDirObjs(job_order_object, keeppathObj) - normalizeFilesDirs(job_order_object) - adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api))) - - output_name = None - if "arv:output_name" in job_order_object: - output_name = job_order_object["arv:output_name"] - del job_order_object["arv:output_name"] - - runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()), - output_name=output_name) - - t = load_tool(job_order_object, runner.arv_make_tool) - - args = argparse.Namespace() - args.project_uuid = arvados.current_job()["owner_uuid"] - args.enable_reuse = True - args.submit = False - args.debug = True - args.quiet = False - args.ignore_docker_for_reuse = False - args.basedir = 
os.getcwd() - args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]} - outputObj = runner.arv_executor(t, job_order_object, **vars(args)) - - if runner.final_output_collection: - outputCollection = runner.final_output_collection.portable_data_hash() - else: - outputCollection = None - - api.job_tasks().update(uuid=arvados.current_task()['uuid'], - body={ - 'output': outputCollection, - 'success': True, - 'progress':1.0 - }).execute() -except Exception as e: - logging.exception("Unhandled exception") - api.job_tasks().update(uuid=arvados.current_task()['uuid'], - body={ - 'output': None, - 'success': False, - 'progress':1.0 - }).execute() diff --git a/crunch_scripts/decompress-all.py b/crunch_scripts/decompress-all.py deleted file mode 100755 index 100ea12239..0000000000 --- a/crunch_scripts/decompress-all.py +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -# -# decompress-all.py -# -# Decompress all compressed files in the collection using the "dtrx" tool and -# produce a new collection with the contents. Uncompressed files -# are passed through. -# -# input: -# A collection at script_parameters["input"] -# -# output: -# A manifest of the uncompressed contents of the input collection. - -import arvados -import re -import subprocess -import os -import sys -import crunchutil.robust_put as robust_put - -arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True, - input_as_path=True) - -task = arvados.current_task() - -input_file = task['parameters']['input'] - -infile_parts = re.match(r"(^[a-f0-9]{32}\+\d+)(\+\S+)*(/.*)?(/[^/]+)$", input_file) - -outdir = os.path.join(task.tmpdir, "output") -os.makedirs(outdir) -os.chdir(outdir) - -if infile_parts is None: - print >>sys.stderr, "Failed to parse input filename '%s' as a Keep file\n" % input_file - sys.exit(1) - -cr = arvados.CollectionReader(infile_parts.group(1)) -streamname = infile_parts.group(3)[1:] -filename = infile_parts.group(4)[1:] - -if streamname is not None: - subprocess.call(["mkdir", "-p", streamname]) - os.chdir(streamname) -else: - streamname = '.' - -m = re.match(r'.*\.(gz|Z|bz2|tgz|tbz|zip|rar|7z|cab|deb|rpm|cpio|gem)$', arvados.get_task_param_mount('input'), re.IGNORECASE) - -if m is not None: - rc = subprocess.call(["dtrx", "-r", "-n", "-q", arvados.get_task_param_mount('input')]) - if rc == 0: - task.set_output(robust_put.upload(outdir)) - else: - sys.exit(rc) -else: - streamreader = filter(lambda s: s.name() == streamname, cr.all_streams())[0] - filereader = streamreader.files()[filename] - task.set_output(streamname + filereader.as_manifest()[1:]) diff --git a/crunch_scripts/file-select b/crunch_scripts/file-select deleted file mode 100755 index c4af05c820..0000000000 --- a/crunch_scripts/file-select +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. 
-# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import os -import re - -this_job = arvados.current_job() -this_task = arvados.current_task() -this_job_input = this_job['script_parameters']['input'] -manifest_text = "" -for f in arvados.CollectionReader(this_job_input).all_files(): - if f.name() in this_job['script_parameters']['names']: - manifest_text += f.as_manifest() - -this_task.set_output(arvados.Keep.put(manifest_text)) diff --git a/crunch_scripts/grep b/crunch_scripts/grep deleted file mode 100755 index a84c0f671c..0000000000 --- a/crunch_scripts/grep +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import re - -arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True) - -this_job = arvados.current_job() -this_task = arvados.current_task() -this_task_input = this_task['parameters']['input'] -pattern = re.compile(this_job['script_parameters']['pattern']) - -input_file = list(arvados.CollectionReader(this_task_input).all_files())[0] -out = arvados.CollectionWriter() -out.set_current_file_name(input_file.decompressed_name()) -out.set_current_stream_name(input_file.stream_name()) -for line in input_file.readlines(): - if pattern.search(line): - out.write(line) - -this_task.set_output(out.finish()) diff --git a/crunch_scripts/hash b/crunch_scripts/hash deleted file mode 100755 index 56eec7a5ff..0000000000 --- a/crunch_scripts/hash +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import hashlib -import os - -arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=True) - -this_job = arvados.current_job() -this_task = arvados.current_task() - -if 'algorithm' in this_job['script_parameters']: - alg = this_job['script_parameters']['algorithm'] -else: - alg = 'md5' -digestor = hashlib.new(alg) - -input_file = arvados.get_task_param_mount('input') - -with open(input_file) as f: - while True: - buf = f.read(2**20) - if len(buf) == 0: - break - digestor.update(buf) - -hexdigest = digestor.hexdigest() - -file_name = '/'.join(this_task['parameters']['input'].split('/')[1:]) - -out = arvados.CollectionWriter() -out.set_current_file_name("md5sum.txt") -out.write("%s %s\n" % (hexdigest, file_name)) -this_task.set_output(out.finish()) diff --git a/crunch_scripts/pgp-survey-import b/crunch_scripts/pgp-survey-import deleted file mode 100755 index f12e84b2da..0000000000 --- a/crunch_scripts/pgp-survey-import +++ /dev/null @@ -1,119 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. 
-# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import string -import json -import UserDict -import sys - -this_job = arvados.current_job() -this_task = arvados.current_task() -this_job_input = this_job['script_parameters']['input'] - -out = arvados.CollectionWriter() -out.set_current_file_name("arvados_objects.json") -out.write("[\n") -separator = "" - -traits = {} -done_bytes = 0 -done_ratio = 0 -for input_file in arvados.CollectionReader(this_job_input).all_files(): - for line_number, line in enumerate(input_file.readlines()): - - done_bytes += len(line) - new_done_ratio = 1.0 * done_bytes / input_file.size() - if line_number == 2 or new_done_ratio - done_ratio > 0.05: - sys.stderr.write("progress: %d%% after %d lines\n" % (int(done_ratio * 100), line_number+1)) - done_ratio = new_done_ratio - - words = string.split(string.strip(line), "\t") - if line_number == 0: - headings = words - for t in arvados.api('v1').traits().list( - where={'name':words}, - limit=1000 - ).execute()['items']: - traits[t['name']] = t - for i, trait_name in enumerate(words[3:], start=3): - # find or create trait - if trait_name not in traits: - traits_match = arvados.api('v1').traits().list( - where={'name':trait_name} - ).execute()['items'] - if len(traits_match) > 0: - traits[trait_name] = traits_match[0] - else: - traits[trait_name] = arvados.api('v1').traits().create( - trait={'name':trait_name}).execute() - out.write(separator) - out.write(json.dumps(traits[trait_name])) - separator = ",\n" - else: - huID_links_match = arvados.api('v1').links().list( - where={'link_class':'identifier','name':words[0]} - ).execute()['items'] - if len(huID_links_match) > 0: - human_uuid = huID_links_match[0]['head_uuid'] - else: - human = arvados.api('v1').humans().create( - body={} - ).execute() - huID_link = arvados.api('v1').links().create( - body={ - 'link_class':'identifier', - 'name':words[0], - 'head_kind':'arvados#human', - 'head_uuid':human['uuid'] - } - ).execute() - human_uuid = human['uuid'] - human_trait = {} - for t in arvados.api('v1').links().list( - limit=10000, - where={ - 'tail_uuid':human_uuid, - 'tail_kind':'arvados#human', - 'head_kind':'arvados#trait', - 'link_class':'human_trait', - 'name':'pgp-survey-response' - } - ).execute()['items']: - human_trait[t['head_uuid']] = t - for i, trait_value in enumerate(words[3:], start=3): - trait_uuid = traits[headings[i]]['uuid'] - if trait_uuid in human_trait: - trait_link = human_trait[trait_uuid] - if trait_link['properties']['value'] != trait_value: - # update database value to match survey response - trait_link['properties']['value'] = trait_value - arvados.api('v1').links().update( - uuid=trait_link['uuid'], - body={'properties':trait_link['properties']} - ).execute() - out.write(",\n") - out.write(json.dumps(trait_link)) - elif trait_value == '': - # nothing in database, nothing in input - pass - else: - trait_link = { - 'tail_uuid':human_uuid, - 'tail_kind':'arvados#human', - 'head_uuid':traits[headings[i]]['uuid'], - 'head_kind':'arvados#trait', - 'link_class':'human_trait', - 'name':'pgp-survey-response', - 'properties': { 'value': trait_value } - } - arvados.api('v1').links().create( - body=trait_link - ).execute() - out.write(",\n") - out.write(json.dumps(trait_link)) - -out.write("\n]\n") -this_task.set_output(out.finish()) diff --git a/crunch_scripts/pgp-survey-parse b/crunch_scripts/pgp-survey-parse deleted file mode 100755 index ee852f1d24..0000000000 --- a/crunch_scripts/pgp-survey-parse +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env 
python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados - -this_job = arvados.current_job() -this_task = arvados.current_task() -parser_path = arvados.util.git_checkout( - url = this_job['script_parameters']['parser_url'], - version = this_job['script_parameters']['parser_version'], - path = 'parser') - -stdoutdata, stderrdata = arvados.util.run_command( - ["python", "demo.py"], - cwd=parser_path) - -out = arvados.CollectionWriter() -out.write(stdoutdata) -out.set_current_file_name('participant_traits.tsv') -this_task.set_output(out.finish()) diff --git a/crunch_scripts/picard-gatk2-prep b/crunch_scripts/picard-gatk2-prep deleted file mode 100755 index 976060f01b..0000000000 --- a/crunch_scripts/picard-gatk2-prep +++ /dev/null @@ -1,211 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import os -import re -import sys -import subprocess -import arvados_picard -from arvados_ipc import * - -arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True) - -this_job = arvados.current_job() -this_task = arvados.current_task() -ref_dir = arvados.util.collection_extract( - collection = this_job['script_parameters']['reference'], - path = 'reference', - decompress = True) -ref_fasta_files = [os.path.join(ref_dir, f) - for f in os.listdir(ref_dir) - if re.search(r'\.fasta(\.gz)?$', f)] -input_collection = this_task['parameters']['input'] - -for s in arvados.CollectionReader(input_collection).all_streams(): - for f in s.all_files(): - input_stream_name = s.name() - input_file_name = f.name() - break - -# Unfortunately, picard FixMateInformation cannot read from a pipe. We -# must copy the input to a temporary file before running picard. 
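For context, the multi-stage picard pipeline that follows is wired together with pipe_setup, named_fork, and pipe_closeallbut from the (also removed) arvados_ipc module. A minimal stand-alone sketch of the same fork-and-pipe idea, using only the standard library rather than the arvados_ipc helpers (the names and payload here are illustrative):

import os
import sys

# One producer, one consumer, connected by an OS pipe; the child reads and
# transforms, the parent writes, then waits and checks the exit status.
r, w = os.pipe()
pid = os.fork()
if pid == 0:
    os.close(w)                      # child keeps only the read end
    with os.fdopen(r, 'rb') as reader:
        data = reader.read()
    sys.stdout.write(data.decode('ascii').upper())
    os._exit(0)
os.close(r)                          # parent keeps only the write end
with os.fdopen(w, 'wb') as writer:
    writer.write(b'fixmate -> sortsam -> reordersam\n')
_, status = os.waitpid(pid, 0)
sys.exit(0 if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0 else 1)

Closing the unused pipe ends in each process is what lets a downstream stage see EOF (or SIGPIPE) when its neighbour exits, which is the same reason the script below calls pipe_closeallbut before each stage.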
-input_bam_path = os.path.join(this_task.tmpdir, input_file_name) -with open(input_bam_path, 'wb') as bam: - for s in arvados.CollectionReader(input_collection).all_streams(): - for f in s.all_files(): - for s in f.readall(): - bam.write(s) - -children = {} -pipes = {} - -pipe_setup(pipes, 'fixmate') -if 0==named_fork(children, 'fixmate'): - pipe_closeallbut(pipes, ('fixmate', 'w')) - arvados_picard.run( - 'FixMateInformation', - params={ - 'i': input_bam_path, - 'o': '/dev/stdout', - 'quiet': 'true', - 'so': 'coordinate', - 'validation_stringency': 'LENIENT', - 'compression_level': 0 - }, - stdout=os.fdopen(pipes['fixmate','w'], 'wb', 2**20)) - os._exit(0) -os.close(pipes.pop(('fixmate','w'), None)) - -pipe_setup(pipes, 'sortsam') -if 0==named_fork(children, 'sortsam'): - pipe_closeallbut(pipes, ('fixmate', 'r'), ('sortsam', 'w')) - arvados_picard.run( - 'SortSam', - params={ - 'i': '/dev/stdin', - 'o': '/dev/stdout', - 'quiet': 'true', - 'so': 'coordinate', - 'validation_stringency': 'LENIENT', - 'compression_level': 0 - }, - stdin=os.fdopen(pipes['fixmate','r'], 'rb', 2**20), - stdout=os.fdopen(pipes['sortsam','w'], 'wb', 2**20)) - os._exit(0) - -pipe_setup(pipes, 'reordersam') -if 0==named_fork(children, 'reordersam'): - pipe_closeallbut(pipes, ('sortsam', 'r'), ('reordersam', 'w')) - arvados_picard.run( - 'ReorderSam', - params={ - 'i': '/dev/stdin', - 'o': '/dev/stdout', - 'reference': ref_fasta_files[0], - 'quiet': 'true', - 'validation_stringency': 'LENIENT', - 'compression_level': 0 - }, - stdin=os.fdopen(pipes['sortsam','r'], 'rb', 2**20), - stdout=os.fdopen(pipes['reordersam','w'], 'wb', 2**20)) - os._exit(0) - -pipe_setup(pipes, 'addrg') -if 0==named_fork(children, 'addrg'): - pipe_closeallbut(pipes, ('reordersam', 'r'), ('addrg', 'w')) - arvados_picard.run( - 'AddOrReplaceReadGroups', - params={ - 'i': '/dev/stdin', - 'o': '/dev/stdout', - 'quiet': 'true', - 'rglb': this_job['script_parameters'].get('rglb', 0), - 'rgpl': this_job['script_parameters'].get('rgpl', 'illumina'), - 'rgpu': this_job['script_parameters'].get('rgpu', 0), - 'rgsm': this_job['script_parameters'].get('rgsm', 0), - 'validation_stringency': 'LENIENT' - }, - stdin=os.fdopen(pipes['reordersam','r'], 'rb', 2**20), - stdout=os.fdopen(pipes['addrg','w'], 'wb', 2**20)) - os._exit(0) - -pipe_setup(pipes, 'bammanifest') -pipe_setup(pipes, 'bam') -pipe_setup(pipes, 'casm_in') -if 0==named_fork(children, 'bammanifest'): - pipe_closeallbut(pipes, - ('addrg', 'r'), - ('bammanifest', 'w'), - ('bam', 'w'), - ('casm_in', 'w')) - out = arvados.CollectionWriter() - out.start_new_stream(input_stream_name) - out.start_new_file(input_file_name) - while True: - buf = os.read(pipes['addrg','r'], 2**20) - if len(buf) == 0: - break - os.write(pipes['bam','w'], buf) - os.write(pipes['casm_in','w'], buf) - out.write(buf) - os.write(pipes['bammanifest','w'], out.manifest_text()) - os.close(pipes['bammanifest','w']) - os._exit(0) - -pipe_setup(pipes, 'casm') -if 0 == named_fork(children, 'casm'): - pipe_closeallbut(pipes, ('casm_in', 'r'), ('casm', 'w')) - arvados_picard.run( - 'CollectAlignmentSummaryMetrics', - params={ - 'input': '/dev/fd/' + str(pipes['casm_in','r']), - 'output': '/dev/fd/' + str(pipes['casm','w']), - 'reference_sequence': ref_fasta_files[0], - 'validation_stringency': 'LENIENT', - }, - close_fds=False) - os._exit(0) - -pipe_setup(pipes, 'index') -if 0==named_fork(children, 'index'): - pipe_closeallbut(pipes, ('bam', 'r'), ('index', 'w')) - arvados_picard.run( - 'BuildBamIndex', - params={ - 'i': '/dev/stdin', - 
'o': '/dev/stdout', - 'quiet': 'true', - 'validation_stringency': 'LENIENT' - }, - stdin=os.fdopen(pipes['bam','r'], 'rb', 2**20), - stdout=os.fdopen(pipes['index','w'], 'wb', 2**20)) - os._exit(0) - -pipe_setup(pipes, 'indexmanifest') -if 0==named_fork(children, 'indexmanifest'): - pipe_closeallbut(pipes, ('index', 'r'), ('indexmanifest', 'w')) - out = arvados.CollectionWriter() - out.start_new_stream(input_stream_name) - out.start_new_file(re.sub('\.bam$', '.bai', input_file_name)) - while True: - buf = os.read(pipes['index','r'], 2**20) - if len(buf) == 0: - break - out.write(buf) - os.write(pipes['indexmanifest','w'], out.manifest_text()) - os.close(pipes['indexmanifest','w']) - os._exit(0) - -pipe_closeallbut(pipes, - ('bammanifest', 'r'), - ('indexmanifest', 'r'), - ('casm', 'r')) - -outmanifest = '' - -for which in ['bammanifest', 'indexmanifest']: - with os.fdopen(pipes[which,'r'], 'rb', 2**20) as f: - while True: - buf = f.read() - if buf == '': - break - outmanifest += buf - -casm_out = arvados.CollectionWriter() -casm_out.start_new_stream(input_stream_name) -casm_out.start_new_file(input_file_name + '.casm.tsv') -casm_out.write(os.fdopen(pipes.pop(('casm','r')))) - -outmanifest += casm_out.manifest_text() - -all_ok = True -for (childname, pid) in children.items(): - all_ok = all_ok and waitpid_and_check_exit(pid, childname) - -if all_ok: - this_task.set_output(outmanifest) -else: - sys.exit(1) diff --git a/crunch_scripts/pyrtg.py b/crunch_scripts/pyrtg.py deleted file mode 100644 index d733270f87..0000000000 --- a/crunch_scripts/pyrtg.py +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import re -import os -import sys - -rtg_install_path = None - -def setup(): - global rtg_install_path - if rtg_install_path: - return rtg_install_path - rtg_path = arvados.util.zipball_extract( - zipball = arvados.current_job()['script_parameters']['rtg_binary_zip'], - path = 'rtg') - rtg_license_path = arvados.util.collection_extract( - collection = arvados.current_job()['script_parameters']['rtg_license'], - path = 'license', - decompress = False) - - # symlink to rtg-license.txt - license_txt_path = os.path.join(rtg_license_path, 'rtg-license.txt') - try: - os.symlink(license_txt_path, os.path.join(rtg_path,'rtg-license.txt')) - except OSError: - if not os.path.exists(os.path.join(rtg_path,'rtg-license.txt')): - os.symlink(license_txt_path, os.path.join(rtg_path,'rtg-license.txt')) - - rtg_install_path = rtg_path - return rtg_path - -def run_rtg(command, output_dir, command_args, **kwargs): - global rtg_install_path - execargs = [os.path.join(rtg_install_path, 'rtg'), - command, - '-o', output_dir] - execargs += command_args - sys.stderr.write("run_rtg: exec %s\n" % str(execargs)) - arvados.util.run_command( - execargs, - cwd=arvados.current_task().tmpdir, - stderr=sys.stderr, - stdout=sys.stderr) - - # Exit status cannot be trusted in rtg 1.1.1. - assert_done(output_dir) - - # Copy log files to stderr and delete them to avoid storing them - # in Keep with the output data. 
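The run_rtg wrapper above relies on two conventions: the tool's exit status cannot be trusted, so a sentinel file in the output directory is treated as the real success signal, and log files are streamed to stderr and deleted so they never land in Keep beside the data. A rough stand-alone sketch of the sentinel check, with function and parameter names that are illustrative rather than part of pyrtg:

import os
import subprocess

def run_and_verify(cmd, output_dir, sentinel='done'):
    # Run the command, then trust the presence of a sentinel file in the
    # output directory rather than the (unreliable) exit status.
    subprocess.call(cmd)
    done_file = os.path.join(output_dir, sentinel)
    if not os.path.exists(done_file):
        raise Exception("%s exited but %s does not exist" % (cmd[0], done_file))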
- for dirent in arvados.util.listdir_recursive(output_dir): - if is_log_file(dirent): - log_file = os.path.join(output_dir, dirent) - sys.stderr.write(' '.join(['==>', dirent, '<==\n'])) - with open(log_file, 'rb') as f: - while True: - buf = f.read(2**20) - if len(buf) == 0: - break - sys.stderr.write(buf) - sys.stderr.write('\n') # in case log does not end in newline - os.unlink(log_file) - -def assert_done(output_dir): - # Sanity-check exit code. - done_file = os.path.join(output_dir, 'done') - if not os.path.exists(done_file): - raise Exception("rtg exited 0 but %s does not exist. abort.\n" % done_file) - -def is_log_file(filename): - return re.search(r'^(.*/)?(progress|done|\S+.log)$', filename) - -setup() diff --git a/crunch_scripts/rtg-fasta2sdf b/crunch_scripts/rtg-fasta2sdf deleted file mode 100755 index f1ef617f6a..0000000000 --- a/crunch_scripts/rtg-fasta2sdf +++ /dev/null @@ -1,27 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import os -import re -import sys -import pyrtg - -this_job = arvados.current_job() -this_task = arvados.current_task() -fasta_path = arvados.util.collection_extract( - collection = this_job['script_parameters']['input'], - path = 'fasta', - decompress = False) -fasta_files = filter(lambda f: f != '.locator', os.listdir(fasta_path)) -out_dir = os.path.join(arvados.current_task().tmpdir, 'ref-sdf') -arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr) - -pyrtg.run_rtg('format', out_dir, - map(lambda f: os.path.join(fasta_path, f), fasta_files)) - -out = arvados.CollectionWriter() -out.write_directory_tree(out_dir, max_manifest_depth=0) -this_task.set_output(out.finish()) diff --git a/crunch_scripts/rtg-fastq2sdf b/crunch_scripts/rtg-fastq2sdf deleted file mode 100755 index e42697fc40..0000000000 --- a/crunch_scripts/rtg-fastq2sdf +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. 
-# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import os -import re -import sys -import pyrtg - -this_job = arvados.current_job() -this_task = arvados.current_task() -fastq_path = arvados.util.collection_extract( - collection = this_job['script_parameters']['input'], - path = 'fastq') -fastq_files = filter(lambda f: f != '.locator', os.listdir(fastq_path)) -tmp_dir_base = os.path.join(arvados.current_task().tmpdir, 'tmp') -out_dir = os.path.join(arvados.current_task().tmpdir, 'reads') - -arvados.util.run_command(['rm', '-rf', tmp_dir_base], stderr=sys.stderr) -arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr) -os.mkdir(tmp_dir_base) - -# convert fastq to sdf -tmp_dirs = [] -for leftarm in fastq_files: - if re.search(r'_1.f(ast)?q(.gz)?$', leftarm): - rightarm = re.sub(r'_1(.f(ast)?q(.gz)?)$', '_2\\1', leftarm) - if rightarm in fastq_files: - tmp_dirs += ['%s/%08d' % (tmp_dir_base, len(tmp_dirs))] - pyrtg.run_rtg('format', tmp_dirs[-1], - ['-f', 'fastq', - '-q', 'sanger', - '-l', os.path.join(fastq_path, leftarm), - '-r', os.path.join(fastq_path, rightarm)]) - -# split sdf -pyrtg.run_rtg('sdfsplit', out_dir, - ['-n', '1500000'] + tmp_dirs) - -# store output -out = arvados.CollectionWriter() -out.write_directory_tree(out_dir, max_manifest_depth=1) -this_task.set_output(out.finish()) diff --git a/crunch_scripts/rtg-map b/crunch_scripts/rtg-map deleted file mode 100755 index f740888b9f..0000000000 --- a/crunch_scripts/rtg-map +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import os -import re -import sys -import pyrtg - -arvados.job_setup.one_task_per_input_stream(if_sequence=0, and_end_task=True) - -this_job = arvados.current_job() -this_task = arvados.current_task() -in_dir = os.path.join(this_task.tmpdir, 'input') -arvados.util.run_command(['rm', '-rf', in_dir], stderr=sys.stderr) -in_dir = arvados.util.stream_extract( - stream = arvados.StreamReader(this_task['parameters']['input']), - path = in_dir, - decompress = False) -ref_dir = arvados.util.collection_extract( - collection = this_job['script_parameters']['reference'], - path = 'reference', - decompress = False) - -out_dir = os.path.join(arvados.current_task().tmpdir, 'out') -arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr) - -# map reads -pyrtg.run_rtg('map', out_dir, - ['-i', in_dir, - '-t', ref_dir, - '-a', '2', - '-b', '1', - '--sam-rg', '@RG\\tID:NA\\tSM:NA\\tPL:ILLUMINA']) - -# store output -out = arvados.CollectionWriter() -out.write_directory_tree(out_dir, this_task['parameters']['input'][0], 0) -this_task.set_output(out.finish()) diff --git a/crunch_scripts/rtg-snp b/crunch_scripts/rtg-snp deleted file mode 100755 index 1d8a605b9c..0000000000 --- a/crunch_scripts/rtg-snp +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. 
-# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import os -import re -import sys -import pyrtg - -this_job = arvados.current_job() -this_task = arvados.current_task() -ref_dir = arvados.util.collection_extract( - collection = this_job['script_parameters']['reference'], - path = 'reference', - decompress = False) -input_dir = arvados.util.collection_extract( - collection = this_job['script_parameters']['input'], - path = 'input') -bam_files = map(lambda f: os.path.join(input_dir, f), - filter(lambda f: re.search(r'^(.*/)?alignments.bam$', f), - arvados.util.listdir_recursive(input_dir))) -out_dir = os.path.join(arvados.current_task().tmpdir, 'out') -arvados.util.run_command(['rm', '-rf', out_dir], stderr=sys.stderr) - -# call sequence variants -pyrtg.run_rtg('snp', out_dir, - ['-t', ref_dir] + bam_files) - -# store output -out = arvados.CollectionWriter() -out.write_directory_tree(out_dir, max_manifest_depth=0) -this_task.set_output(out.finish()) diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command deleted file mode 100755 index 3fd08bf28b..0000000000 --- a/crunch_scripts/run-command +++ /dev/null @@ -1,458 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -import logging - -logger = logging.getLogger('run-command') -log_handler = logging.StreamHandler() -log_handler.setFormatter(logging.Formatter("run-command: %(message)s")) -logger.addHandler(log_handler) -logger.setLevel(logging.INFO) - -import arvados -import re -import os -import subprocess -import sys -import shutil -import crunchutil.subst as subst -import time -import arvados.commands.put as put -import signal -import stat -import copy -import traceback -import pprint -import multiprocessing -import crunchutil.robust_put as robust_put -import crunchutil.vwd as vwd -import argparse -import json -import tempfile -import errno - -parser = argparse.ArgumentParser() -parser.add_argument('--dry-run', action='store_true') -parser.add_argument('--script-parameters', type=str, default="{}") -args = parser.parse_args() - -os.umask(0077) - -if not args.dry_run: - api = arvados.api('v1') - t = arvados.current_task().tmpdir - os.chdir(arvados.current_task().tmpdir) - os.mkdir("tmpdir") - os.mkdir("output") - - os.chdir("output") - - outdir = os.getcwd() - - taskp = None - jobp = arvados.current_job()['script_parameters'] - if len(arvados.current_task()['parameters']) > 0: - taskp = arvados.current_task()['parameters'] -else: - outdir = "/tmp" - jobp = json.loads(args.script_parameters) - os.environ['JOB_UUID'] = 'zzzzz-8i9sb-1234567890abcde' - os.environ['TASK_UUID'] = 'zzzzz-ot0gb-1234567890abcde' - os.environ['CRUNCH_SRC'] = '/tmp/crunch-src' - if 'TASK_KEEPMOUNT' not in os.environ: - os.environ['TASK_KEEPMOUNT'] = '/keep' - -def sub_tmpdir(v): - return os.path.join(arvados.current_task().tmpdir, 'tmpdir') - -def sub_outdir(v): - return outdir - -def sub_cores(v): - return str(multiprocessing.cpu_count()) - -def sub_jobid(v): - return os.environ['JOB_UUID'] - -def sub_taskid(v): - return os.environ['TASK_UUID'] - -def sub_jobsrc(v): - return os.environ['CRUNCH_SRC'] - -subst.default_subs["task.tmpdir"] = sub_tmpdir -subst.default_subs["task.outdir"] = sub_outdir -subst.default_subs["job.srcdir"] = sub_jobsrc -subst.default_subs["node.cores"] = sub_cores -subst.default_subs["job.uuid"] = sub_jobid -subst.default_subs["task.uuid"] = sub_taskid - -class SigHandler(object): - def __init__(self): - self.sig = None - - def 
send_signal(self, subprocesses, signum): - for sp in subprocesses: - sp.send_signal(signum) - self.sig = signum - -# http://rightfootin.blogspot.com/2006/09/more-on-python-flatten.html -def flatten(l, ltypes=(list, tuple)): - ltype = type(l) - l = list(l) - i = 0 - while i < len(l): - while isinstance(l[i], ltypes): - if not l[i]: - l.pop(i) - i -= 1 - break - else: - l[i:i + 1] = l[i] - i += 1 - return ltype(l) - -def add_to_group(gr, match): - m = match.groups() - if m not in gr: - gr[m] = [] - gr[m].append(match.group(0)) - -class EvaluationError(Exception): - pass - -# Return the name of variable ('var') that will take on each value in 'items' -# when performing an inner substitution -def var_items(p, c, key): - if key not in c: - raise EvaluationError("'%s' was expected in 'p' but is missing" % key) - - if "var" in c: - if not isinstance(c["var"], basestring): - raise EvaluationError("Value of 'var' must be a string") - # Var specifies the variable name for inner parameter substitution - return (c["var"], get_items(p, c[key])) - else: - # The component function ('key') value is a list, so return the list - # directly with no parameter selected. - if isinstance(c[key], list): - return (None, get_items(p, c[key])) - elif isinstance(c[key], basestring): - # check if c[key] is a string that looks like a parameter - m = re.match("^\$\((.*)\)$", c[key]) - if m and m.group(1) in p: - return (m.group(1), get_items(p, c[key])) - else: - # backwards compatible, foreach specifies bare parameter name to use - return (c[key], get_items(p, p[c[key]])) - else: - raise EvaluationError("Value of '%s' must be a string or list" % key) - -# "p" is the parameter scope, "c" is the item to be expanded. -# If "c" is a dict, apply function expansion. -# If "c" is a list, recursively expand each item and return a new list. 
-# If "c" is a string, apply parameter substitution -def expand_item(p, c): - if isinstance(c, dict): - if "foreach" in c and "command" in c: - # Expand a command template for each item in the specified user - # parameter - var, items = var_items(p, c, "foreach") - if var is None: - raise EvaluationError("Must specify 'var' in foreach") - r = [] - for i in items: - params = copy.copy(p) - params[var] = i - r.append(expand_item(params, c["command"])) - return r - elif "list" in c and "index" in c and "command" in c: - # extract a single item from a list - var, items = var_items(p, c, "list") - if var is None: - raise EvaluationError("Must specify 'var' in list") - params = copy.copy(p) - params[var] = items[int(c["index"])] - return expand_item(params, c["command"]) - elif "regex" in c: - pattern = re.compile(c["regex"]) - if "filter" in c: - # filter list so that it only includes items that match a - # regular expression - _, items = var_items(p, c, "filter") - return [i for i in items if pattern.match(i)] - elif "group" in c: - # generate a list of lists, where items are grouped on common - # subexpression match - _, items = var_items(p, c, "group") - groups = {} - for i in items: - match = pattern.match(i) - if match: - add_to_group(groups, match) - return [groups[k] for k in groups] - elif "extract" in c: - # generate a list of lists, where items are split by - # subexpression match - _, items = var_items(p, c, "extract") - r = [] - for i in items: - match = pattern.match(i) - if match: - r.append(list(match.groups())) - return r - elif "batch" in c and "size" in c: - # generate a list of lists, where items are split into a batch size - _, items = var_items(p, c, "batch") - sz = int(c["size"]) - r = [] - for j in xrange(0, len(items), sz): - r.append(items[j:j+sz]) - return r - raise EvaluationError("Missing valid list context function") - elif isinstance(c, list): - return [expand_item(p, arg) for arg in c] - elif isinstance(c, basestring): - m = re.match("^\$\((.*)\)$", c) - if m and m.group(1) in p: - return expand_item(p, p[m.group(1)]) - else: - return subst.do_substitution(p, c) - else: - raise EvaluationError("expand_item() unexpected parameter type %s" % type(c)) - -# Evaluate in a list context -# "p" is the parameter scope, "value" will be evaluated -# if "value" is a list after expansion, return that -# if "value" is a path to a directory, return a list consisting of each entry in the directory -# if "value" is a path to a file, return a list consisting of each line of the file -def get_items(p, value): - value = expand_item(p, value) - if isinstance(value, list): - return value - elif isinstance(value, basestring): - mode = os.stat(value).st_mode - prefix = value[len(os.environ['TASK_KEEPMOUNT'])+1:] - if mode is not None: - if stat.S_ISDIR(mode): - items = [os.path.join(value, l) for l in os.listdir(value)] - elif stat.S_ISREG(mode): - with open(value) as f: - items = [line.rstrip("\r\n") for line in f] - return items - raise EvaluationError("get_items did not yield a list") - -stdoutname = None -stdoutfile = None -stdinname = None -stdinfile = None - -# Construct the cross product of all values of each variable listed in fvars -def recursive_foreach(params, fvars): - var = fvars[0] - fvars = fvars[1:] - items = get_items(params, params[var]) - logger.info("parallelizing on %s with items %s" % (var, items)) - if items is not None: - for i in items: - params = copy.copy(params) - params[var] = i - if len(fvars) > 0: - recursive_foreach(params, fvars) - else: - if not 
args.dry_run: - arvados.api().job_tasks().create(body={ - 'job_uuid': arvados.current_job()['uuid'], - 'created_by_job_task_uuid': arvados.current_task()['uuid'], - 'sequence': 1, - 'parameters': params - }).execute() - else: - if isinstance(params["command"][0], list): - for c in params["command"]: - logger.info(flatten(expand_item(params, c))) - else: - logger.info(flatten(expand_item(params, params["command"]))) - else: - logger.error("parameter %s with value %s in task.foreach yielded no items" % (var, params[var])) - sys.exit(1) - -try: - if "task.foreach" in jobp: - if args.dry_run or arvados.current_task()['sequence'] == 0: - # This is the first task to start the other tasks and exit - fvars = jobp["task.foreach"] - if isinstance(fvars, basestring): - fvars = [fvars] - if not isinstance(fvars, list) or len(fvars) == 0: - logger.error("value of task.foreach must be a string or non-empty list") - sys.exit(1) - recursive_foreach(jobp, jobp["task.foreach"]) - if not args.dry_run: - if "task.vwd" in jobp: - # Set output of the first task to the base vwd collection so it - # will be merged with output fragments from the other tasks by - # crunch. - arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"])) - else: - arvados.current_task().set_output(None) - sys.exit(0) - else: - # This is the only task so taskp/jobp are the same - taskp = jobp -except Exception as e: - logger.exception("caught exception") - logger.error("job parameters were:") - logger.error(pprint.pformat(jobp)) - sys.exit(1) - -try: - if not args.dry_run: - if "task.vwd" in taskp: - # Populate output directory with symlinks to files in collection - vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir) - - if "task.cwd" in taskp: - os.chdir(subst.do_substitution(taskp, taskp["task.cwd"])) - - cmd = [] - if isinstance(taskp["command"][0], list): - for c in taskp["command"]: - cmd.append(flatten(expand_item(taskp, c))) - else: - cmd.append(flatten(expand_item(taskp, taskp["command"]))) - - if "task.stdin" in taskp: - stdinname = subst.do_substitution(taskp, taskp["task.stdin"]) - if not args.dry_run: - stdinfile = open(stdinname, "rb") - - if "task.stdout" in taskp: - stdoutname = subst.do_substitution(taskp, taskp["task.stdout"]) - if not args.dry_run: - stdoutfile = open(stdoutname, "wb") - - if "task.env" in taskp: - env = copy.copy(os.environ) - for k,v in taskp["task.env"].items(): - env[k] = subst.do_substitution(taskp, v) - else: - env = None - - logger.info("{}{}{}".format(' | '.join([' '.join(c) for c in cmd]), (" < " + stdinname) if stdinname is not None else "", (" > " + stdoutname) if stdoutname is not None else "")) - - if args.dry_run: - sys.exit(0) -except subst.SubstitutionError as e: - logger.error(str(e)) - logger.error("task parameters were:") - logger.error(pprint.pformat(taskp)) - sys.exit(1) -except Exception as e: - logger.exception("caught exception") - logger.error("task parameters were:") - logger.error(pprint.pformat(taskp)) - sys.exit(1) - -# rcode holds the return codes produced by each subprocess -rcode = {} -try: - subprocesses = [] - close_streams = [] - if stdinfile: - close_streams.append(stdinfile) - next_stdin = stdinfile - - for i in xrange(len(cmd)): - if i == len(cmd)-1: - # this is the last command in the pipeline, so its stdout should go to stdoutfile - next_stdout = stdoutfile - else: - # this is an intermediate command in the pipeline, so its stdout should go to a pipe - next_stdout = subprocess.PIPE - - sp = subprocess.Popen(cmd[i], 
shell=False, stdin=next_stdin, stdout=next_stdout, env=env) - - # Need to close the FDs on our side so that subcommands will get SIGPIPE if the - # consuming process ends prematurely. - if sp.stdout: - close_streams.append(sp.stdout) - - # Send this processes's stdout to to the next process's stdin - next_stdin = sp.stdout - - subprocesses.append(sp) - - # File descriptors have been handed off to the subprocesses, so close them here. - for s in close_streams: - s.close() - - # Set up signal handling - sig = SigHandler() - - # Forward terminate signals to the subprocesses. - signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(subprocesses, signum)) - signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(subprocesses, signum)) - signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(subprocesses, signum)) - - active = 1 - pids = set([s.pid for s in subprocesses]) - while len(pids) > 0: - try: - (pid, status) = os.wait() - except OSError as e: - if e.errno == errno.EINTR: - pass - else: - raise - else: - pids.discard(pid) - if not taskp.get("task.ignore_rcode"): - rcode[pid] = (status >> 8) - else: - rcode[pid] = 0 - - if sig.sig is not None: - logger.critical("terminating on signal %s" % sig.sig) - sys.exit(2) - else: - for i in xrange(len(cmd)): - r = rcode[subprocesses[i].pid] - logger.info("%s completed with exit code %i (%s)" % (cmd[i][0], r, "success" if r == 0 else "failed")) - -except Exception as e: - logger.exception("caught exception") - -# restore default signal handlers. -signal.signal(signal.SIGINT, signal.SIG_DFL) -signal.signal(signal.SIGTERM, signal.SIG_DFL) -signal.signal(signal.SIGQUIT, signal.SIG_DFL) - -logger.info("the following output files will be saved to keep:") - -subprocess.call(["find", "-L", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir) - -logger.info("start writing output to keep") - -if "task.vwd" in taskp and "task.foreach" in jobp: - for root, dirs, files in os.walk(outdir): - for f in files: - s = os.lstat(os.path.join(root, f)) - if stat.S_ISLNK(s.st_mode): - os.unlink(os.path.join(root, f)) - -(outcollection, checkin_error) = vwd.checkin(outdir) - -# Success if we ran any subprocess, and they all exited 0. -success = rcode and all(status == 0 for status in rcode.itervalues()) and not checkin_error - -api.job_tasks().update(uuid=arvados.current_task()['uuid'], - body={ - 'output': outcollection.manifest_text(), - 'success': success, - 'progress':1.0 - }).execute() - -sys.exit(0 if success else 1) diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py deleted file mode 100755 index 61c384fbf0..0000000000 --- a/crunch_scripts/split-fastq.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/python -# Copyright (C) The Arvados Authors. All rights reserved. 
-# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import re -import hashlib -import string - -api = arvados.api('v1') - -piece = 0 -manifest_text = "" - -# Look for paired reads - -inp = arvados.CollectionReader(arvados.getjobparam('reads')) - -manifest_list = [] - -def nextline(reader, start): - n = -1 - while True: - r = reader.readfrom(start, 128) - if r == '': - break - n = string.find(r, "\n") - if n > -1: - break - else: - start += 128 - return n - -prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$') - -# Look for fastq files -for s in inp.all_streams(): - for f in s.all_files(): - name_pieces = prog.match(f.name()) - if name_pieces is not None: - if s.name() != ".": - # The downstream tool (run-command) only iterates over the top - # level of directories so if there are fastq files in - # directories in the input, the choice is either to forget - # there are directories (which might lead to name conflicts) or - # just fail. - print >>sys.stderr, "fastq must be at the root of the collection" - sys.exit(1) - - p = None - if name_pieces.group(2) is not None: - if name_pieces.group(2) == "_1": - p = [{}, {}] - p[0]["reader"] = s.files()[name_pieces.group(0)] - p[1]["reader"] = s.files()[name_pieces.group(1) + "_2.fastq" + (name_pieces.group(3) if name_pieces.group(3) else '')] - else: - p = [{}] - p[0]["reader"] = s.files()[name_pieces.group(0)] - - if p is not None: - for i in xrange(0, len(p)): - m = p[i]["reader"].as_manifest().split() - m[0] = "./_" + str(piece) - manifest_list.append(m) - piece += 1 - -manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n" - -arvados.current_task().set_output(manifest_text) diff --git a/crunch_scripts/test/task_output_dir b/crunch_scripts/test/task_output_dir deleted file mode 100755 index 8b2c7ced47..0000000000 --- a/crunch_scripts/test/task_output_dir +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) The Arvados Authors. All rights reserved. 
-# -# SPDX-License-Identifier: Apache-2.0 - -import arvados -import arvados.crunch -import hashlib -import os - -out = arvados.crunch.TaskOutputDir() - -string = open(__file__).read() -with open(os.path.join(out.path, 'example.out'), 'w') as f: - f.write(string) -with open(os.path.join(out.path, 'example.out.SHA1'), 'w') as f: - f.write(hashlib.sha1(string).hexdigest() + "\n") - -arvados.current_task().set_output(out.manifest_text()) diff --git a/services/api/app/controllers/arvados/v1/job_tasks_controller.rb b/services/api/app/controllers/arvados/v1/job_tasks_controller.rb index 07bbc33ab2..b960d2e9e4 100644 --- a/services/api/app/controllers/arvados/v1/job_tasks_controller.rb +++ b/services/api/app/controllers/arvados/v1/job_tasks_controller.rb @@ -4,4 +4,9 @@ class Arvados::V1::JobTasksController < ApplicationController accept_attribute_as_json :parameters, Hash + + def create + return send_error("Unsupported legacy jobs API", + status: 400) + end end diff --git a/services/api/app/controllers/arvados/v1/jobs_controller.rb b/services/api/app/controllers/arvados/v1/jobs_controller.rb index c3655272dd..f6308c528f 100644 --- a/services/api/app/controllers/arvados/v1/jobs_controller.rb +++ b/services/api/app/controllers/arvados/v1/jobs_controller.rb @@ -13,115 +13,28 @@ class Arvados::V1::JobsController < ApplicationController include DbCurrentTime def create - [:repository, :script, :script_version, :script_parameters].each do |r| - if !resource_attrs[r] - return send_error("#{r} attribute must be specified", - status: :unprocessable_entity) - end - end - - # We used to ask for the minimum_, exclude_, and no_reuse params - # in the job resource. Now we advertise them as flags that alter - # the behavior of the create action. - [:minimum_script_version, :exclude_script_versions].each do |attr| - if resource_attrs.has_key? attr - params[attr] = resource_attrs.delete attr - end - end - if resource_attrs.has_key? 
:no_reuse - params[:find_or_create] = !resource_attrs.delete(:no_reuse) - end - - return super if !params[:find_or_create] - return if !load_filters_param - - begin - @object = Job.find_reusable(resource_attrs, params, @filters, @read_users) - rescue ArgumentError => error - return send_error(error.message) - end - - if @object - show - else - super - end + return send_error("Unsupported legacy jobs API", + status: 400) end def cancel - reload_object_before_update - @object.cancel cascade: params[:cascade] - show + return send_error("Unsupported legacy jobs API", + status: 400) end def lock - @object.lock current_user.uuid - show - end - - class LogStreamer - Q_UPDATE_INTERVAL = 12 - def initialize(job, opts={}) - @job = job - @opts = opts - end - def each - if @job.finished_at - yield "#{@job.uuid} finished at #{@job.finished_at}\n" - return - end - while not @job.started_at - # send a summary (job queue + available nodes) to the client - # every few seconds while waiting for the job to start - current_time = db_current_time - last_ack_at ||= current_time - Q_UPDATE_INTERVAL - 1 - if current_time - last_ack_at >= Q_UPDATE_INTERVAL - nodes_in_state = {idle: 0, alloc: 0} - ActiveRecord::Base.uncached do - Node.where('hostname is not ?', nil).collect do |n| - if n.info[:slurm_state] - nodes_in_state[n.info[:slurm_state]] ||= 0 - nodes_in_state[n.info[:slurm_state]] += 1 - end - end - end - job_queue = Job.queue.select(:uuid) - n_queued_before_me = 0 - job_queue.each do |j| - break if j.uuid == @job.uuid - n_queued_before_me += 1 - end - yield "#{db_current_time}" \ - " job #{@job.uuid}" \ - " queue_position #{n_queued_before_me}" \ - " queue_size #{job_queue.count}" \ - " nodes_idle #{nodes_in_state[:idle]}" \ - " nodes_alloc #{nodes_in_state[:alloc]}\n" - last_ack_at = db_current_time - end - sleep 3 - ActiveRecord::Base.uncached do - @job.reload - end - end - end + return send_error("Unsupported legacy jobs API", + status: 400) end def queue - params[:order] ||= ['priority desc', 'created_at'] - load_limit_offset_order_params - load_where_param - @where.merge!({state: Job::Queued}) - return if !load_filters_param - find_objects_for_index - index + return send_error("Unsupported legacy jobs API", + status: 400) end def queue_size - # Users may not be allowed to see all the jobs in the queue, so provide a - # method to get just the queue size in order to get a gist of how busy the - # cluster is. 
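With every action in this controller now answering 400, a legacy client fails immediately. A hedged sketch of what that looks like from the Python SDK side, assuming the jobs resource still appears in the cluster's discovery document and that the SDK surfaces the failure as arvados.errors.ApiError:

import arvados
import arvados.errors

api = arvados.api('v1')
try:
    # Any attempt to use the legacy jobs API is now refused by the server.
    api.jobs().create(body={}).execute()
except arvados.errors.ApiError as e:
    print("legacy jobs API rejected the request: %s" % e)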
- render :json => {:queue_size => Job.queue.size} + return send_error("Unsupported legacy jobs API", + status: 400) end def self._create_requires_parameters diff --git a/services/api/app/controllers/arvados/v1/pipeline_instances_controller.rb b/services/api/app/controllers/arvados/v1/pipeline_instances_controller.rb index baffda1c99..166f71049b 100644 --- a/services/api/app/controllers/arvados/v1/pipeline_instances_controller.rb +++ b/services/api/app/controllers/arvados/v1/pipeline_instances_controller.rb @@ -7,9 +7,13 @@ class Arvados::V1::PipelineInstancesController < ApplicationController accept_attribute_as_json :properties, Hash accept_attribute_as_json :components_summary, Hash + def create + return send_error("Unsupported legacy jobs API", + status: 400) + end + def cancel - reload_object_before_update - @object.cancel cascade: params[:cascade] - show + return send_error("Unsupported legacy jobs API", + status: 400) end end diff --git a/services/api/app/controllers/arvados/v1/pipeline_templates_controller.rb b/services/api/app/controllers/arvados/v1/pipeline_templates_controller.rb index a276948d59..4a5e724ee6 100644 --- a/services/api/app/controllers/arvados/v1/pipeline_templates_controller.rb +++ b/services/api/app/controllers/arvados/v1/pipeline_templates_controller.rb @@ -4,4 +4,9 @@ class Arvados::V1::PipelineTemplatesController < ApplicationController accept_attribute_as_json :components, Hash + + def create + return send_error("Unsupported legacy jobs API", + status: 400) + end end diff --git a/services/api/lib/crunch_dispatch.rb b/services/api/lib/crunch_dispatch.rb deleted file mode 100644 index 4e640186d1..0000000000 --- a/services/api/lib/crunch_dispatch.rb +++ /dev/null @@ -1,981 +0,0 @@ -# Copyright (C) The Arvados Authors. All rights reserved. -# -# SPDX-License-Identifier: AGPL-3.0 - -require 'open3' -require 'shellwords' - -class CrunchDispatch - extend DbCurrentTime - include ApplicationHelper - include Process - - EXIT_TEMPFAIL = 75 - EXIT_RETRY_UNLOCKED = 93 - RETRY_UNLOCKED_LIMIT = 3 - - class LogTime < Time - def to_s - self.utc.strftime "%Y-%m-%d_%H:%M:%S" - end - end - - def initialize - @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip) - if @crunch_job_bin.empty? - raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path." - end - - @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN'] - @docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS'] - @cgroup_root = ENV['CRUNCH_CGROUP_ROOT'] - @srun_sync_timeout = ENV['CRUNCH_SRUN_SYNC_TIMEOUT'] - - @arvados_internal = Rails.configuration.Containers.JobsAPI.GitInternalDir - if not File.exist? @arvados_internal - $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}` - raise "No internal git repository available" unless ($? 
== 0) - end - - @repo_root = Rails.configuration.Git.Repositories - @arvados_repo_path = Repository.where(name: "arvados").first.server_path - @authorizations = {} - @did_recently = {} - @fetched_commits = {} - @git_tags = {} - @node_state = {} - @pipe_auth_tokens = {} - @running = {} - @todo = [] - @todo_job_retries = {} - @job_retry_counts = Hash.new(0) - @todo_pipelines = [] - end - - def sysuser - return act_as_system_user - end - - def refresh_todo - if @runoptions[:jobs] - @todo = @todo_job_retries.values + Job.queue.select(&:repository) - end - if @runoptions[:pipelines] - @todo_pipelines = PipelineInstance.queue - end - end - - def each_slurm_line(cmd, outfmt, max_fields=nil) - max_fields ||= outfmt.split(":").size - max_fields += 1 # To accommodate the node field we add - @@slurm_version ||= Gem::Version.new(`sinfo --version`.match(/\b[\d\.]+\b/)[0]) - if Gem::Version.new('2.3') <= @@slurm_version - `#{cmd} --noheader -o '%n:#{outfmt}'`.each_line do |line| - yield line.chomp.split(":", max_fields) - end - else - # Expand rows with hostname ranges (like "foo[1-3,5,9-12]:idle") - # into multiple rows with one hostname each. - `#{cmd} --noheader -o '%N:#{outfmt}'`.each_line do |line| - tokens = line.chomp.split(":", max_fields) - if (re = tokens[0].match(/^(.*?)\[([-,\d]+)\]$/)) - tokens.shift - re[2].split(",").each do |range| - range = range.split("-").collect(&:to_i) - (range[0]..range[-1]).each do |n| - yield [re[1] + n.to_s] + tokens - end - end - else - yield tokens - end - end - end - end - - def slurm_status - slurm_nodes = {} - each_slurm_line("sinfo", "%t") do |hostname, state| - # Treat nodes in idle* state as down, because the * means that slurm - # hasn't been able to communicate with it recently. - state.sub!(/^idle\*/, "down") - state.sub!(/\W+$/, "") - state = "down" unless %w(idle alloc comp mix drng down).include?(state) - slurm_nodes[hostname] = {state: state, job: nil} - end - each_slurm_line("squeue", "%j") do |hostname, job_uuid| - slurm_nodes[hostname][:job] = job_uuid if slurm_nodes[hostname] - end - slurm_nodes - end - - def update_node_status - return unless Rails.configuration.Containers.JobsAPI.CrunchJobWrapper.to_s.match(/^slurm/) - slurm_status.each_pair do |hostname, slurmdata| - next if @node_state[hostname] == slurmdata - begin - node = Node.where('hostname=?', hostname).order(:last_ping_at).last - if node - $stderr.puts "dispatch: update #{hostname} state to #{slurmdata}" - node.info["slurm_state"] = slurmdata[:state] - node.job_uuid = slurmdata[:job] - if node.save - @node_state[hostname] = slurmdata - else - $stderr.puts "dispatch: failed to update #{node.uuid}: #{node.errors.messages}" - end - elsif slurmdata[:state] != 'down' - $stderr.puts "dispatch: SLURM reports '#{hostname}' is not down, but no node has that name" - end - rescue => error - $stderr.puts "dispatch: error updating #{hostname} node status: #{error}" - end - end - end - - def positive_int(raw_value, default=nil) - value = begin raw_value.to_i rescue 0 end - if value > 0 - value - else - default - end - end - - NODE_CONSTRAINT_MAP = { - # Map Job runtime_constraints keys to the corresponding Node info key. - 'min_ram_mb_per_node' => 'total_ram_mb', - 'min_scratch_mb_per_node' => 'total_scratch_mb', - 'min_cores_per_node' => 'total_cpu_cores', - } - - def nodes_available_for_job_now(job) - # Find Nodes that satisfy a Job's runtime constraints (by building - # a list of Procs and using them to test each Node). If there - # enough to run the Job, return an array of their names. 
- # Otherwise, return nil. - need_procs = NODE_CONSTRAINT_MAP.each_pair.map do |job_key, node_key| - Proc.new do |node| - positive_int(node.properties[node_key], 0) >= - positive_int(job.runtime_constraints[job_key], 0) - end - end - min_node_count = positive_int(job.runtime_constraints['min_nodes'], 1) - usable_nodes = [] - Node.all.select do |node| - node.info['slurm_state'] == 'idle' - end.sort_by do |node| - # Prefer nodes with no price, then cheap nodes, then expensive nodes - node.properties['cloud_node']['price'].to_f rescue 0 - end.each do |node| - if need_procs.select { |need_proc| not need_proc.call(node) }.any? - # At least one runtime constraint is not satisfied by this node - next - end - usable_nodes << node - if usable_nodes.count >= min_node_count - hostnames = usable_nodes.map(&:hostname) - log_nodes = usable_nodes.map do |n| - "#{n.hostname} #{n.uuid} #{n.properties.to_json}" - end - log_job = "#{job.uuid} #{job.runtime_constraints}" - log_text = "dispatching job #{log_job} to #{log_nodes.join(", ")}" - $stderr.puts log_text - begin - act_as_system_user do - Log.new(object_uuid: job.uuid, - event_type: 'dispatch', - owner_uuid: system_user_uuid, - summary: "dispatching to #{hostnames.join(", ")}", - properties: {'text' => log_text}).save! - end - rescue => e - $stderr.puts "dispatch: log.create failed: #{e}" - end - return hostnames - end - end - nil - end - - def nodes_available_for_job(job) - # Check if there are enough idle nodes with the Job's minimum - # hardware requirements to run it. If so, return an array of - # their names. If not, up to once per hour, signal start_jobs to - # hold off launching Jobs. This delay is meant to give the Node - # Manager an opportunity to make new resources available for new - # Jobs. - # - # The exact timing parameters here might need to be adjusted for - # the best balance between helping the longest-waiting Jobs run, - # and making efficient use of immediately available resources. - # These are all just first efforts until we have more data to work - # with. - nodelist = nodes_available_for_job_now(job) - if nodelist.nil? and not did_recently(:wait_for_available_nodes, 3600) - $stderr.puts "dispatch: waiting for nodes for #{job.uuid}" - @node_wait_deadline = Time.now + 5.minutes - end - nodelist - end - - def fail_job job, message, skip_lock: false - $stderr.puts "dispatch: #{job.uuid}: #{message}" - begin - Log.new(object_uuid: job.uuid, - event_type: 'dispatch', - owner_uuid: job.owner_uuid, - summary: message, - properties: {"text" => message}).save! - rescue => e - $stderr.puts "dispatch: log.create failed: #{e}" - end - - if not skip_lock and not have_job_lock?(job) - begin - job.lock @authorizations[job.uuid].user.uuid - rescue ArvadosModel::AlreadyLockedError - $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else" - return - end - end - - job.state = "Failed" - if not job.save - $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed" - end - end - - def stdout_s(cmd_a, opts={}) - IO.popen(cmd_a, "r", opts) do |pipe| - return pipe.read.chomp - end - end - - def git_cmd(*cmd_a) - ["git", "--git-dir=#{@arvados_internal}"] + cmd_a - end - - def get_authorization(job) - if @authorizations[job.uuid] and - @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid - # We already made a token for this job, but we need a new one - # because modified_by_user_uuid has changed (the job will run - # as a different user). 
- @authorizations[job.uuid].update_attributes expires_at: Time.now - @authorizations[job.uuid] = nil - end - if not @authorizations[job.uuid] - auth = ApiClientAuthorization. - new(user: User.where('uuid=?', job.modified_by_user_uuid).first, - api_client_id: 0) - if not auth.save - $stderr.puts "dispatch: auth.save failed for #{job.uuid}" - else - @authorizations[job.uuid] = auth - end - end - @authorizations[job.uuid] - end - - def internal_repo_has_commit? sha1 - if (not @fetched_commits[sha1] and - sha1 == stdout_s(git_cmd("rev-list", "-n1", sha1), err: "/dev/null") and - $? == 0) - @fetched_commits[sha1] = true - end - return @fetched_commits[sha1] - end - - def get_commit src_repo, sha1 - return true if internal_repo_has_commit? sha1 - - # commit does not exist in internal repository, so import the - # source repository using git fetch-pack - cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo) - $stderr.puts "dispatch: #{cmd}" - $stderr.puts(stdout_s(cmd)) - @fetched_commits[sha1] = ($? == 0) - end - - def tag_commit(job, commit_hash, tag_name) - # @git_tags[T]==V if we know commit V has been tagged T in the - # arvados_internal repository. - if not @git_tags[tag_name] - cmd = git_cmd("tag", tag_name, commit_hash) - $stderr.puts "dispatch: #{cmd}" - $stderr.puts(stdout_s(cmd, err: "/dev/null")) - unless $? == 0 - # git tag failed. This may be because the tag already exists, so check for that. - tag_rev = stdout_s(git_cmd("rev-list", "-n1", tag_name)) - if $? == 0 - # We got a revision back - if tag_rev != commit_hash - # Uh oh, the tag doesn't point to the revision we were expecting. - # Someone has been monkeying with the job record and/or git. - fail_job job, "Existing tag #{tag_name} points to commit #{tag_rev} but expected commit #{commit_hash}" - return nil - end - # we're okay (fall through to setting @git_tags below) - else - # git rev-list failed for some reason. - fail_job job, "'git tag' for #{tag_name} failed but did not find any existing tag using 'git rev-list'" - return nil - end - end - # 'git tag' was successful, or there is an existing tag that points to the same revision. - @git_tags[tag_name] = commit_hash - elsif @git_tags[tag_name] != commit_hash - fail_job job, "Existing tag #{tag_name} points to commit #{@git_tags[tag_name]} but this job uses commit #{commit_hash}" - return nil - end - @git_tags[tag_name] - end - - def start_jobs - @todo.each do |job| - next if @running[job.uuid] - - cmd_args = nil - case Rails.configuration.Containers.JobsAPI.CrunchJobWrapper - when "none" - if @running.size > 0 - # Don't run more than one at a time. - return - end - cmd_args = [] - when "slurm_immediate" - nodelist = nodes_available_for_job(job) - if nodelist.nil? - if Time.now < @node_wait_deadline - break - else - next - end - end - cmd_args = ["salloc", - "--chdir=/", - "--immediate", - "--exclusive", - "--no-kill", - "--job-name=#{job.uuid}", - "--nodelist=#{nodelist.join(',')}"] - else - raise "Unknown crunch_job_wrapper: #{Rails.configuration.Containers.JobsAPI.CrunchJobWrapper}" - end - - cmd_args = sudo_preface + cmd_args - - next unless get_authorization job - - ready = internal_repo_has_commit? job.script_version - - if not ready - # Import the commit from the specified repository into the - # internal repository. This should have been done already when - # the job was created/updated; this code is obsolete except to - # avoid deployment races. Failing the job would be a - # reasonable thing to do at this point. 
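The get_commit/tag_commit steps referenced above amount to mirroring the source repository into the internal bare repository and pinning the job's commit under a job-specific tag. A rough stand-alone sketch of that flow, where the internal repository path is a placeholder rather than a real configuration value:

import subprocess

INTERNAL_REPO = '/path/to/internal.git'  # placeholder path

def git(*args):
    # Run git against the internal bare repository, as git_cmd does above.
    return subprocess.call(['git', '--git-dir=%s' % INTERNAL_REPO] + list(args))

def import_and_tag(src_repo, sha1, job_uuid):
    # Mirror all refs from the source repository, then pin the requested
    # commit under a tag named after the job so later runs can find it.
    if git('fetch-pack', '--no-progress', '--all', src_repo) != 0:
        return False
    return git('tag', job_uuid, sha1) == 0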
- repo = Repository.where(name: job.repository).first - if repo.nil? or repo.server_path.nil? - fail_job job, "Repository #{job.repository} not found under #{@repo_root}" - next - end - ready &&= get_commit repo.server_path, job.script_version - ready &&= tag_commit job, job.script_version, job.uuid - end - - # This should be unnecessary, because API server does it during - # job create/update, but it's still not a bad idea to verify the - # tag is correct before starting the job: - ready &&= tag_commit job, job.script_version, job.uuid - - # The arvados_sdk_version doesn't support use of arbitrary - # remote URLs, so the requested version isn't necessarily copied - # into the internal repository yet. - if job.arvados_sdk_version - ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version - ready &&= tag_commit job, job.arvados_sdk_version, "#{job.uuid}-arvados-sdk" - end - - if not ready - fail_job job, "commit not present in internal repository" - next - end - - cmd_args += [@crunch_job_bin, - '--job-api-token', @authorizations[job.uuid].api_token, - '--job', job.uuid, - '--git-dir', @arvados_internal] - - if @cgroup_root - cmd_args += ['--cgroup-root', @cgroup_root] - end - - if @docker_bin - cmd_args += ['--docker-bin', @docker_bin] - end - - if @docker_run_args - cmd_args += ['--docker-run-args', @docker_run_args] - end - - if @srun_sync_timeout - cmd_args += ['--srun-sync-timeout', @srun_sync_timeout] - end - - if have_job_lock?(job) - cmd_args << "--force-unlock" - end - - $stderr.puts "dispatch: #{cmd_args.join ' '}" - - begin - i, o, e, t = Open3.popen3(*cmd_args) - rescue - $stderr.puts "dispatch: popen3: #{$!}" - # This is a dispatch problem like "Too many open files"; - # retrying another job right away would be futile. Just return - # and hope things are better next time, after (at least) a - # did_recently() delay. - return - end - - $stderr.puts "dispatch: job #{job.uuid}" - start_banner = "dispatch: child #{t.pid} start #{LogTime.now}" - $stderr.puts start_banner - - @running[job.uuid] = { - stdin: i, - stdout: o, - stderr: e, - wait_thr: t, - job: job, - buf: {stderr: '', stdout: ''}, - started: false, - sent_int: 0, - job_auth: @authorizations[job.uuid], - stderr_buf_to_flush: '', - stderr_flushed_at: Time.new(0), - bytes_logged: 0, - events_logged: 0, - log_throttle_is_open: true, - log_throttle_reset_time: Time.now + Rails.configuration.Containers.Logging.LogThrottlePeriod, - log_throttle_bytes_so_far: 0, - log_throttle_lines_so_far: 0, - log_throttle_bytes_skipped: 0, - log_throttle_partial_line_last_at: Time.new(0), - log_throttle_first_partial_line: true, - } - i.close - @todo_job_retries.delete(job.uuid) - update_node_status - end - end - - # Test for hard cap on total output and for log throttling. Returns whether - # the log line should go to output or not. Modifies "line" in place to - # replace it with an error if a logging limit is tripped. 
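The throttling logic described above is easier to see in isolation. A simplified Python illustration of the same idea, admitting lines until a per-period byte or line budget is exhausted and then going silent until the window resets; the limits are illustrative, not the configured LogThrottle* values:

import time

class LogThrottle(object):
    # Simplified stand-alone illustration of the dispatcher's rate limiting,
    # not its implementation.
    def __init__(self, period=60, max_bytes=65536, max_lines=1024):
        self.period = period
        self.max_bytes = max_bytes
        self.max_lines = max_lines
        self.reset_at = time.time() + period
        self.bytes_so_far = 0
        self.lines_so_far = 0
        self.is_open = True

    def admit(self, line):
        # Return True if the line should be logged, False if it should be
        # counted as skipped until the current window resets.
        now = time.time()
        if now > self.reset_at:
            self.reset_at = now + self.period
            self.bytes_so_far = 0
            self.lines_so_far = 0
            self.is_open = True
        self.bytes_so_far += len(line)
        self.lines_so_far += 1
        if self.bytes_so_far > self.max_bytes or self.lines_so_far > self.max_lines:
            self.is_open = False
        return self.is_open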
- def rate_limit running_job, line - message = false - linesize = line.size - if running_job[:log_throttle_is_open] - partial_line = false - skip_counts = false - matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/) - if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]') - partial_line = true - if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.Containers.Logging.LogPartialLineThrottlePeriod - running_job[:log_throttle_partial_line_last_at] = Time.now - else - skip_counts = true - end - end - - if !skip_counts - running_job[:log_throttle_lines_so_far] += 1 - running_job[:log_throttle_bytes_so_far] += linesize - running_job[:bytes_logged] += linesize - end - - if (running_job[:bytes_logged] > - Rails.configuration.Containers.Logging.LimitLogBytesPerJob) - message = "Exceeded log limit #{Rails.configuration.Containers.Logging.LimitLogBytesPerJob} bytes (LimitLogBytesPerJob). Log will be truncated." - running_job[:log_throttle_reset_time] = Time.now + 100.years - running_job[:log_throttle_is_open] = false - - elsif (running_job[:log_throttle_bytes_so_far] > - Rails.configuration.Containers.Logging.LogThrottleBytes) - remaining_time = running_job[:log_throttle_reset_time] - Time.now - message = "Exceeded rate #{Rails.configuration.Containers.Logging.LogThrottleBytes} bytes per #{Rails.configuration.Containers.Logging.LogThrottlePeriod} seconds (LogThrottleBytes). Logging will be silenced for the next #{remaining_time.round} seconds." - running_job[:log_throttle_is_open] = false - - elsif (running_job[:log_throttle_lines_so_far] > - Rails.configuration.Containers.Logging.LogThrottleLines) - remaining_time = running_job[:log_throttle_reset_time] - Time.now - message = "Exceeded rate #{Rails.configuration.Containers.Logging.LogThrottleLines} lines per #{Rails.configuration.Containers.Logging.LogThrottlePeriod} seconds (LogThrottleLines), logging will be silenced for the next #{remaining_time.round} seconds." - running_job[:log_throttle_is_open] = false - - elsif partial_line and running_job[:log_throttle_first_partial_line] - running_job[:log_throttle_first_partial_line] = false - message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.Containers.Logging.LogPartialLineThrottlePeriod} seconds." - end - end - - if not running_job[:log_throttle_is_open] - # Don't log anything if any limit has been exceeded. Just count lossage. - running_job[:log_throttle_bytes_skipped] += linesize - end - - if message - # Yes, write to logs, but use our "rate exceeded" message - # instead of the log message that exceeded the limit. - message += " A complete log is still being written to Keep, and will be available when the job finishes.\n" - line.replace message - true - elsif partial_line - false - else - running_job[:log_throttle_is_open] - end - end - - def read_pipes - @running.each do |job_uuid, j| - now = Time.now - if now > j[:log_throttle_reset_time] - # It has been more than throttle_period seconds since the last - # checkpoint so reset the throttle - if j[:log_throttle_bytes_skipped] > 0 - message = "#{job_uuid} ! 
Skipped #{j[:log_throttle_bytes_skipped]} bytes of log" - $stderr.puts message - j[:stderr_buf_to_flush] << "#{LogTime.now} #{message}\n" - end - - j[:log_throttle_reset_time] = now + Rails.configuration.Containers.Logging.LogThrottlePeriod - j[:log_throttle_bytes_so_far] = 0 - j[:log_throttle_lines_so_far] = 0 - j[:log_throttle_bytes_skipped] = 0 - j[:log_throttle_is_open] = true - j[:log_throttle_partial_line_last_at] = Time.new(0) - j[:log_throttle_first_partial_line] = true - end - - j[:buf].each do |stream, streambuf| - # Read some data from the child stream - buf = '' - begin - # It's important to use a big enough buffer here. When we're - # being flooded with logs, we must read and discard many - # bytes at once. Otherwise, we can easily peg a CPU with - # time-checking and other loop overhead. (Quick tests show a - # 1MiB buffer working 2.5x as fast as a 64 KiB buffer.) - # - # So don't reduce this buffer size! - buf = j[stream].read_nonblock(2**20) - rescue Errno::EAGAIN, EOFError - end - - # Short circuit the counting code if we're just going to throw - # away the data anyway. - if not j[:log_throttle_is_open] - j[:log_throttle_bytes_skipped] += streambuf.size + buf.size - streambuf.replace '' - next - elsif buf == '' - next - end - - # Append to incomplete line from previous read, if any - streambuf << buf - - bufend = '' - streambuf.each_line do |line| - if not line.end_with? $/ - if line.size > Rails.configuration.Containers.Logging.LogThrottleBytes - # Without a limit here, we'll use 2x an arbitrary amount - # of memory, and waste a lot of time copying strings - # around, all without providing any feedback to anyone - # about what's going on _or_ hitting any of our throttle - # limits. - # - # Here we leave "line" alone, knowing it will never be - # sent anywhere: rate_limit() will reach - # crunch_log_throttle_bytes immediately. However, we'll - # leave [...] in bufend: if the trailing end of the long - # line does end up getting sent anywhere, it will have - # some indication that it is incomplete. - bufend = "[...]" - else - # If line length is sane, we'll wait for the rest of the - # line to appear in the next read_pipes() call. - bufend = line - break - end - end - # rate_limit returns true or false as to whether to actually log - # the line or not. It also modifies "line" in place to replace - # it with an error if a logging limit is tripped. - if rate_limit j, line - $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) - $stderr.puts line - pub_msg = "#{LogTime.now} #{line.strip}\n" - j[:stderr_buf_to_flush] << pub_msg - end - end - - # Leave the trailing incomplete line (if any) in streambuf for - # next time. - streambuf.replace bufend - end - # Flush buffered logs to the logs table, if appropriate. We have - # to do this even if we didn't collect any new logs this time: - # otherwise, buffered data older than seconds_between_events - # won't get flushed until new data arrives. - write_log j - end - end - - def reap_children - return if 0 == @running.size - pid_done = nil - j_done = nil - - @running.each do |uuid, j| - if !j[:wait_thr].status - pid_done = j[:wait_thr].pid - j_done = j - break - end - end - - return if !pid_done - - job_done = j_done[:job] - - # Ensure every last drop of stdout and stderr is consumed. - read_pipes - # Reset flush timestamp to make sure log gets written. - j_done[:stderr_flushed_at] = Time.new(0) - # Write any remaining logs. 
- write_log j_done - - j_done[:buf].each do |stream, streambuf| - if streambuf != '' - $stderr.puts streambuf + "\n" - end - end - - # Wait the thread (returns a Process::Status) - exit_status = j_done[:wait_thr].value.exitstatus - exit_tempfail = exit_status == EXIT_TEMPFAIL - - $stderr.puts "dispatch: child #{pid_done} exit #{exit_status}" - $stderr.puts "dispatch: job #{job_done.uuid} end" - - jobrecord = Job.find_by_uuid(job_done.uuid) - - if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid) - $stderr.puts("dispatch: job #{jobrecord.uuid} was interrupted by node failure") - # Only this crunch-dispatch process can retry the job: - # it's already locked, and there's no way to put it back in the - # Queued state. Put it in our internal todo list unless the job - # has failed this way excessively. - @job_retry_counts[jobrecord.uuid] += 1 - exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT - do_what_next = "give up now" - if exit_tempfail - @todo_job_retries[jobrecord.uuid] = jobrecord - do_what_next = "re-attempt" - end - $stderr.puts("dispatch: job #{jobrecord.uuid} has been interrupted " + - "#{@job_retry_counts[jobrecord.uuid]}x, will #{do_what_next}") - end - - if !exit_tempfail - @job_retry_counts.delete(jobrecord.uuid) - if jobrecord.state == "Running" - # Apparently there was an unhandled error. That could potentially - # include "all allocated nodes failed" when we don't to retry - # because the job has already been retried RETRY_UNLOCKED_LIMIT - # times. Fail the job. - jobrecord.state = "Failed" - if not jobrecord.save - $stderr.puts "dispatch: jobrecord.save failed" - end - end - else - # If the job failed to run due to an infrastructure - # issue with crunch-job or slurm, we want the job to stay in the - # queue. If crunch-job exited after losing a race to another - # crunch-job process, it exits 75 and we should leave the job - # record alone so the winner of the race can do its thing. - # If crunch-job exited after all of its allocated nodes failed, - # it exits 93, and we want to retry it later (see the - # EXIT_RETRY_UNLOCKED `if` block). - # - # There is still an unhandled race condition: If our crunch-job - # process is about to lose a race with another crunch-job - # process, but crashes before getting to its "exit 75" (for - # example, "cannot fork" or "cannot reach API server") then we - # will assume incorrectly that it's our process's fault - # jobrecord.started_at is non-nil, and mark the job as failed - # even though the winner of the race is probably still doing - # fine. - end - - # Invalidate the per-job auth token, unless the job is still queued and we - # might want to try it again. - if jobrecord.state != "Queued" and !@todo_job_retries.include?(jobrecord.uuid) - j_done[:job_auth].update_attributes expires_at: Time.now - end - - @running.delete job_done.uuid - end - - def update_pipelines - expire_tokens = @pipe_auth_tokens.dup - @todo_pipelines.each do |p| - pipe_auth = (@pipe_auth_tokens[p.uuid] ||= ApiClientAuthorization. - create(user: User.where('uuid=?', p.modified_by_user_uuid).first, - api_client_id: 0)) - puts `export ARVADOS_API_TOKEN=#{pipe_auth.api_token} && arv-run-pipeline-instance --run-pipeline-here --no-wait --instance #{p.uuid}` - expire_tokens.delete p.uuid - end - - expire_tokens.each do |k, v| - v.update_attributes expires_at: Time.now - @pipe_auth_tokens.delete k - end - end - - def parse_argv argv - @runoptions = {} - (argv.any? ? 
argv : ['--jobs', '--pipelines']).each do |arg| - case arg - when '--jobs' - @runoptions[:jobs] = true - when '--pipelines' - @runoptions[:pipelines] = true - else - abort "Unrecognized command line option '#{arg}'" - end - end - if not (@runoptions[:jobs] or @runoptions[:pipelines]) - abort "Nothing to do. Please specify at least one of: --jobs, --pipelines." - end - end - - def run argv - parse_argv argv - - # We want files written by crunch-dispatch to be writable by other - # processes with the same GID, see bug #7228 - File.umask(0002) - - # This is how crunch-job child procs know where the "refresh" - # trigger file is - ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.Containers.JobsAPI.CrunchRefreshTrigger - - # If salloc can't allocate resources immediately, make it use our - # temporary failure exit code. This ensures crunch-dispatch won't - # mark a job failed because of an issue with node allocation. - # This often happens when another dispatcher wins the race to - # allocate nodes. - ENV["SLURM_EXIT_IMMEDIATE"] = CrunchDispatch::EXIT_TEMPFAIL.to_s - - if ENV["CRUNCH_DISPATCH_LOCKFILE"] - lockfilename = ENV.delete "CRUNCH_DISPATCH_LOCKFILE" - lockfile = File.open(lockfilename, File::RDWR|File::CREAT, 0644) - unless lockfile.flock File::LOCK_EX|File::LOCK_NB - abort "Lock unavailable on #{lockfilename} - exit" - end - end - - @signal = {} - %w{TERM INT}.each do |sig| - signame = sig - Signal.trap(sig) do - $stderr.puts "Received #{signame} signal" - @signal[:term] = true - end - end - - act_as_system_user - User.first.group_permissions - $stderr.puts "dispatch: ready" - while !@signal[:term] or @running.size > 0 - read_pipes - if @signal[:term] - @running.each do |uuid, j| - if !j[:started] and j[:sent_int] < 2 - begin - Process.kill 'INT', j[:wait_thr].pid - rescue Errno::ESRCH - # No such pid = race condition + desired result is - # already achieved - end - j[:sent_int] += 1 - end - end - else - refresh_todo unless did_recently(:refresh_todo, 1.0) - update_node_status unless did_recently(:update_node_status, 1.0) - unless @todo.empty? or did_recently(:start_jobs, 1.0) or @signal[:term] - start_jobs - end - unless (@todo_pipelines.empty? and @pipe_auth_tokens.empty?) or did_recently(:update_pipelines, 5.0) - update_pipelines - end - unless did_recently('check_orphaned_slurm_jobs', 60) - check_orphaned_slurm_jobs - end - end - reap_children - select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten, - [], [], 1) - end - # If there are jobs we wanted to retry, we have to mark them as failed now. - # Other dispatchers can't pick them up because we hold their lock. - @todo_job_retries.each_key do |job_uuid| - job = Job.find_by_uuid(job_uuid) - if job.state == "Running" - fail_job(job, "crunch-dispatch was stopped during job's tempfail retry loop") - end - end - end - - def fail_jobs before: nil - act_as_system_user do - threshold = nil - if before == 'reboot' - boottime = nil - open('/proc/stat').map(&:split).each do |stat, t| - if stat == 'btime' - boottime = t - end - end - if not boottime - raise "Could not find btime in /proc/stat" - end - threshold = Time.at(boottime.to_i) - elsif before - threshold = Time.parse(before, Time.now) - else - threshold = db_current_time - end - Rails.logger.info "fail_jobs: threshold is #{threshold}" - - squeue = squeue_jobs - Job.where('state = ? and started_at < ?', Job::Running, threshold). 
-        each do |job|
-        Rails.logger.debug "fail_jobs: #{job.uuid} started #{job.started_at}"
-        squeue.each do |slurm_name|
-          if slurm_name == job.uuid
-            Rails.logger.info "fail_jobs: scancel #{job.uuid}"
-            scancel slurm_name
-          end
-        end
-        fail_job(job, "cleaned up stale job: started before #{threshold}",
-                 skip_lock: true)
-      end
-    end
-  end
-
-  def check_orphaned_slurm_jobs
-    act_as_system_user do
-      squeue_uuids = squeue_jobs.select{|uuid| uuid.match(/^[0-9a-z]{5}-8i9sb-[0-9a-z]{15}$/)}.
-                                 select{|uuid| !@running.has_key?(uuid)}
-
-      return if squeue_uuids.size == 0
-
-      scancel_uuids = squeue_uuids - Job.where('uuid in (?) and (state in (?) or modified_at>?)',
-                                               squeue_uuids,
-                                               ['Running', 'Queued'],
-                                               (Time.now - 60)).
-                                         collect(&:uuid)
-      scancel_uuids.each do |uuid|
-        Rails.logger.info "orphaned job: scancel #{uuid}"
-        scancel uuid
-      end
-    end
-  end
-
-  def sudo_preface
-    return [] if not Rails.configuration.Containers.JobsAPI.CrunchJobUser
-    ["sudo", "-E", "-u",
-     Rails.configuration.Containers.JobsAPI.CrunchJobUser,
-     "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
-     "PATH=#{ENV['PATH']}",
-     "PERLLIB=#{ENV['PERLLIB']}",
-     "PYTHONPATH=#{ENV['PYTHONPATH']}",
-     "RUBYLIB=#{ENV['RUBYLIB']}",
-     "GEM_PATH=#{ENV['GEM_PATH']}"]
-  end
-
-  protected
-
-  def have_job_lock?(job)
-    # Return true if the given job is locked by this crunch-dispatch, normally
-    # because we've run crunch-job for it.
-    @todo_job_retries.include?(job.uuid)
-  end
-
-  def did_recently(thing, min_interval)
-    if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
-      @did_recently[thing] = Time.now
-      false
-    else
-      true
-    end
-  end
-
-  # send message to log table. we want these records to be transient
-  def write_log running_job
-    return if running_job[:stderr_buf_to_flush] == ''
-
-    # Send out to log event if buffer size exceeds the bytes per event or if
-    # it has been at least crunch_log_seconds_between_events seconds since
-    # the last flush.
-    if running_job[:stderr_buf_to_flush].size > Rails.configuration.Containers.Logging.LogBytesPerEvent or
-        (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.Containers.Logging.LogSecondsBetweenEvents
-      begin
-        log = Log.new(object_uuid: running_job[:job].uuid,
-                      event_type: 'stderr',
-                      owner_uuid: running_job[:job].owner_uuid,
-                      properties: {"text" => running_job[:stderr_buf_to_flush]})
-        log.save!
-        running_job[:events_logged] += 1
-      rescue => exception
-        $stderr.puts "Failed to write logs"
-        $stderr.puts exception.backtrace
-      end
-      running_job[:stderr_buf_to_flush] = ''
-      running_job[:stderr_flushed_at] = Time.now
-    end
-  end
-
-  # An array of job_uuids in squeue
-  def squeue_jobs
-    if Rails.configuration.Containers.JobsAPI.CrunchJobWrapper == "slurm_immediate"
-      p = IO.popen(['squeue', '-a', '-h', '-o', '%j'])
-      begin
-        p.readlines.map {|line| line.strip}
-      ensure
-        p.close
-      end
-    else
-      []
-    end
-  end
-
-  def scancel slurm_name
-    cmd = sudo_preface + ['scancel', '-n', slurm_name]
-    IO.popen(cmd) do |scancel_pipe|
-      puts scancel_pipe.read
-    end
-    if not $?.success?
-      Rails.logger.error "scancel #{slurm_name.shellescape}: $?"
-    end
-  end
-end
diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb
deleted file mode 100755
index 38bd54b5c3..0000000000
--- a/services/api/script/crunch-dispatch.rb
+++ /dev/null
@@ -1,16 +0,0 @@
-#!/usr/bin/env ruby
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-dispatch_argv = []
-ARGV.reject! do |arg|
-  dispatch_argv.push(arg) if /^--/ =~ arg
-end
-
-ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
-require File.dirname(__FILE__) + '/../config/boot'
-require File.dirname(__FILE__) + '/../config/environment'
-require './lib/crunch_dispatch.rb'
-
-CrunchDispatch.new.run dispatch_argv
diff --git a/services/api/script/crunch_failure_report.py b/services/api/script/crunch_failure_report.py
deleted file mode 100755
index 83217d8513..0000000000
--- a/services/api/script/crunch_failure_report.py
+++ /dev/null
@@ -1,222 +0,0 @@
-#! /usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-import argparse
-import datetime
-import json
-import re
-import sys
-
-import arvados
-
-# Useful configuration variables:
-
-# Number of log lines to use as context in diagnosing failure.
-LOG_CONTEXT_LINES = 10
-
-# Regex that signifies a failed task.
-FAILED_TASK_REGEX = re.compile(' \d+ failure (.*permanent)')
-
-# Regular expressions used to classify failure types.
-JOB_FAILURE_TYPES = {
-    'sys/docker': 'Cannot destroy container',
-    'crunch/node': 'User not found on host',
-    'slurm/comm': 'Communication connection failure'
-}
-
-def parse_arguments(arguments):
-    arg_parser = argparse.ArgumentParser(
-        description='Produce a report of Crunch failures within a specified time range')
-
-    arg_parser.add_argument(
-        '--start',
-        help='Start date and time')
-    arg_parser.add_argument(
-        '--end',
-        help='End date and time')
-
-    args = arg_parser.parse_args(arguments)
-
-    if args.start and not is_valid_timestamp(args.start):
-        raise ValueError(args.start)
-    if args.end and not is_valid_timestamp(args.end):
-        raise ValueError(args.end)
-
-    return args
-
-
-def api_timestamp(when=None):
-    """Returns a string representing the timestamp 'when' in a format
-    suitable for delivering to the API server. Defaults to the
-    current time.
-    """
-    if when is None:
-        when = datetime.datetime.utcnow()
-    return when.strftime("%Y-%m-%dT%H:%M:%SZ")
-
-
-def is_valid_timestamp(ts):
-    return re.match(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z', ts)
-
-
-def jobs_created_between_dates(api, start, end):
-    return arvados.util.list_all(
-        api.jobs().list,
-        filters=json.dumps([ ['created_at', '>=', start],
-                             ['created_at', '<=', end] ]))
-
-
-def job_logs(api, job):
-    # Returns the contents of the log for this job (as an array of lines).
-    if job['log']:
-        log_collection = arvados.CollectionReader(job['log'], api)
-        log_filename = "{}.log.txt".format(job['uuid'])
-        return log_collection.open(log_filename).readlines()
-    return []
-
-
-user_names = {}
-def job_user_name(api, user_uuid):
-    def _lookup_user_name(api, user_uuid):
-        try:
-            return api.users().get(uuid=user_uuid).execute()['full_name']
-        except arvados.errors.ApiError:
-            return user_uuid
-
-    if user_uuid not in user_names:
-        user_names[user_uuid] = _lookup_user_name(api, user_uuid)
-    return user_names[user_uuid]
-
-
-job_pipeline_names = {}
-def job_pipeline_name(api, job_uuid):
-    def _lookup_pipeline_name(api, job_uuid):
-        try:
-            pipelines = api.pipeline_instances().list(
-                filters='[["components", "like", "%{}%"]]'.format(job_uuid)).execute()
-            pi = pipelines['items'][0]
-            if pi['name']:
-                return pi['name']
-            else:
-                # Use the pipeline template name
-                pt = api.pipeline_templates().get(uuid=pi['pipeline_template_uuid']).execute()
-                return pt['name']
-        except (TypeError, ValueError, IndexError):
-            return ""
-
-    if job_uuid not in job_pipeline_names:
-        job_pipeline_names[job_uuid] = _lookup_pipeline_name(api, job_uuid)
-    return job_pipeline_names[job_uuid]
-
-
-def is_failed_task(logline):
-    return FAILED_TASK_REGEX.search(logline) != None
-
-
-def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
-    args = parse_arguments(arguments)
-
-    api = arvados.api('v1')
-
-    now = datetime.datetime.utcnow()
-    start_time = args.start or api_timestamp(now - datetime.timedelta(days=1))
-    end_time = args.end or api_timestamp(now)
-
-    # Find all jobs created within the specified window,
-    # and their corresponding job logs.
-    jobs_created = jobs_created_between_dates(api, start_time, end_time)
-    jobs_by_state = {}
-    for job in jobs_created:
-        jobs_by_state.setdefault(job['state'], [])
-        jobs_by_state[job['state']].append(job)
-
-    # Find failed jobs and record the job failure text.
-
-    # failure_stats maps failure types (e.g. "sys/docker") to
-    # a set of job UUIDs that failed for that reason.
-    failure_stats = {}
-    for job in jobs_by_state['Failed']:
-        job_uuid = job['uuid']
-        logs = job_logs(api, job)
-        # Find the first permanent task failure, and collect the
-        # preceding log lines.
-        failure_type = None
-        for i, lg in enumerate(logs):
-            if is_failed_task(lg):
-                # Get preceding log record to provide context.
-                log_start = i - LOG_CONTEXT_LINES if i >= LOG_CONTEXT_LINES else 0
-                log_end = i + 1
-                lastlogs = ''.join(logs[log_start:log_end])
-                # try to identify the type of failure.
-                for key, rgx in JOB_FAILURE_TYPES.iteritems():
-                    if re.search(rgx, lastlogs):
-                        failure_type = key
-                        break
-            if failure_type is not None:
-                break
-        if failure_type is None:
-            failure_type = 'unknown'
-        failure_stats.setdefault(failure_type, set())
-        failure_stats[failure_type].add(job_uuid)
-
-    # Report percentages of successful, failed and unfinished jobs.
-    print "Start: {:20s}".format(start_time)
-    print "End: {:20s}".format(end_time)
-    print ""
-
-    print "Overview"
-    print ""
-
-    job_start_count = len(jobs_created)
-    print " {: <25s} {:4d}".format('Started', job_start_count)
-    for state in ['Complete', 'Failed', 'Queued', 'Cancelled', 'Running']:
-        if state in jobs_by_state:
-            job_count = len(jobs_by_state[state])
-            job_percentage = job_count / float(job_start_count)
-            print " {: <25s} {:4d} ({: >4.0%})".format(state,
-                                                       job_count,
-                                                       job_percentage)
-    print ""
-
-    # Report failure types.
-    failure_summary = ""
-    failure_detail = ""
-
-    # Generate a mapping from failed job uuids to job records, to assist
-    # in generating detailed statistics for job failures.
-    jobs_failed_map = { job['uuid']: job for job in jobs_by_state.get('Failed', []) }
-
-    # sort the failure stats in descending order by occurrence.
-    sorted_failures = sorted(failure_stats,
-                             reverse=True,
-                             key=lambda failure_type: len(failure_stats[failure_type]))
-    for failtype in sorted_failures:
-        job_uuids = failure_stats[failtype]
-        failstat = " {: <25s} {:4d} ({: >4.0%})\n".format(
-            failtype,
-            len(job_uuids),
-            len(job_uuids) / float(len(jobs_by_state['Failed'])))
-        failure_summary = failure_summary + failstat
-        failure_detail = failure_detail + failstat
-        for j in job_uuids:
-            job_info = jobs_failed_map[j]
-            job_owner = job_user_name(api, job_info['modified_by_user_uuid'])
-            job_name = job_pipeline_name(api, job_info['uuid'])
-            failure_detail = failure_detail + " {} {: <15.15s} {:29.29s}\n".format(j, job_owner, job_name)
-        failure_detail = failure_detail + "\n"
-
-    print "Failures by class"
-    print ""
-    print failure_summary
-
-    print "Failures by class (detail)"
-    print ""
-    print failure_detail
-
-    return 0
-
-
-if __name__ == "__main__":
-    sys.exit(main())
diff --git a/services/api/script/fail-jobs.rb b/services/api/script/fail-jobs.rb
deleted file mode 100755
index e52bfc075d..0000000000
--- a/services/api/script/fail-jobs.rb
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/usr/bin/env ruby
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-require 'optimist'
-
-opts = Optimist::options do
-  banner 'Fail jobs that have state=="Running".'
-  banner 'Options:'
-  opt(:before,
-      'fail only jobs that started before the given time (or "reboot")',
-      type: :string)
-end
-
-ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
-require File.dirname(__FILE__) + '/../config/boot'
-require File.dirname(__FILE__) + '/../config/environment'
-require Rails.root.join('lib/crunch_dispatch.rb')
-
-CrunchDispatch.new.fail_jobs before: opts[:before]
-- 
2.30.2