X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b1c303302dbac2ea6e9ab6775aecaf661b5da9c8..33f848525d46357fbe7669bc9ea09f2f4379f425:/sdk/cli/bin/arv-run-pipeline-instance diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance index e552d77f3a..e37f2a9ff6 100755 --- a/sdk/cli/bin/arv-run-pipeline-instance +++ b/sdk/cli/bin/arv-run-pipeline-instance @@ -187,7 +187,9 @@ if $options[:instance] abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit." end elsif not $options[:template] - abort "#{$0}: syntax error: you must supply a --template or --instance." + puts "error: you must supply a --template or --instance." + p.educate + abort end if $options[:run_here] == $options[:submit] @@ -242,7 +244,7 @@ class PipelineInstance def self.create(attributes) result = $client.execute(:api_method => $arvados.pipeline_instances.create, :body => { - :pipeline_instance => attributes + :pipeline_instance => attributes.to_json }, :authenticated => false, :headers => { @@ -250,7 +252,7 @@ class PipelineInstance }) j = JSON.parse result.body, :symbolize_names => true unless j.is_a? Hash and j[:uuid] - abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}" + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}" end debuglog "Created pipeline instance: #{j[:uuid]}" self.new(j) @@ -320,12 +322,13 @@ class JobCache [] end end - def self.create(job, create_params) + def self.create(pipeline, component, job, create_params) @cache ||= {} + + body = {job: no_nil_values(job)}.merge(no_nil_values(create_params)) + result = $client.execute(:api_method => $arvados.jobs.create, - :body => { - :job => job.to_json - }.merge(create_params), + :body => body, :authenticated => false, :headers => { authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] @@ -334,10 +337,36 @@ class JobCache if j.is_a? Hash and j[:uuid] @cache[j[:uuid]] = j else - debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0 + debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0 + + msg = "" + j[:errors].each do |err| + msg += "Error creating job for component #{component}: #{err}\n" + end + msg += "Job submission was: #{body.to_json}" + + $client.execute(:api_method => $arvados.logs.create, + :body => { + :log => { + :object_uuid => pipeline[:uuid], + :event_type => 'stderr', + :owner_uuid => pipeline[:owner_uuid], + :properties => {"text" => msg} + } + }, + :authenticated => false, + :headers => { + authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] + }) nil end end + + protected + + def self.no_nil_values(hash) + hash.reject { |key, value| value.nil? } + end end class WhRunPipelineInstance @@ -389,7 +418,7 @@ class WhRunPipelineInstance param = params_args.shift.sub /^--/, '' params[param] = params_args.shift else - abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\"" + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\"" end end @@ -418,23 +447,35 @@ class WhRunPipelineInstance end end if !errors.empty? - abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}" + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}" end debuglog "options=" + @options.pretty_inspect self end def setup_instance - if $options[:submit] - @instance ||= PipelineInstance. - create(:components => @components, - :pipeline_template_uuid => @template[:uuid], - :state => 'New') + if @instance + @instance[:properties][:run_options] ||= {} + if @options[:no_reuse] + # override properties of existing instance + @instance[:properties][:run_options][:enable_job_reuse] = false + else + # Default to "enable reuse" if not specified. (This code path + # can go away when old clients go away.) + if @instance[:properties][:run_options][:enable_job_reuse].nil? + @instance[:properties][:run_options][:enable_job_reuse] = true + end + end else - @instance ||= PipelineInstance. - create(:components => @components, - :pipeline_template_uuid => @template[:uuid], - :state => 'RunningOnClient') + @instance = PipelineInstance. + create(components: @components, + properties: { + run_options: { + enable_job_reuse: !@options[:no_reuse] + } + }, + pipeline_template_uuid: @template[:uuid], + state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')) end self end @@ -443,10 +484,12 @@ class WhRunPipelineInstance moretodo = true interrupted = false + job_creation_failed = 0 while moretodo moretodo = false @components.each do |cname, c| job = nil + owner_uuid = @instance[:owner_uuid] # Is the job satisfying this component already known to be # finished? (Already meaning "before we query API server about # the job's current state") @@ -458,32 +501,30 @@ class WhRunPipelineInstance # No job yet associated with this component and is component inputs # are fully specified (any output_of script_parameters are resolved # to real value) - job = JobCache.create({ + job = JobCache.create(@instance, cname, { :script => c[:script], :script_parameters => c[:script_parameters], :script_version => c[:script_version], :repository => c[:repository], :nondeterministic => c[:nondeterministic], :output_is_persistent => c[:output_is_persistent] || false, - # TODO: Delete the following three attributes when - # supporting pre-20140418 API servers is no longer - # important. New API servers take these as flags that - # control behavior of create, rather than job attributes. - :minimum_script_version => c[:minimum_script_version], - :exclude_script_versions => c[:exclude_minimum_script_versions], - :no_reuse => @options[:no_reuse] || c[:nondeterministic], + :runtime_constraints => c[:runtime_constraints], + :owner_uuid => owner_uuid, }, { # This is the right place to put these attributes when # dealing with new API servers. :minimum_script_version => c[:minimum_script_version], :exclude_script_versions => c[:exclude_minimum_script_versions], - :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]), + :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] && + !c[:nondeterministic]), + :filters => c[:filters] }) if job debuglog "component #{cname} new job #{job[:uuid]}" c[:job] = job else - debuglog "component #{cname} new job failed" + debuglog "component #{cname} new job failed", 0 + job_creation_failed += 1 end end @@ -537,7 +578,8 @@ class WhRunPipelineInstance tail_kind: 'arvados#user', tail_uuid: @my_user_uuid, head_kind: 'arvados#collection', - head_uuid: wanted + head_uuid: wanted, + owner_uuid: owner_uuid } debuglog "added link, uuid #{newlink[:uuid]}" end @@ -559,6 +601,11 @@ class WhRunPipelineInstance moretodo = false end + # If job creation fails, just give up on this pipeline instance. + if job_creation_failed > 0 + moretodo = false + end + if moretodo begin sleep 10 @@ -575,11 +622,11 @@ class WhRunPipelineInstance failed = 0 @components.each do |cname, c| if c[:job] - if c[:job][:finished_at] + if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false) ended += 1 if c[:job][:success] == true succeeded += 1 - elsif c[:job][:success] == false + elsif c[:job][:success] == false or c[:job][:cancelled_at] failed += 1 end end @@ -588,6 +635,13 @@ class WhRunPipelineInstance success = (succeeded == @components.length) + # A job create call failed. Just give up. + if job_creation_failed > 0 + debuglog "job creation failed - giving up on this pipeline instance", 0 + success = false + failed += 1 + end + if interrupted if success @instance[:state] = 'Complete' @@ -600,6 +654,8 @@ class WhRunPipelineInstance end end + debuglog "pipeline instance state is #{@instance[:state]}" + # set components_summary components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed} @instance[:components_summary] = components_summary