From ff2b5624dfbb779823577bda3ef716d39dfe83d3 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 26 Mar 2013 17:14:59 -0700 Subject: [PATCH 1/1] add --create-only and --instance arguments. refs #1448 --- cli/wh-run-pipeline-instance | 118 +++++++++++++++++++++++++++++------ 1 file changed, 99 insertions(+), 19 deletions(-) diff --git a/cli/wh-run-pipeline-instance b/cli/wh-run-pipeline-instance index 40941c04f7..197519ad63 100755 --- a/cli/wh-run-pipeline-instance +++ b/cli/wh-run-pipeline-instance @@ -3,6 +3,7 @@ # == Synopsis # # wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters] +# wh-run-pipeline-instance --instance pipeline-instance-uuid [options] # # Satisfy a pipeline template by finding or submitting a mapreduce job # for each pipeline component. @@ -11,10 +12,15 @@ # # [--template uuid] Use the specified pipeline template. # +# [--instance uuid] Use the specified pipeline instance. +# # [-n, --dry-run] Do not start any new jobs or wait for existing jobs # to finish. Just find out whether jobs are finished, # queued, or running for each component # +# [--create-only] Do not try to satisfy any components. Just create an +# instance, print its UUID to stdout, and exit. +# # [--no-wait] Make only as much progress as possible without entering # a sleep/poll loop. # @@ -141,8 +147,15 @@ p = Trollop::Parser.new do opt(:template, "UUID of pipeline template.", :short => :none, - :type => :string, - :required => true) + :type => :string) + opt(:instance, + "UUID of pipeline instance.", + :short => :none, + :type => :string) + opt(:create_only, + "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.", + :short => :none, + :type => :boolean) stop_on [:'--'] end $options = Trollop::with_standard_exception_handling p do @@ -150,6 +163,13 @@ $options = Trollop::with_standard_exception_handling p do end $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0 +if $options[:instance] + if $options[:template] or $options[:create_only] + abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-only." + end +elsif not $options[:template] + abort "#{$0}: syntax error: you must supply a --template or --instance." +end # Set up the API client. @@ -161,8 +181,23 @@ $orvos = $client.discovered_api('orvos', $orvos_api_version) class PipelineInstance - def initialize(attributes) - @attributes_to_update = {} + def self.find(uuid) + result = $client.execute(:api_method => $orvos.pipeline_instances.get, + :parameters => { + :api_token => ENV['ORVOS_API_TOKEN'], + :uuid => uuid + }, + :authenticated => false) + j = JSON.parse result.body, :symbolize_names => true + unless j.is_a? Hash and j[:uuid] + debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0 + nil + else + debuglog "Retrieved pipeline_instance #{j[:uuid]}" + self.new(j) + end + end + def self.create(attributes) result = $client.execute(:api_method => $orvos.pipeline_instances.create, :parameters => { :api_token => ENV['ORVOS_API_TOKEN'], @@ -171,10 +206,10 @@ class PipelineInstance :authenticated => false) j = JSON.parse result.body, :symbolize_names => true unless j.is_a? Hash and j[:uuid] - abort "Failed to create pipeline_instance: #{j[:errors] rescue nil}" + abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}" end debuglog "Created pipeline instance: #{j[:uuid]}" - @pi = j + self.new(j) end def save result = $client.execute(:api_method => $orvos.pipeline_instances.update, @@ -200,6 +235,11 @@ class PipelineInstance def [](x) @pi[x] end + protected + def initialize(j) + @attributes_to_update = {} + @pi = j + end end class JobCache @@ -247,6 +287,8 @@ class JobCache end class WhRunPipelineInstance + attr_reader :instance + def initialize(_options) @options = _options end @@ -265,6 +307,12 @@ class WhRunPipelineInstance self end + def fetch_instance(instance_uuid) + @instance = PipelineInstance.find(instance_uuid) + @template = @instance + self + end + def apply_parameters(params_args) params_args.shift if params_args[0] == '--' params = {} @@ -311,14 +359,18 @@ class WhRunPipelineInstance self end + def setup_instance + @instance ||= PipelineInstance. + create(:components => @components, + :pipeline_template_uuid => @template[:uuid], + :active => true) + self + end + def run moretodo = true while moretodo moretodo = false - - @instance ||= PipelineInstance.new(:components => @components, - :pipeline_template_uuid => @template[:uuid], - :active => true) @components.each do |cname, c| job = nil if !c[:job] and @@ -329,6 +381,7 @@ class WhRunPipelineInstance debuglog "component #{cname} ready to satisfy." c.delete :wait + second_place_job = nil # satisfies component, but not finished yet JobCache.where(:script => c[:script], :script_parameters => c[:script_parameters], :script_version_descends_from => c[:script_version_descends_from]). @@ -350,10 +403,19 @@ class WhRunPipelineInstance next end - job = candidate_job + if candidate_job[:success] + job = candidate_job + debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}" + c[:job] = job + else + second_place_job ||= candidate_job + end + break + end + if not c[:job] and second_place_job + job = second_place_job debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}" c[:job] = job - break end if not c[:job] debuglog "component #{cname} not satisfied by any existing job." @@ -405,8 +467,14 @@ class WhRunPipelineInstance end def cleanup - @instance[:active] = false - @instance.save + if @instance + @instance[:active] = false + @instance.save + end + end + + def uuid + @instance[:uuid] end protected @@ -433,6 +501,8 @@ class WhRunPipelineInstance c[:job][:output] elsif c[:job][:cancelled_at] "cancelled #{c[:job][:cancelled_at]}" + elsif c[:job][:finished_at] + "failed #{c[:job][:finished_at]}" elsif c[:job][:started_at] "started #{c[:job][:started_at]}" else @@ -445,12 +515,22 @@ class WhRunPipelineInstance end end -instance = WhRunPipelineInstance.new($options) +runner = WhRunPipelineInstance.new($options) begin - instance.fetch_template($options[:template]). - apply_parameters(p.leftovers). - run + if $options[:template] + runner.fetch_template($options[:template]) + else + runner.fetch_instance($options[:instance]) + end + runner.apply_parameters(p.leftovers) + runner.setup_instance + if $options[:create_only] + runner.instance.save + puts runner.instance[:uuid] + else + runner.run + end rescue Exception => e - instance.cleanup + runner.cleanup raise e end -- 2.30.2