#!/usr/bin/env ruby
-
-# == Synopsis
-#
-# arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
-# arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
-#
-# Satisfy a pipeline template by finding or submitting a mapreduce job
-# for each pipeline component.
-#
-# == Options
-#
-# [--template uuid] Use the specified pipeline template.
-#
-# [--template path] Load the pipeline template from the specified
-# local file.
-#
-# [--instance uuid] Use the specified pipeline instance.
-#
-# [-n, --dry-run] Do not start any new jobs or wait for existing jobs
-# to finish. Just find out whether jobs are finished,
-# queued, or running for each component
-#
-# [--submit] Do not try to satisfy any components. Just
-# create an instance, print its UUID to
-# stdout, and exit.
-#
-# [--no-wait] Make only as much progress as possible without entering
-# a sleep/poll loop.
-#
-# [--no-reuse] Do not reuse existing jobs to satisfy pipeline
-# components. Submit a new job for every component.
-#
-# [--debug] Print extra debugging information on stderr.
-#
-# [--debug-level N] Increase amount of debugging information. Default
-# 1, possible range 0..3.
-#
-# [--status-text path] Print plain text status report to a file or
-# fifo. Default: /dev/stdout
-#
-# [--status-json path] Print JSON status report to a file or
-# fifo. Default: /dev/null
-#
-# [--description] Description for the pipeline instance.
-#
-# == Parameters
-#
-# [param_name=param_value]
-#
-# [param_name param_value] Set (or override) the default value for
-# every parameter with the given name.
-#
-# [component_name::param_name=param_value]
-# [component_name::param_name param_value]
-# [--component_name::param_name=param_value]
-# [--component_name::param_name param_value] Set the value of a
-# parameter for a single
-# component.
+# Copyright (C) The Arvados Authors. All rights reserved.
#
+# SPDX-License-Identifier: Apache-2.0
+
class WhRunPipelineInstance
end
require 'trollop'
require 'google/api_client'
rescue LoadError => l
- puts $:
+ $stderr.puts $:
abort <<-EOS
#{$0}: fatal: #{l.message}
Some runtime dependencies may be missing.
p = Trollop::Parser.new do
version __FILE__
+ banner(<<EOF)
+
+Usage:
+ arv-run-pipeline-instance --template TEMPLATE_UUID [options] [--] [parameters]
+ arv-run-pipeline-instance --instance INSTANCE_UUID [options] [--] [parameters]
+
+Parameters:
+ param_name=param_value
+ param_name param_value
+ Set (or override) the default value for every
+ pipeline component parameter with the given
+ name.
+
+ component_name::param_name=param_value
+ component_name::param_name param_value
+ --component_name::param_name=param_value
+ --component_name::param_name param_value
+ Set the value of a parameter for a single
+ pipeline component.
+
+Options:
+EOF
opt(:dry_run,
"Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
:type => :boolean,
abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
end
elsif not $options[:template]
- puts "error: you must supply a --template or --instance."
+ $stderr.puts "error: you must supply a --template or --instance."
p.educate
abort
end
[]
end
end
+
+ # create() returns [job, exception]. If both job and exception are
+ # nil, there was a non-retryable error and the call should not be
+ # attempted again.
def self.create(pipeline, component, job, create_params)
@cache ||= {}
body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
- result = $client.execute(:api_method => $arvados.jobs.create,
- :body_object => body,
- :authenticated => false,
- :headers => {
- authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
- })
- j = JSON.parse result.body, :symbolize_names => true
- if j.is_a? Hash and j[:uuid]
+ result = nil
+ begin
+ result = $client.execute(
+ :api_method => $arvados.jobs.create,
+ :body_object => body,
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
+ })
+ if result.status == 429 || result.status >= 500
+ raise Exception.new("HTTP status #{result.status}")
+ end
+ rescue Exception => e
+ return nil, e
+ end
+ j = JSON.parse(result.body, :symbolize_names => true) rescue nil
+ if result.status == 200 && j.is_a?(Hash) && j[:uuid]
@cache[j[:uuid]] = j
+ return j, nil
else
- debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
+ errors = j[:errors] rescue []
+ debuglog "create job: [#{result.status}] #{errors.inspect} with attributes #{body}", 0
msg = ""
- j[:errors].each do |err|
+ errors.each do |err|
msg += "Error creating job for component #{component}: #{err}\n"
end
msg += "Job submission was: #{body.to_json}"
pipeline.log_stderr(msg)
- nil
+ return nil, nil
end
end
@components.each do |componentname, component|
component[:script_parameters].each do |parametername, parameter|
parameter = { :value => parameter } unless parameter.is_a? Hash
- value =
- (params["#{componentname}::#{parametername}"] ||
- parameter[:value] ||
- (parameter[:output_of].nil? &&
- (params[parametername.to_s] ||
- parameter[:default])) ||
- nil)
- if value.nil? and
- ![false,'false',0,'0'].index parameter[:required]
- if parameter[:output_of]
- if not @components[parameter[:output_of].intern]
- errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
- end
- next
+ if params.has_key?("#{componentname}::#{parametername}")
+ value = params["#{componentname}::#{parametername}"]
+ elsif parameter.has_key?(:value)
+ value = parameter[:value]
+ elsif parameter.has_key?(:output_of)
+ if !@components[parameter[:output_of].intern]
+ errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
+ else
+ # value will be filled in later when the upstream
+ # component's output becomes known
end
+ next
+ elsif params.has_key?(parametername.to_s)
+ value = params[parametername.to_s]
+ elsif parameter.has_key?(:default)
+ value = parameter[:default]
+ elsif [false, 'false', 0, '0'].index(parameter[:required])
+ value = nil
+ else
errors << [componentname, parametername, "required parameter is missing"]
+ next
end
debuglog "parameter #{componentname}::#{parametername} == #{value}"
end
end
if !errors.empty?
- abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
+ all_errors = errors.collect do |c,p,e|
+ "#{c}::#{p} - #{e}\n"
+ end.join("")
+ abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{all_errors}"
end
debuglog "options=" + @options.pretty_inspect
self
# are fully specified (any output_of script_parameters are resolved
# to real value)
my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
- job = JobCache.create(@instance, cname, {
+ job, err = JobCache.create(@instance, cname, {
:script => c[:script],
:script_parameters => Hash[c[:script_parameters].map do |key, spec|
[key, spec[:value]]
c[:job] = job
c[:run_in_process] = (@options[:run_jobs_here] and
job[:submit_id] == my_submit_id)
- else
+ elsif err.nil?
debuglog "component #{cname} new job failed", 0
job_creation_failed += 1
+ else
+ debuglog "component #{cname} new job failed, err=#{err}", 0
end
end
@instance[:state] = 'Complete'
else
@instance[:state] = 'Paused'
- end
+ end
else
if ended == @components.length or failed > 0
@instance[:state] = success ? 'Complete' : 'Failed'