X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/593ed179715e007763a027919b38b69b8bb7d59d..c7399ec7afdf0cfdd0f3177f410f102083a26e15:/sdk/cli/bin/arv-run-pipeline-instance diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance index 70e2f42ede..b66e9c0526 100755 --- a/sdk/cli/bin/arv-run-pipeline-instance +++ b/sdk/cli/bin/arv-run-pipeline-instance @@ -1,4 +1,7 @@ #!/usr/bin/env ruby +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 class WhRunPipelineInstance end @@ -17,7 +20,7 @@ begin require 'trollop' require 'google/api_client' rescue LoadError => l - puts $: + $stderr.puts $: abort <<-EOS #{$0}: fatal: #{l.message} Some runtime dependencies may be missing. @@ -132,7 +135,7 @@ if $options[:instance] abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit." end elsif not $options[:template] - puts "error: you must supply a --template or --instance." + $stderr.puts "error: you must supply a --template or --instance." p.educate abort end @@ -257,31 +260,46 @@ class JobCache [] end end + + # create() returns [job, exception]. If both job and exception are + # nil, there was a non-retryable error and the call should not be + # attempted again. def self.create(pipeline, component, job, create_params) @cache ||= {} body = {job: no_nil_values(job)}.merge(no_nil_values(create_params)) - result = $client.execute(:api_method => $arvados.jobs.create, - :body_object => body, - :authenticated => false, - :headers => { - authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN'] - }) - j = JSON.parse result.body, :symbolize_names => true - if j.is_a? Hash and j[:uuid] + result = nil + begin + result = $client.execute( + :api_method => $arvados.jobs.create, + :body_object => body, + :authenticated => false, + :headers => { + authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN'] + }) + if result.status == 429 || result.status >= 500 + raise Exception.new("HTTP status #{result.status}") + end + rescue Exception => e + return nil, e + end + j = JSON.parse(result.body, :symbolize_names => true) rescue nil + if result.status == 200 && j.is_a?(Hash) && j[:uuid] @cache[j[:uuid]] = j + return j, nil else - debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0 + errors = j[:errors] rescue [] + debuglog "create job: [#{result.status}] #{errors.inspect} with attributes #{body}", 0 msg = "" - j[:errors].each do |err| + errors.each do |err| msg += "Error creating job for component #{component}: #{err}\n" end msg += "Job submission was: #{body.to_json}" pipeline.log_stderr(msg) - nil + return nil, nil end end @@ -364,22 +382,27 @@ class WhRunPipelineInstance @components.each do |componentname, component| component[:script_parameters].each do |parametername, parameter| parameter = { :value => parameter } unless parameter.is_a? Hash - value = - (params["#{componentname}::#{parametername}"] || - parameter[:value] || - (parameter[:output_of].nil? && - (params[parametername.to_s] || - parameter[:default])) || - nil) - if value.nil? and - ![false,'false',0,'0'].index parameter[:required] - if parameter[:output_of] - if not @components[parameter[:output_of].intern] - errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"] - end - next + if params.has_key?("#{componentname}::#{parametername}") + value = params["#{componentname}::#{parametername}"] + elsif parameter.has_key?(:value) + value = parameter[:value] + elsif parameter.has_key?(:output_of) + if !@components[parameter[:output_of].intern] + errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"] + else + # value will be filled in later when the upstream + # component's output becomes known end + next + elsif params.has_key?(parametername.to_s) + value = params[parametername.to_s] + elsif parameter.has_key?(:default) + value = parameter[:default] + elsif [false, 'false', 0, '0'].index(parameter[:required]) + value = nil + else errors << [componentname, parametername, "required parameter is missing"] + next end debuglog "parameter #{componentname}::#{parametername} == #{value}" @@ -388,7 +411,10 @@ class WhRunPipelineInstance end end if !errors.empty? - abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}" + all_errors = errors.collect do |c,p,e| + "#{c}::#{p} - #{e}\n" + end.join("") + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{all_errors}" end debuglog "options=" + @options.pretty_inspect self @@ -455,7 +481,7 @@ class WhRunPipelineInstance # are fully specified (any output_of script_parameters are resolved # to real value) my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}" - job = JobCache.create(@instance, cname, { + job, err = JobCache.create(@instance, cname, { :script => c[:script], :script_parameters => Hash[c[:script_parameters].map do |key, spec| [key, spec[:value]] @@ -482,9 +508,11 @@ class WhRunPipelineInstance c[:job] = job c[:run_in_process] = (@options[:run_jobs_here] and job[:submit_id] == my_submit_id) - else + elsif err.nil? debuglog "component #{cname} new job failed", 0 job_creation_failed += 1 + else + debuglog "component #{cname} new job failed, err=#{err}", 0 end end @@ -649,7 +677,7 @@ class WhRunPipelineInstance @instance[:state] = 'Complete' else @instance[:state] = 'Paused' - end + end else if ended == @components.length or failed > 0 @instance[:state] = success ? 'Complete' : 'Failed'