Merge branch '11917-dont-clear-cache'
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index 6dc82c5a20b841b1aeb1400ecdaf7dd6c21d4ed5..b66e9c0526e3a9b7926b381d9fee7ec8cbb6b901 100755 (executable)
@@ -1,4 +1,7 @@
 #!/usr/bin/env ruby
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
 
 class WhRunPipelineInstance
 end
@@ -257,31 +260,46 @@ class JobCache
       []
     end
   end
+
+  # create() returns [job, exception]. If both job and exception are
+  # nil, there was a non-retryable error and the call should not be
+  # attempted again.
   def self.create(pipeline, component, job, create_params)
     @cache ||= {}
 
     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
 
-    result = $client.execute(:api_method => $arvados.jobs.create,
-                             :body_object => body,
-                             :authenticated => false,
-                             :headers => {
-                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
-                             })
-    j = JSON.parse result.body, :symbolize_names => true
-    if j.is_a? Hash and j[:uuid]
+    result = nil
+    begin
+      result = $client.execute(
+        :api_method => $arvados.jobs.create,
+        :body_object => body,
+        :authenticated => false,
+        :headers => {
+          authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
+        })
+      if result.status == 429 || result.status >= 500
+        raise Exception.new("HTTP status #{result.status}")
+      end
+    rescue Exception => e
+      return nil, e
+    end
+    j = JSON.parse(result.body, :symbolize_names => true) rescue nil
+    if result.status == 200 && j.is_a?(Hash) && j[:uuid]
       @cache[j[:uuid]] = j
+      return j, nil
     else
-      debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
+      errors = j[:errors] rescue []
+      debuglog "create job: [#{result.status}] #{errors.inspect} with attributes #{body}", 0
 
       msg = ""
-      j[:errors].each do |err|
+      errors.each do |err|
         msg += "Error creating job for component #{component}: #{err}\n"
       end
       msg += "Job submission was: #{body.to_json}"
 
       pipeline.log_stderr(msg)
-      nil
+      return nil, nil
     end
   end
 
@@ -364,22 +382,27 @@ class WhRunPipelineInstance
     @components.each do |componentname, component|
       component[:script_parameters].each do |parametername, parameter|
         parameter = { :value => parameter } unless parameter.is_a? Hash
-        value =
-          (params["#{componentname}::#{parametername}"] ||
-           parameter[:value] ||
-           (parameter[:output_of].nil? &&
-            (params[parametername.to_s] ||
-             parameter[:default])) ||
-           nil)
-        if value.nil? and
-            ![false,'false',0,'0'].index parameter[:required]
-          if parameter[:output_of]
-            if not @components[parameter[:output_of].intern]
-              errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
-            end
-            next
+        if params.has_key?("#{componentname}::#{parametername}")
+          value = params["#{componentname}::#{parametername}"]
+        elsif parameter.has_key?(:value)
+          value = parameter[:value]
+        elsif parameter.has_key?(:output_of)
+          if !@components[parameter[:output_of].intern]
+            errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
+          else
+            # value will be filled in later when the upstream
+            # component's output becomes known
           end
+          next
+        elsif params.has_key?(parametername.to_s)
+          value = params[parametername.to_s]
+        elsif parameter.has_key?(:default)
+          value = parameter[:default]
+        elsif [false, 'false', 0, '0'].index(parameter[:required])
+          value = nil
+        else
           errors << [componentname, parametername, "required parameter is missing"]
+          next
         end
         debuglog "parameter #{componentname}::#{parametername} == #{value}"
 
@@ -388,7 +411,10 @@ class WhRunPipelineInstance
       end
     end
     if !errors.empty?
-      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
+      all_errors = errors.collect do |c,p,e|
+        "#{c}::#{p} - #{e}\n"
+      end.join("")
+      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{all_errors}"
     end
     debuglog "options=" + @options.pretty_inspect
     self
@@ -455,7 +481,7 @@ class WhRunPipelineInstance
           # are fully specified (any output_of script_parameters are resolved
           # to real value)
           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
-          job = JobCache.create(@instance, cname, {
+          job, err = JobCache.create(@instance, cname, {
             :script => c[:script],
             :script_parameters => Hash[c[:script_parameters].map do |key, spec|
                                          [key, spec[:value]]
@@ -482,9 +508,11 @@ class WhRunPipelineInstance
             c[:job] = job
             c[:run_in_process] = (@options[:run_jobs_here] and
                                   job[:submit_id] == my_submit_id)
-          else
+          elsif err.nil?
             debuglog "component #{cname} new job failed", 0
             job_creation_failed += 1
+          else
+            debuglog "component #{cname} new job failed, err=#{err}", 0
           end
         end
 
@@ -649,7 +677,7 @@ class WhRunPipelineInstance
         @instance[:state] = 'Complete'
      else
         @instance[:state] = 'Paused'
-      end
+     end
     else
       if ended == @components.length or failed > 0
         @instance[:state] = success ? 'Complete' : 'Failed'