Fail more gracefully on some common error conditions
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index a3e2c4b8205b0464833fbde20fe57c9d70beca57..740823ba0d963aa287fedda8bd1aea2c18394c26 100755 (executable)
 #
 # [--template uuid] Use the specified pipeline template.
 #
+# [--template path] Load the pipeline template from the specified
+#                   local file.
+#
 # [--instance uuid] Use the specified pipeline instance.
 #
 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
 #                 to finish. Just find out whether jobs are finished,
 #                 queued, or running for each component
 #
-# [--create-only] Do not try to satisfy any components. Just create an
-#                 instance, print its UUID to stdout, and exit.
+# [--create-instance-only] Do not try to satisfy any components. Just
+#                          create an instance, print its UUID to
+#                          stdout, and exit.
 #
 # [--no-wait] Make only as much progress as possible without entering
 #             a sleep/poll loop.
 #
+# [--no-reuse-finished] Do not reuse existing outputs to satisfy
+#                       pipeline components. Always submit a new job
+#                       or use an existing job which has not yet
+#                       finished.
+#
+# [--no-reuse] Do not reuse existing jobs to satisfy pipeline
+#              components. Submit a new job for every component.
+#
 # [--debug] Print extra debugging information on stderr.
 #
 # [--debug-level N] Increase amount of debugging information. Default
@@ -137,6 +149,14 @@ p = Trollop::Parser.new do
       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
       :short => :none,
       :type => :boolean)
+  opt(:no_reuse_finished,
+      "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
+      :short => :none,
+      :type => :boolean)
+  opt(:no_reuse,
+      "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
+      :short => :none,
+      :type => :boolean)
   opt(:debug,
       "Print extra debugging information on stderr.",
       :type => :boolean)
@@ -145,14 +165,14 @@ p = Trollop::Parser.new do
       :short => :none,
       :type => :integer)
   opt(:template,
-      "UUID of pipeline template.",
+      "UUID of pipeline template, or path to local pipeline template file.",
       :short => :none,
       :type => :string)
   opt(:instance,
       "UUID of pipeline instance.",
       :short => :none,
       :type => :string)
-  opt(:create_only,
+  opt(:create_instance_only,
       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
       :short => :none,
       :type => :boolean)
@@ -164,8 +184,8 @@ end
 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
 
 if $options[:instance]
-  if $options[:template] or $options[:create_only]
-    abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-only."
+  if $options[:template] or $options[:create_instance_only]
+    abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-instance-only."
   end
 elsif not $options[:template]
   abort "#{$0}: syntax error: you must supply a --template or --instance."
@@ -184,9 +204,11 @@ class PipelineInstance
   def self.find(uuid)
     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
                              :parameters => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
                                :uuid => uuid
                              },
+                             :body => {
+                               :api_token => ENV['ARVADOS_API_TOKEN']
+                             },
                              :authenticated => false)
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
@@ -199,9 +221,9 @@ class PipelineInstance
   end
   def self.create(attributes)
     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
-                             :parameters => {
+                             :body => {
                                :api_token => ENV['ARVADOS_API_TOKEN'],
-                               :pipeline_instance => attributes.to_json
+                               :pipeline_instance => attributes
                              },
                              :authenticated => false)
     j = JSON.parse result.body, :symbolize_names => true
@@ -214,8 +236,10 @@ class PipelineInstance
   def save
     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
                              :parameters => {
+                               :uuid => @pi[:uuid]
+                             },
+                             :body => {
                                :api_token => ENV['ARVADOS_API_TOKEN'],
-                               :uuid => @pi[:uuid],
                                :pipeline_instance => @attributes_to_update.to_json
                              },
                              :authenticated => false)
@@ -280,7 +304,7 @@ class JobCache
     if j.is_a? Hash and j[:uuid]
       @cache[j[:uuid]] = j
     else
-      debuglog "create job: #{j[:errors] rescue nil}"
+      debuglog "create job: #{j[:errors] rescue nil}", 0
       nil
     end
   end
@@ -293,16 +317,25 @@ class WhRunPipelineInstance
     @options = _options
   end
 
-  def fetch_template(template_uuid)
-    result = $client.execute(:api_method => $arvados.pipeline_templates.get,
-                             :parameters => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
-                               :uuid => template_uuid
-                             },
-                             :authenticated => false)
-    @template = JSON.parse result.body, :symbolize_names => true
-    if !@template[:uuid]
-      abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}"
+  def fetch_template(template)
+    if template.match /[^-0-9a-z]/
+      # Doesn't look like a uuid -- use it as a filename.
+      @template = JSON.parse File.read(template), :symbolize_names => true
+      if !@template[:components]
+        abort ("#{$0}: Template loaded from #{template} " +
+               "does not have a \"components\" key")
+      end
+    else
+      result = $client.execute(:api_method => $arvados.pipeline_templates.get,
+                               :parameters => {
+                                 :api_token => ENV['ARVADOS_API_TOKEN'],
+                                 :uuid => template
+                               },
+                               :authenticated => false)
+      @template = JSON.parse result.body, :symbolize_names => true
+      if !@template[:uuid]
+        abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
+      end
     end
     self
   end
@@ -317,7 +350,7 @@ class WhRunPipelineInstance
     params_args.shift if params_args[0] == '--'
     params = {}
     while !params_args.empty?
-      if (re = params_args[0].match /^(--)?([^-].*?)=(.)/)
+      if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
         params[re[2]] = re[3]
         params_args.shift
       elsif params_args.size > 1
@@ -382,10 +415,12 @@ class WhRunPipelineInstance
 
           c.delete :wait
           second_place_job = nil # satisfies component, but not finished yet
-          JobCache.where(:script => c[:script],
-                         :script_parameters => c[:script_parameters],
-                         :script_version_descends_from => c[:script_version_descends_from]).
-            each do |candidate_job|
+
+          (@options[:no_reuse] ? [] : JobCache.
+           where(script: c[:script],
+                 script_parameters: c[:script_parameters],
+                 script_version_descends_from: c[:script_version])
+           ).each do |candidate_job|
             candidate_params_downcase = Hash[candidate_job[:script_parameters].
                                              map { |k,v| [k.downcase,v] }]
             c_params_downcase = Hash[c[:script_parameters].
@@ -397,6 +432,12 @@ class WhRunPipelineInstance
               next
             end
 
+            if c[:script_version] !=
+                candidate_job[:script_version][0,c[:script_version].length]
+              debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
+              next
+            end
+
             unless candidate_job[:success] || candidate_job[:running] ||
                 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
               debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
@@ -404,9 +445,11 @@ class WhRunPipelineInstance
             end
 
             if candidate_job[:success]
-              job = candidate_job
-              debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
-              c[:job] = job
+              unless @options[:no_reuse_finished]
+                job = candidate_job
+                debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
+                c[:job] = job
+              end
             else
               second_place_job ||= candidate_job
             end
@@ -423,13 +466,13 @@ class WhRunPipelineInstance
               debuglog "component #{cname} new job."
               job = JobCache.create(:script => c[:script],
                                     :script_parameters => c[:script_parameters],
-                                    :resource_limits => c[:resource_limits] || {},
+                                    :runtime_constraints => c[:runtime_constraints] || {},
                                     :script_version => c[:script_version] || 'master')
               if job
                 debuglog "component #{cname} new job #{job[:uuid]}"
                 c[:job] = job
               else
-                debuglog "component #{cname} new job failed: #{job[:errors]}"
+                debuglog "component #{cname} new job failed"
               end
             end
           end
@@ -437,7 +480,8 @@ class WhRunPipelineInstance
           c[:wait] = true
         end
         if c[:job] and c[:job][:uuid]
-          if not c[:job][:finished_at] and not c[:job][:cancelled_at]
+          if (c[:job][:running] or
+              not (c[:job][:finished_at] or c[:job][:cancelled_at]))
             c[:job] = JobCache.get(c[:job][:uuid])
           end
           if c[:job][:success]
@@ -462,7 +506,14 @@ class WhRunPipelineInstance
       @instance[:components] = @components
       @instance[:active] = moretodo
       report_status
-      sleep 10 if moretodo
+      if moretodo
+        begin
+          sleep 10
+        rescue Interrupt
+          debuglog "interrupt", 0
+          abort
+        end
+      end
     end
     @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
     @instance.save
@@ -526,7 +577,7 @@ begin
   end
   runner.apply_parameters(p.leftovers)
   runner.setup_instance
-  if $options[:create_only]
+  if $options[:create_instance_only]
     runner.instance.save
     puts runner.instance[:uuid]
   else