add --create-only and --instance arguments. refs #1448
authorTom Clegg <tom@clinicalfuture.com>
Wed, 27 Mar 2013 00:14:59 +0000 (17:14 -0700)
committerTom Clegg <tom@clinicalfuture.com>
Wed, 27 Mar 2013 00:51:53 +0000 (17:51 -0700)
cli/wh-run-pipeline-instance

index 40941c04f7dc9b219769210c69b0f34245024e0f..197519ad63de5992a598fad92a46e3421fa06f2b 100755 (executable)
@@ -3,6 +3,7 @@
 # == Synopsis
 #
 #  wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
+#  wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
 #
 # Satisfy a pipeline template by finding or submitting a mapreduce job
 # for each pipeline component.
 #
 # [--template uuid] Use the specified pipeline template.
 #
+# [--instance uuid] Use the specified pipeline instance.
+#
 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
 #                 to finish. Just find out whether jobs are finished,
 #                 queued, or running for each component
 #
+# [--create-only] Do not try to satisfy any components. Just create an
+#                 instance, print its UUID to stdout, and exit.
+#
 # [--no-wait] Make only as much progress as possible without entering
 #             a sleep/poll loop.
 #
@@ -141,8 +147,15 @@ p = Trollop::Parser.new do
   opt(:template,
       "UUID of pipeline template.",
       :short => :none,
-      :type => :string,
-      :required => true)
+      :type => :string)
+  opt(:instance,
+      "UUID of pipeline instance.",
+      :short => :none,
+      :type => :string)
+  opt(:create_only,
+      "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
+      :short => :none,
+      :type => :boolean)
   stop_on [:'--']
 end
 $options = Trollop::with_standard_exception_handling p do
@@ -150,6 +163,13 @@ $options = Trollop::with_standard_exception_handling p do
 end
 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
 
+if $options[:instance]
+  if $options[:template] or $options[:create_only]
+    abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-only."
+  end
+elsif not $options[:template]
+  abort "#{$0}: syntax error: you must supply a --template or --instance."
+end
 
 # Set up the API client.
 
@@ -161,8 +181,23 @@ $orvos = $client.discovered_api('orvos', $orvos_api_version)
 
 
 class PipelineInstance
-  def initialize(attributes)
-    @attributes_to_update = {}
+  def self.find(uuid)
+    result = $client.execute(:api_method => $orvos.pipeline_instances.get,
+                             :parameters => {
+                               :api_token => ENV['ORVOS_API_TOKEN'],
+                               :uuid => uuid
+                             },
+                             :authenticated => false)
+    j = JSON.parse result.body, :symbolize_names => true
+    unless j.is_a? Hash and j[:uuid]
+      debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
+      nil
+    else
+      debuglog "Retrieved pipeline_instance #{j[:uuid]}"
+      self.new(j)
+    end
+  end
+  def self.create(attributes)
     result = $client.execute(:api_method => $orvos.pipeline_instances.create,
                              :parameters => {
                                :api_token => ENV['ORVOS_API_TOKEN'],
@@ -171,10 +206,10 @@ class PipelineInstance
                              :authenticated => false)
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
-      abort "Failed to create pipeline_instance: #{j[:errors] rescue nil}"
+      abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
     end
     debuglog "Created pipeline instance: #{j[:uuid]}"
-    @pi = j
+    self.new(j)
   end
   def save
     result = $client.execute(:api_method => $orvos.pipeline_instances.update,
@@ -200,6 +235,11 @@ class PipelineInstance
   def [](x)
     @pi[x]
   end
+  protected
+  def initialize(j)
+    @attributes_to_update = {}
+    @pi = j
+  end
 end
 
 class JobCache
@@ -247,6 +287,8 @@ class JobCache
 end
 
 class WhRunPipelineInstance
+  attr_reader :instance
+
   def initialize(_options)
     @options = _options
   end
@@ -265,6 +307,12 @@ class WhRunPipelineInstance
     self
   end
 
+  def fetch_instance(instance_uuid)
+    @instance = PipelineInstance.find(instance_uuid)
+    @template = @instance
+    self
+  end
+
   def apply_parameters(params_args)
     params_args.shift if params_args[0] == '--'
     params = {}
@@ -311,14 +359,18 @@ class WhRunPipelineInstance
     self
   end
 
+  def setup_instance
+    @instance ||= PipelineInstance.
+      create(:components => @components,
+             :pipeline_template_uuid => @template[:uuid],
+             :active => true)
+    self
+  end
+
   def run
     moretodo = true
     while moretodo
       moretodo = false
-
-      @instance ||= PipelineInstance.new(:components => @components,
-                                         :pipeline_template_uuid => @template[:uuid],
-                                         :active => true)
       @components.each do |cname, c|
         job = nil
         if !c[:job] and
@@ -329,6 +381,7 @@ class WhRunPipelineInstance
           debuglog "component #{cname} ready to satisfy."
 
           c.delete :wait
+          second_place_job = nil # satisfies component, but not finished yet
           JobCache.where(:script => c[:script],
                          :script_parameters => c[:script_parameters],
                          :script_version_descends_from => c[:script_version_descends_from]).
@@ -350,10 +403,19 @@ class WhRunPipelineInstance
               next
             end
 
-            job = candidate_job
+            if candidate_job[:success]
+              job = candidate_job
+              debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
+              c[:job] = job
+            else
+              second_place_job ||= candidate_job
+            end
+            break
+          end
+          if not c[:job] and second_place_job
+            job = second_place_job
             debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
             c[:job] = job
-            break
           end
           if not c[:job]
             debuglog "component #{cname} not satisfied by any existing job."
@@ -405,8 +467,14 @@ class WhRunPipelineInstance
   end
 
   def cleanup
-    @instance[:active] = false
-    @instance.save
+    if @instance
+      @instance[:active] = false
+      @instance.save
+    end
+  end
+
+  def uuid
+    @instance[:uuid]
   end
 
   protected
@@ -433,6 +501,8 @@ class WhRunPipelineInstance
                       c[:job][:output]
                     elsif c[:job][:cancelled_at]
                       "cancelled #{c[:job][:cancelled_at]}"
+                    elsif c[:job][:finished_at]
+                      "failed #{c[:job][:finished_at]}"
                     elsif c[:job][:started_at]
                       "started #{c[:job][:started_at]}"
                     else
@@ -445,12 +515,22 @@ class WhRunPipelineInstance
   end
 end
 
-instance = WhRunPipelineInstance.new($options)
+runner = WhRunPipelineInstance.new($options)
 begin
-  instance.fetch_template($options[:template]).
-    apply_parameters(p.leftovers).
-    run
+  if $options[:template]
+    runner.fetch_template($options[:template])
+  else
+    runner.fetch_instance($options[:instance])
+  end
+  runner.apply_parameters(p.leftovers)
+  runner.setup_instance
+  if $options[:create_only]
+    runner.instance.save
+    puts runner.instance[:uuid]
+  else
+    runner.run
+  end
 rescue Exception => e
-  instance.cleanup
+  runner.cleanup
   raise e
 end