X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/68f12bfd1ca6612338ff65106f404019a28d0cd3..32af86f5f4a2849b12c96e40ed20d33b7682ba55:/sdk/cli/bin/arv-run-pipeline-instance diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance index 304628f438..f20acebbd0 100755 --- a/sdk/cli/bin/arv-run-pipeline-instance +++ b/sdk/cli/bin/arv-run-pipeline-instance @@ -243,7 +243,7 @@ class PipelineInstance end def self.create(attributes) result = $client.execute(:api_method => $arvados.pipeline_instances.create, - :body => { + :body_object => { :pipeline_instance => attributes }, :authenticated => false, @@ -252,7 +252,7 @@ class PipelineInstance }) j = JSON.parse result.body, :symbolize_names => true unless j.is_a? Hash and j[:uuid] - abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}" + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}" end debuglog "Created pipeline instance: #{j[:uuid]}" self.new(j) @@ -262,8 +262,8 @@ class PipelineInstance :parameters => { :uuid => @pi[:uuid] }, - :body => { - :pipeline_instance => @attributes_to_update.to_json + :body_object => { + :pipeline_instance => @attributes_to_update }, :authenticated => false, :headers => { @@ -322,18 +322,13 @@ class JobCache [] end end - def self.create(job, create_params) + def self.create(pipeline, component, job, create_params) @cache ||= {} - jsonified_create_params = {} - create_params.each do |k, v| - jsonified_create_params[k] = v.to_json - end + body = {job: no_nil_values(job)}.merge(no_nil_values(create_params)) result = $client.execute(:api_method => $arvados.jobs.create, - :body => { - :job => job.to_json - }.merge(jsonified_create_params), + :body_object => body, :authenticated => false, :headers => { authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] @@ -342,10 +337,36 @@ class JobCache if j.is_a? Hash and j[:uuid] @cache[j[:uuid]] = j else - debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0 + debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0 + + msg = "" + j[:errors].each do |err| + msg += "Error creating job for component #{component}: #{err}\n" + end + msg += "Job submission was: #{body.to_json}" + + $client.execute(:api_method => $arvados.logs.create, + :body_object => { + :log => { + :object_uuid => pipeline[:uuid], + :event_type => 'stderr', + :owner_uuid => pipeline[:owner_uuid], + :properties => {"text" => msg} + } + }, + :authenticated => false, + :headers => { + authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] + }) nil end end + + protected + + def self.no_nil_values(hash) + hash.reject { |key, value| value.nil? } + end end class WhRunPipelineInstance @@ -397,7 +418,7 @@ class WhRunPipelineInstance param = params_args.shift.sub /^--/, '' params[param] = params_args.shift else - abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\"" + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\"" end end @@ -426,23 +447,35 @@ class WhRunPipelineInstance end end if !errors.empty? - abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}" + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}" end debuglog "options=" + @options.pretty_inspect self end def setup_instance - if $options[:submit] - @instance ||= PipelineInstance. - create(:components => @components, - :pipeline_template_uuid => @template[:uuid], - :state => 'New') + if @instance + @instance[:properties][:run_options] ||= {} + if @options[:no_reuse] + # override properties of existing instance + @instance[:properties][:run_options][:enable_job_reuse] = false + else + # Default to "enable reuse" if not specified. (This code path + # can go away when old clients go away.) + if @instance[:properties][:run_options][:enable_job_reuse].nil? + @instance[:properties][:run_options][:enable_job_reuse] = true + end + end else - @instance ||= PipelineInstance. - create(:components => @components, - :pipeline_template_uuid => @template[:uuid], - :state => 'RunningOnClient') + @instance = PipelineInstance. + create(components: @components, + properties: { + run_options: { + enable_job_reuse: !@options[:no_reuse] + } + }, + pipeline_template_uuid: @template[:uuid], + state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')) end self end @@ -468,7 +501,7 @@ class WhRunPipelineInstance # No job yet associated with this component and is component inputs # are fully specified (any output_of script_parameters are resolved # to real value) - job = JobCache.create({ + job = JobCache.create(@instance, cname, { :script => c[:script], :script_parameters => c[:script_parameters], :script_version => c[:script_version], @@ -482,7 +515,8 @@ class WhRunPipelineInstance # dealing with new API servers. :minimum_script_version => c[:minimum_script_version], :exclude_script_versions => c[:exclude_minimum_script_versions], - :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]), + :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] && + !c[:nondeterministic]), :filters => c[:filters] }) if job