X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f00ee875d99ba65aaac178e762fbd3e35ddc5f87..b17d7233f5ccb8ec3d857ae3e506d53377207376:/sdk/cli/bin/arv-run-pipeline-instance diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance index ab3702cafc..980ce83ae3 100755 --- a/sdk/cli/bin/arv-run-pipeline-instance +++ b/sdk/cli/bin/arv-run-pipeline-instance @@ -244,7 +244,7 @@ class PipelineInstance def self.create(attributes) result = $client.execute(:api_method => $arvados.pipeline_instances.create, :body_object => { - :pipeline_instance => attributes.to_json + :pipeline_instance => attributes }, :authenticated => false, :headers => { @@ -263,7 +263,7 @@ class PipelineInstance :uuid => @pi[:uuid] }, :body_object => { - :pipeline_instance => @attributes_to_update.to_json + :pipeline_instance => @attributes_to_update }, :authenticated => false, :headers => { @@ -285,6 +285,16 @@ class PipelineInstance def [](x) @pi[x] end + + def log_stderr(msg) + $arv.log.create log: { + event_type: 'stderr', + object_uuid: self[:uuid], + owner_uuid: self[:owner_uuid], + properties: {"text" => msg}, + } + end + protected def initialize(j) @attributes_to_update = {} @@ -345,19 +355,7 @@ class JobCache end msg += "Job submission was: #{body.to_json}" - $client.execute(:api_method => $arvados.logs.create, - :body_object => { - :log => { - :object_uuid => pipeline[:uuid], - :event_type => 'stderr', - :owner_uuid => pipeline[:owner_uuid], - :properties => {"text" => msg} - } - }, - :authenticated => false, - :headers => { - authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] - }) + pipeline.log_stderr(msg) nil end end @@ -380,10 +378,6 @@ class WhRunPipelineInstance if template.match /[^-0-9a-z]/ # Doesn't look like a uuid -- use it as a filename. @template = JSON.parse File.read(template), :symbolize_names => true - if !@template[:components] - abort ("#{$0}: Template loaded from #{template} " + - "does not have a \"components\" key") - end else result = $client.execute(:api_method => $arvados.pipeline_templates.get, :parameters => { @@ -422,8 +416,25 @@ class WhRunPipelineInstance end end + if not @template[:components].is_a?(Hash) + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash" + end @components = @template[:components].dup + bad_components = @components.each_pair.select do |cname, cspec| + not cspec.is_a?(Hash) + end + if bad_components.any? + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}" + end + + bad_components = @components.each_pair.select do |cname, cspec| + not cspec[:script_parameters].is_a?(Hash) + end + if bad_components.any? + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}" + end + errors = [] @components.each do |componentname, component| component[:script_parameters].each do |parametername, parameter| @@ -454,16 +465,28 @@ class WhRunPipelineInstance end def setup_instance - if $options[:submit] - @instance ||= PipelineInstance. - create(:components => @components, - :pipeline_template_uuid => @template[:uuid], - :state => 'New') + if @instance + @instance[:properties][:run_options] ||= {} + if @options[:no_reuse] + # override properties of existing instance + @instance[:properties][:run_options][:enable_job_reuse] = false + else + # Default to "enable reuse" if not specified. (This code path + # can go away when old clients go away.) + if @instance[:properties][:run_options][:enable_job_reuse].nil? + @instance[:properties][:run_options][:enable_job_reuse] = true + end + end else - @instance ||= PipelineInstance. - create(:components => @components, - :pipeline_template_uuid => @template[:uuid], - :state => 'RunningOnClient') + @instance = PipelineInstance. + create(components: @components, + properties: { + run_options: { + enable_job_reuse: !@options[:no_reuse] + } + }, + pipeline_template_uuid: @template[:uuid], + state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')) end self end @@ -503,7 +526,8 @@ class WhRunPipelineInstance # dealing with new API servers. :minimum_script_version => c[:minimum_script_version], :exclude_script_versions => c[:exclude_minimum_script_versions], - :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]), + :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] && + !c[:nondeterministic]), :filters => c[:filters] }) if job @@ -698,6 +722,18 @@ class WhRunPipelineInstance end end end + + def abort(msg) + if @instance + if ["New", "Ready", "RunningOnClient", + "RunningOnServer"].include?(@instance[:state]) + @instance[:state] = "Failed" + @instance.save + end + @instance.log_stderr(msg) + end + Kernel::abort(msg) + end end runner = WhRunPipelineInstance.new($options)