X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/879e1f31b0165c6abaa35a023ff66400757bb44b..b17d7233f5ccb8ec3d857ae3e506d53377207376:/sdk/cli/bin/arv-run-pipeline-instance diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance index e9b3f00b61..980ce83ae3 100755 --- a/sdk/cli/bin/arv-run-pipeline-instance +++ b/sdk/cli/bin/arv-run-pipeline-instance @@ -187,7 +187,9 @@ if $options[:instance] abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit." end elsif not $options[:template] - abort "#{$0}: syntax error: you must supply a --template or --instance." + puts "error: you must supply a --template or --instance." + p.educate + abort end if $options[:run_here] == $options[:submit] @@ -241,7 +243,7 @@ class PipelineInstance end def self.create(attributes) result = $client.execute(:api_method => $arvados.pipeline_instances.create, - :body => { + :body_object => { :pipeline_instance => attributes }, :authenticated => false, @@ -250,7 +252,7 @@ class PipelineInstance }) j = JSON.parse result.body, :symbolize_names => true unless j.is_a? Hash and j[:uuid] - abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}" + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}" end debuglog "Created pipeline instance: #{j[:uuid]}" self.new(j) @@ -260,8 +262,8 @@ class PipelineInstance :parameters => { :uuid => @pi[:uuid] }, - :body => { - :pipeline_instance => @attributes_to_update.to_json + :body_object => { + :pipeline_instance => @attributes_to_update }, :authenticated => false, :headers => { @@ -283,6 +285,16 @@ class PipelineInstance def [](x) @pi[x] end + + def log_stderr(msg) + $arv.log.create log: { + event_type: 'stderr', + object_uuid: self[:uuid], + owner_uuid: self[:owner_uuid], + properties: {"text" => msg}, + } + end + protected def initialize(j) @attributes_to_update = {} @@ -320,12 +332,13 @@ class JobCache [] end end - def self.create(job, create_params) + def self.create(pipeline, component, job, create_params) @cache ||= {} + + body = {job: no_nil_values(job)}.merge(no_nil_values(create_params)) + result = $client.execute(:api_method => $arvados.jobs.create, - :body => { - :job => job.to_json - }.merge(create_params), + :body_object => body, :authenticated => false, :headers => { authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] @@ -334,10 +347,24 @@ class JobCache if j.is_a? Hash and j[:uuid] @cache[j[:uuid]] = j else - debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0 + debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0 + + msg = "" + j[:errors].each do |err| + msg += "Error creating job for component #{component}: #{err}\n" + end + msg += "Job submission was: #{body.to_json}" + + pipeline.log_stderr(msg) nil end end + + protected + + def self.no_nil_values(hash) + hash.reject { |key, value| value.nil? } + end end class WhRunPipelineInstance @@ -351,10 +378,6 @@ class WhRunPipelineInstance if template.match /[^-0-9a-z]/ # Doesn't look like a uuid -- use it as a filename. @template = JSON.parse File.read(template), :symbolize_names => true - if !@template[:components] - abort ("#{$0}: Template loaded from #{template} " + - "does not have a \"components\" key") - end else result = $client.execute(:api_method => $arvados.pipeline_templates.get, :parameters => { @@ -389,12 +412,29 @@ class WhRunPipelineInstance param = params_args.shift.sub /^--/, '' params[param] = params_args.shift else - abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\"" + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\"" end end + if not @template[:components].is_a?(Hash) + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash" + end @components = @template[:components].dup + bad_components = @components.each_pair.select do |cname, cspec| + not cspec.is_a?(Hash) + end + if bad_components.any? + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}" + end + + bad_components = @components.each_pair.select do |cname, cspec| + not cspec[:script_parameters].is_a?(Hash) + end + if bad_components.any? + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}" + end + errors = [] @components.each do |componentname, component| component[:script_parameters].each do |parametername, parameter| @@ -418,23 +458,35 @@ class WhRunPipelineInstance end end if !errors.empty? - abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}" + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}" end debuglog "options=" + @options.pretty_inspect self end def setup_instance - if $options[:submit] - @instance ||= PipelineInstance. - create(:components => @components, - :pipeline_template_uuid => @template[:uuid], - :state => 'New') + if @instance + @instance[:properties][:run_options] ||= {} + if @options[:no_reuse] + # override properties of existing instance + @instance[:properties][:run_options][:enable_job_reuse] = false + else + # Default to "enable reuse" if not specified. (This code path + # can go away when old clients go away.) + if @instance[:properties][:run_options][:enable_job_reuse].nil? + @instance[:properties][:run_options][:enable_job_reuse] = true + end + end else - @instance ||= PipelineInstance. - create(:components => @components, - :pipeline_template_uuid => @template[:uuid], - :state => 'RunningOnClient') + @instance = PipelineInstance. + create(components: @components, + properties: { + run_options: { + enable_job_reuse: !@options[:no_reuse] + } + }, + pipeline_template_uuid: @template[:uuid], + state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')) end self end @@ -443,6 +495,7 @@ class WhRunPipelineInstance moretodo = true interrupted = false + job_creation_failed = 0 while moretodo moretodo = false @components.each do |cname, c| @@ -459,33 +512,30 @@ class WhRunPipelineInstance # No job yet associated with this component and is component inputs # are fully specified (any output_of script_parameters are resolved # to real value) - job = JobCache.create({ + job = JobCache.create(@instance, cname, { :script => c[:script], :script_parameters => c[:script_parameters], :script_version => c[:script_version], :repository => c[:repository], :nondeterministic => c[:nondeterministic], :output_is_persistent => c[:output_is_persistent] || false, + :runtime_constraints => c[:runtime_constraints], :owner_uuid => owner_uuid, - # TODO: Delete the following three attributes when - # supporting pre-20140418 API servers is no longer - # important. New API servers take these as flags that - # control behavior of create, rather than job attributes. - :minimum_script_version => c[:minimum_script_version], - :exclude_script_versions => c[:exclude_minimum_script_versions], - :no_reuse => @options[:no_reuse] || c[:nondeterministic], }, { # This is the right place to put these attributes when # dealing with new API servers. :minimum_script_version => c[:minimum_script_version], :exclude_script_versions => c[:exclude_minimum_script_versions], - :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]), + :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] && + !c[:nondeterministic]), + :filters => c[:filters] }) if job debuglog "component #{cname} new job #{job[:uuid]}" c[:job] = job else - debuglog "component #{cname} new job failed" + debuglog "component #{cname} new job failed", 0 + job_creation_failed += 1 end end @@ -562,6 +612,11 @@ class WhRunPipelineInstance moretodo = false end + # If job creation fails, just give up on this pipeline instance. + if job_creation_failed > 0 + moretodo = false + end + if moretodo begin sleep 10 @@ -582,7 +637,7 @@ class WhRunPipelineInstance ended += 1 if c[:job][:success] == true succeeded += 1 - elsif c[:job][:success] == false + elsif c[:job][:success] == false or c[:job][:cancelled_at] failed += 1 end end @@ -591,6 +646,13 @@ class WhRunPipelineInstance success = (succeeded == @components.length) + # A job create call failed. Just give up. + if job_creation_failed > 0 + debuglog "job creation failed - giving up on this pipeline instance", 0 + success = false + failed += 1 + end + if interrupted if success @instance[:state] = 'Complete' @@ -603,6 +665,8 @@ class WhRunPipelineInstance end end + debuglog "pipeline instance state is #{@instance[:state]}" + # set components_summary components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed} @instance[:components_summary] = components_summary @@ -658,6 +722,18 @@ class WhRunPipelineInstance end end end + + def abort(msg) + if @instance + if ["New", "Ready", "RunningOnClient", + "RunningOnServer"].include?(@instance[:state]) + @instance[:state] = "Failed" + @instance.save + end + @instance.log_stderr(msg) + end + Kernel::abort(msg) + end end runner = WhRunPipelineInstance.new($options)