11843: Do not fail pipelines on transient API failures.
author Tom Clegg <tom@curoverse.com>
Wed, 5 Jul 2017 15:33:36 +0000 (11:33 -0400)
committer Tom Clegg <tom@curoverse.com>
Wed, 5 Jul 2017 15:33:36 +0000 (11:33 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curoverse.com>

sdk/cli/bin/arv-run-pipeline-instance

index 7d16a69ae0fee817a42b3413441e7f007020959b..b66e9c0526e3a9b7926b381d9fee7ec8cbb6b901 100755 (executable)
@@ -260,31 +260,46 @@ class JobCache
       []
     end
   end
+
+  # create() returns [job, exception]. If both job and exception are
+  # nil, there was a non-retryable error and the call should not be
+  # attempted again.
   def self.create(pipeline, component, job, create_params)
     @cache ||= {}
 
     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
 
-    result = $client.execute(:api_method => $arvados.jobs.create,
-                             :body_object => body,
-                             :authenticated => false,
-                             :headers => {
-                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
-                             })
-    j = JSON.parse result.body, :symbolize_names => true
-    if j.is_a? Hash and j[:uuid]
+    result = nil
+    begin
+      result = $client.execute(
+        :api_method => $arvados.jobs.create,
+        :body_object => body,
+        :authenticated => false,
+        :headers => {
+          authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
+        })
+      if result.status == 429 || result.status >= 500
+        raise Exception.new("HTTP status #{result.status}")
+      end
+    rescue Exception => e
+      return nil, e
+    end
+    j = JSON.parse(result.body, :symbolize_names => true) rescue nil
+    if result.status == 200 && j.is_a?(Hash) && j[:uuid]
       @cache[j[:uuid]] = j
+      return j, nil
     else
-      debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
+      errors = j[:errors] rescue []
+      debuglog "create job: [#{result.status}] #{errors.inspect} with attributes #{body}", 0
 
       msg = ""
-      j[:errors].each do |err|
+      errors.each do |err|
         msg += "Error creating job for component #{component}: #{err}\n"
       end
       msg += "Job submission was: #{body.to_json}"
 
       pipeline.log_stderr(msg)
-      nil
+      return nil, nil
     end
   end
 
@@ -466,7 +481,7 @@ class WhRunPipelineInstance
           # are fully specified (any output_of script_parameters are resolved
           # to real value)
           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
-          job = JobCache.create(@instance, cname, {
+          job, err = JobCache.create(@instance, cname, {
             :script => c[:script],
             :script_parameters => Hash[c[:script_parameters].map do |key, spec|
                                          [key, spec[:value]]
@@ -493,9 +508,11 @@ class WhRunPipelineInstance
             c[:job] = job
             c[:run_in_process] = (@options[:run_jobs_here] and
                                   job[:submit_id] == my_submit_id)
-          else
+          elsif err.nil?
             debuglog "component #{cname} new job failed", 0
             job_creation_failed += 1
+          else
+            debuglog "component #{cname} new job failed, err=#{err}", 0
           end
         end