Merge branch '11917-dont-clear-cache'
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index 3e72658d476020bd6bf2cf8e58b5b1afb7a6b65a..b66e9c0526e3a9b7926b381d9fee7ec8cbb6b901 100755 (executable)
@@ -1,63 +1,8 @@
 #!/usr/bin/env ruby
-
-# == Synopsis
-#
-#  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
-#  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
-#
-# Satisfy a pipeline template by finding or submitting a mapreduce job
-# for each pipeline component.
-#
-# == Options
-#
-# [--template uuid] Use the specified pipeline template.
-#
-# [--template path] Load the pipeline template from the specified
-#                   local file.
-#
-# [--instance uuid] Use the specified pipeline instance.
-#
-# [-n, --dry-run] Do not start any new jobs or wait for existing jobs
-#                 to finish. Just find out whether jobs are finished,
-#                 queued, or running for each component
-#
-# [--submit] Do not try to satisfy any components. Just
-#                          create an instance, print its UUID to
-#                          stdout, and exit.
-#
-# [--no-wait] Make only as much progress as possible without entering
-#             a sleep/poll loop.
-#
-# [--no-reuse] Do not reuse existing jobs to satisfy pipeline
-#              components. Submit a new job for every component.
-#
-# [--debug] Print extra debugging information on stderr.
-#
-# [--debug-level N] Increase amount of debugging information. Default
-#                   1, possible range 0..3.
-#
-# [--status-text path] Print plain text status report to a file or
-#                      fifo. Default: /dev/stdout
-#
-# [--status-json path] Print JSON status report to a file or
-#                      fifo. Default: /dev/null
-#
-# [--description] Description for the pipeline instance.
-#
-# == Parameters
-#
-# [param_name=param_value]
-#
-# [param_name param_value] Set (or override) the default value for
-#                          every parameter with the given name.
-#
-# [component_name::param_name=param_value]
-# [component_name::param_name param_value]
-# [--component_name::param_name=param_value]
-# [--component_name::param_name param_value] Set the value of a
-#                                            parameter for a single
-#                                            component.
+# Copyright (C) The Arvados Authors. All rights reserved.
 #
+# SPDX-License-Identifier: Apache-2.0
+
 class WhRunPipelineInstance
 end
 
@@ -75,7 +20,7 @@ begin
   require 'trollop'
   require 'google/api_client'
 rescue LoadError => l
-  puts $:
+  $stderr.puts $:
   abort <<-EOS
 #{$0}: fatal: #{l.message}
 Some runtime dependencies may be missing.
@@ -92,6 +37,28 @@ end
 
 p = Trollop::Parser.new do
   version __FILE__
+  banner(<<EOF)
+
+Usage:
+  arv-run-pipeline-instance --template TEMPLATE_UUID [options] [--] [parameters]
+  arv-run-pipeline-instance --instance INSTANCE_UUID [options] [--] [parameters]
+
+Parameters:
+  param_name=param_value
+  param_name param_value
+                         Set (or override) the default value for every
+                         pipeline component parameter with the given
+                         name.
+
+  component_name::param_name=param_value
+  component_name::param_name param_value
+  --component_name::param_name=param_value
+  --component_name::param_name param_value
+                         Set the value of a parameter for a single
+                         pipeline component.
+
+Options:
+EOF
   opt(:dry_run,
       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
       :type => :boolean,
@@ -168,7 +135,7 @@ if $options[:instance]
     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
   end
 elsif not $options[:template]
-  puts "error: you must supply a --template or --instance."
+  $stderr.puts "error: you must supply a --template or --instance."
   p.educate
   abort
 end
@@ -293,31 +260,46 @@ class JobCache
       []
     end
   end
+
+  # create() returns [job, exception]. If both job and exception are
+  # nil, there was a non-retryable error and the call should not be
+  # attempted again.
   def self.create(pipeline, component, job, create_params)
     @cache ||= {}
 
     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
 
-    result = $client.execute(:api_method => $arvados.jobs.create,
-                             :body_object => body,
-                             :authenticated => false,
-                             :headers => {
-                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
-                             })
-    j = JSON.parse result.body, :symbolize_names => true
-    if j.is_a? Hash and j[:uuid]
+    result = nil
+    begin
+      result = $client.execute(
+        :api_method => $arvados.jobs.create,
+        :body_object => body,
+        :authenticated => false,
+        :headers => {
+          authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
+        })
+      if result.status == 429 || result.status >= 500
+        raise Exception.new("HTTP status #{result.status}")
+      end
+    rescue Exception => e
+      return nil, e
+    end
+    j = JSON.parse(result.body, :symbolize_names => true) rescue nil
+    if result.status == 200 && j.is_a?(Hash) && j[:uuid]
       @cache[j[:uuid]] = j
+      return j, nil
     else
-      debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
+      errors = j[:errors] rescue []
+      debuglog "create job: [#{result.status}] #{errors.inspect} with attributes #{body}", 0
 
       msg = ""
-      j[:errors].each do |err|
+      errors.each do |err|
         msg += "Error creating job for component #{component}: #{err}\n"
       end
       msg += "Job submission was: #{body.to_json}"
 
       pipeline.log_stderr(msg)
-      nil
+      return nil, nil
     end
   end
 
@@ -400,22 +382,27 @@ class WhRunPipelineInstance
     @components.each do |componentname, component|
       component[:script_parameters].each do |parametername, parameter|
         parameter = { :value => parameter } unless parameter.is_a? Hash
-        value =
-          (params["#{componentname}::#{parametername}"] ||
-           parameter[:value] ||
-           (parameter[:output_of].nil? &&
-            (params[parametername.to_s] ||
-             parameter[:default])) ||
-           nil)
-        if value.nil? and
-            ![false,'false',0,'0'].index parameter[:required]
-          if parameter[:output_of]
-            if not @components[parameter[:output_of].intern]
-              errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
-            end
-            next
+        if params.has_key?("#{componentname}::#{parametername}")
+          value = params["#{componentname}::#{parametername}"]
+        elsif parameter.has_key?(:value)
+          value = parameter[:value]
+        elsif parameter.has_key?(:output_of)
+          if !@components[parameter[:output_of].intern]
+            errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
+          else
+            # value will be filled in later when the upstream
+            # component's output becomes known
           end
+          next
+        elsif params.has_key?(parametername.to_s)
+          value = params[parametername.to_s]
+        elsif parameter.has_key?(:default)
+          value = parameter[:default]
+        elsif [false, 'false', 0, '0'].index(parameter[:required])
+          value = nil
+        else
           errors << [componentname, parametername, "required parameter is missing"]
+          next
         end
         debuglog "parameter #{componentname}::#{parametername} == #{value}"
 
@@ -424,7 +411,10 @@ class WhRunPipelineInstance
       end
     end
     if !errors.empty?
-      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
+      all_errors = errors.collect do |c,p,e|
+        "#{c}::#{p} - #{e}\n"
+      end.join("")
+      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{all_errors}"
     end
     debuglog "options=" + @options.pretty_inspect
     self
@@ -491,7 +481,7 @@ class WhRunPipelineInstance
           # are fully specified (any output_of script_parameters are resolved
           # to real value)
           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
-          job = JobCache.create(@instance, cname, {
+          job, err = JobCache.create(@instance, cname, {
             :script => c[:script],
             :script_parameters => Hash[c[:script_parameters].map do |key, spec|
                                          [key, spec[:value]]
@@ -518,9 +508,11 @@ class WhRunPipelineInstance
             c[:job] = job
             c[:run_in_process] = (@options[:run_jobs_here] and
                                   job[:submit_id] == my_submit_id)
-          else
+          elsif err.nil?
             debuglog "component #{cname} new job failed", 0
             job_creation_failed += 1
+          else
+            debuglog "component #{cname} new job failed, err=#{err}", 0
           end
         end
 
@@ -685,7 +677,7 @@ class WhRunPipelineInstance
         @instance[:state] = 'Complete'
      else
         @instance[:state] = 'Paused'
-      end
+     end
     else
       if ended == @components.length or failed > 0
         @instance[:state] = success ? 'Complete' : 'Failed'