Merge branch '11843-arpi-transient-error'
author Tom Clegg <tom@curoverse.com>
Fri, 7 Jul 2017 13:09:41 +0000 (09:09 -0400)
committer Tom Clegg <tom@curoverse.com>
Fri, 7 Jul 2017 13:13:57 +0000 (09:13 -0400)
refs #11843

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curoverse.com>

sdk/cli/bin/arv-run-pipeline-instance

index bccfdac0b8cb43daa2884ae9b318b6a1ac105506..b66e9c0526e3a9b7926b381d9fee7ec8cbb6b901 100755 (executable)
@@ -260,31 +260,46 @@ class JobCache
       []
     end
   end
+
+  # create() returns [job, exception]: [job, nil] on success, or
+  # [nil, exception] if the request raised or got HTTP 429/5xx (may be
+  # retried). [nil, nil] means a non-retryable error: do not try again.
   def self.create(pipeline, component, job, create_params)
     @cache ||= {}
 
     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
 
-    result = $client.execute(:api_method => $arvados.jobs.create,
-                             :body_object => body,
-                             :authenticated => false,
-                             :headers => {
-                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
-                             })
-    j = JSON.parse result.body, :symbolize_names => true
-    if j.is_a? Hash and j[:uuid]
+    result = nil
+    begin
+      result = $client.execute(
+        :api_method => $arvados.jobs.create,
+        :body_object => body,
+        :authenticated => false,
+        :headers => {
+          authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
+        })
+      if result.status == 429 || result.status >= 500
+        raise Exception.new("HTTP status #{result.status}")
+      end
+    rescue Exception => e
+      return nil, e
+    end
+    j = JSON.parse(result.body, :symbolize_names => true) rescue nil
+    if result.status == 200 && j.is_a?(Hash) && j[:uuid]
       @cache[j[:uuid]] = j
+      return j, nil
     else
-      debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
+      errors = j[:errors] rescue []
+      debuglog "create job: [#{result.status}] #{errors.inspect} with attributes #{body}", 0
 
       msg = ""
-      j[:errors].each do |err|
+      errors.each do |err|
         msg += "Error creating job for component #{component}: #{err}\n"
       end
       msg += "Job submission was: #{body.to_json}"
 
       pipeline.log_stderr(msg)
-      nil
+      return nil, nil
     end
   end
 
@@ -396,7 +411,10 @@ class WhRunPipelineInstance
       end
     end
     if !errors.empty?
-      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
+      all_errors = errors.collect do |c,p,e|
+        "#{c}::#{p} - #{e}\n"
+      end.join("")
+      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{all_errors}"
     end
     debuglog "options=" + @options.pretty_inspect
     self
@@ -463,7 +481,7 @@ class WhRunPipelineInstance
           # are fully specified (any output_of script_parameters are resolved
           # to real value)
           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
-          job = JobCache.create(@instance, cname, {
+          job, err = JobCache.create(@instance, cname, {
             :script => c[:script],
             :script_parameters => Hash[c[:script_parameters].map do |key, spec|
                                          [key, spec[:value]]
@@ -490,9 +508,11 @@ class WhRunPipelineInstance
             c[:job] = job
             c[:run_in_process] = (@options[:run_jobs_here] and
                                   job[:submit_id] == my_submit_id)
-          else
+          elsif err.nil?
             debuglog "component #{cname} new job failed", 0
             job_creation_failed += 1
+          else
+            debuglog "component #{cname} new job failed, err=#{err}", 0
           end
         end
 
@@ -657,7 +677,7 @@ class WhRunPipelineInstance
         @instance[:state] = 'Complete'
      else
         @instance[:state] = 'Paused'
-      end
+     end
     else
       if ended == @components.length or failed > 0
         @instance[:state] = success ? 'Complete' : 'Failed'