Merge branch 'master' into 2352-remove-attrs
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index a3e2c4b8205b0464833fbde20fe57c9d70beca57..b86b217194ee16f0dc0498201583a8ffa0fcfc5e 100755 (executable)
@@ -2,8 +2,8 @@
 
 # == Synopsis
 #
-#  wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
-#  wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
+#  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
+#  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
 #
 # Satisfy a pipeline template by finding or submitting a mapreduce job
 # for each pipeline component.
 #
 # [--template uuid] Use the specified pipeline template.
 #
+# [--template path] Load the pipeline template from the specified
+#                   local file.
+#
 # [--instance uuid] Use the specified pipeline instance.
 #
 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
 #                 to finish. Just find out whether jobs are finished,
 #                 queued, or running for each component
 #
-# [--create-only] Do not try to satisfy any components. Just create an
-#                 instance, print its UUID to stdout, and exit.
+# [--submit] Do not try to satisfy any components. Just
+#                          create an instance, print its UUID to
+#                          stdout, and exit.
 #
 # [--no-wait] Make only as much progress as possible without entering
 #             a sleep/poll loop.
 #
+# [--no-reuse] Do not reuse existing jobs to satisfy pipeline
+#              components. Submit a new job for every component.
+#
 # [--debug] Print extra debugging information on stderr.
 #
 # [--debug-level N] Increase amount of debugging information. Default
@@ -67,15 +74,18 @@ $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
 
 begin
+  require 'arvados'
   require 'rubygems'
-  require 'google/api_client'
   require 'json'
   require 'pp'
   require 'trollop'
-rescue LoadError
+  require 'google/api_client'
+rescue LoadError => l
+  puts $:
   abort <<-EOS
-#{$0}: fatal: some runtime dependencies are missing.
-Try: gem install pp google-api-client json trollop
+#{$0}: fatal: #{l.message}
+Some runtime dependencies may be missing.
+Try: gem install arvados pp google-api-client json trollop
   EOS
 end
 
@@ -119,6 +129,7 @@ end
 # this program, that is, not the pipeline component parameters).
 
 p = Trollop::Parser.new do
+  version __FILE__
   opt(:dry_run,
       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
       :type => :boolean,
@@ -137,6 +148,10 @@ p = Trollop::Parser.new do
       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
       :short => :none,
       :type => :boolean)
+  opt(:no_reuse,
+      "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
+      :short => :none,
+      :type => :boolean)
   opt(:debug,
       "Print extra debugging information on stderr.",
       :type => :boolean)
@@ -145,17 +160,21 @@ p = Trollop::Parser.new do
       :short => :none,
       :type => :integer)
   opt(:template,
-      "UUID of pipeline template.",
+      "UUID of pipeline template, or path to local pipeline template file.",
       :short => :none,
       :type => :string)
   opt(:instance,
       "UUID of pipeline instance.",
       :short => :none,
       :type => :string)
-  opt(:create_only,
+  opt(:submit,
       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
       :short => :none,
       :type => :boolean)
+  opt(:run_here,
+      "Manage the pipeline in process.",
+      :short => :none,
+      :type => :boolean)
   stop_on [:'--']
 end
 $options = Trollop::with_standard_exception_handling p do
@@ -164,13 +183,33 @@ end
 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
 
 if $options[:instance]
-  if $options[:template] or $options[:create_only]
-    abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-only."
+  if $options[:template] or $options[:submit]
+    abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
   end
 elsif not $options[:template]
   abort "#{$0}: syntax error: you must supply a --template or --instance."
 end
 
+if $options[:run_here] == $options[:submit]
+  abort "#{$0}: syntax error: you must supply either --run-here or --submit."
+end
+
+# Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
+
+module Kernel
+  def suppress_warnings
+    original_verbosity = $VERBOSE
+    $VERBOSE = nil
+    result = yield
+    $VERBOSE = original_verbosity
+    return result
+  end
+end
+
+if ENV['ARVADOS_API_HOST_INSECURE']
+  suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
+end
+
 # Set up the API client.
 
 $client ||= Google::APIClient.
@@ -178,15 +217,18 @@ $client ||= Google::APIClient.
       :application_name => File.split($0).last,
       :application_version => $application_version.to_s)
 $arvados = $client.discovered_api('arvados', $arvados_api_version)
+$arv = Arvados.new api_version: 'v1'
 
 
 class PipelineInstance
   def self.find(uuid)
     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
                              :parameters => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
                                :uuid => uuid
                              },
+                             :body => {
+                               :api_token => ENV['ARVADOS_API_TOKEN']
+                             },
                              :authenticated => false)
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
@@ -199,9 +241,9 @@ class PipelineInstance
   end
   def self.create(attributes)
     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
-                             :parameters => {
+                             :body => {
                                :api_token => ENV['ARVADOS_API_TOKEN'],
-                               :pipeline_instance => attributes.to_json
+                               :pipeline_instance => attributes
                              },
                              :authenticated => false)
     j = JSON.parse result.body, :symbolize_names => true
@@ -214,8 +256,10 @@ class PipelineInstance
   def save
     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
                              :parameters => {
+                               :uuid => @pi[:uuid]
+                             },
+                             :body => {
                                :api_token => ENV['ARVADOS_API_TOKEN'],
-                               :uuid => @pi[:uuid],
                                :pipeline_instance => @attributes_to_update.to_json
                              },
                              :authenticated => false)
@@ -268,19 +312,19 @@ class JobCache
       []
     end
   end
-  def self.create(attributes)
+  def self.create(job, create_params)
     @cache ||= {}
     result = $client.execute(:api_method => $arvados.jobs.create,
                              :parameters => {
                                :api_token => ENV['ARVADOS_API_TOKEN'],
-                               :job => attributes.to_json
-                             },
+                               :job => job.to_json
+                             }.merge(create_params),
                              :authenticated => false)
     j = JSON.parse result.body, :symbolize_names => true
     if j.is_a? Hash and j[:uuid]
       @cache[j[:uuid]] = j
     else
-      debuglog "create job: #{j[:errors] rescue nil}"
+      debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0
       nil
     end
   end
@@ -293,16 +337,25 @@ class WhRunPipelineInstance
     @options = _options
   end
 
-  def fetch_template(template_uuid)
-    result = $client.execute(:api_method => $arvados.pipeline_templates.get,
-                             :parameters => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
-                               :uuid => template_uuid
-                             },
-                             :authenticated => false)
-    @template = JSON.parse result.body, :symbolize_names => true
-    if !@template[:uuid]
-      abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}"
+  def fetch_template(template)
+    if template.match /[^-0-9a-z]/
+      # Doesn't look like a uuid -- use it as a filename.
+      @template = JSON.parse File.read(template), :symbolize_names => true
+      if !@template[:components]
+        abort ("#{$0}: Template loaded from #{template} " +
+               "does not have a \"components\" key")
+      end
+    else
+      result = $client.execute(:api_method => $arvados.pipeline_templates.get,
+                               :parameters => {
+                                 :api_token => ENV['ARVADOS_API_TOKEN'],
+                                 :uuid => template
+                               },
+                               :authenticated => false)
+      @template = JSON.parse result.body, :symbolize_names => true
+      if !@template[:uuid]
+        abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
+      end
     end
     self
   end
@@ -317,7 +370,7 @@ class WhRunPipelineInstance
     params_args.shift if params_args[0] == '--'
     params = {}
     while !params_args.empty?
-      if (re = params_args[0].match /^(--)?([^-].*?)=(.)/)
+      if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
         params[re[2]] = re[3]
         params_args.shift
       elsif params_args.size > 1
@@ -360,86 +413,75 @@ class WhRunPipelineInstance
   end
 
   def setup_instance
-    @instance ||= PipelineInstance.
-      create(:components => @components,
+    if $options[:submit]
+      @instance ||= PipelineInstance.
+        create(:components => @components,
+              :pipeline_template_uuid => @template[:uuid],
+              :state => 'New')
+    else
+      @instance ||= PipelineInstance.
+        create(:components => @components,
              :pipeline_template_uuid => @template[:uuid],
-             :active => true)
+             :state => 'RunningOnClient')
+    end
     self
   end
 
   def run
     moretodo = true
+    interrupted = false
+
     while moretodo
       moretodo = false
       @components.each do |cname, c|
         job = nil
+        # Is the job satisfying this component already known to be
+        # finished? (Already meaning "before we query API server about
+        # the job's current state")
+        c_already_finished = (c[:job] &&
+                              c[:job][:uuid] &&
+                              !c[:job][:success].nil?)
         if !c[:job] and
-            c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
-          # Job is fully specified (all parameter values are present) but
-          # no particular job has been found.
-
-          debuglog "component #{cname} ready to satisfy."
-
-          c.delete :wait
-          second_place_job = nil # satisfies component, but not finished yet
-          JobCache.where(:script => c[:script],
-                         :script_parameters => c[:script_parameters],
-                         :script_version_descends_from => c[:script_version_descends_from]).
-            each do |candidate_job|
-            candidate_params_downcase = Hash[candidate_job[:script_parameters].
-                                             map { |k,v| [k.downcase,v] }]
-            c_params_downcase = Hash[c[:script_parameters].
-                                     map { |k,v| [k.downcase,v] }]
-
-            debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
-
-            unless candidate_params_downcase == c_params_downcase
-              next
-            end
-
-            unless candidate_job[:success] || candidate_job[:running] ||
-                (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
-              debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
-              next
-            end
-
-            if candidate_job[:success]
-              job = candidate_job
-              debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
-              c[:job] = job
-            else
-              second_place_job ||= candidate_job
-            end
-            break
-          end
-          if not c[:job] and second_place_job
-            job = second_place_job
-            debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
+            c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
+          # No job yet associated with this component and is component inputs
+          # are fully specified (any output_of script_parameters are resolved
+          # to real value)
+          job = JobCache.create({
+            :script => c[:script],
+            :script_parameters => c[:script_parameters],
+            :script_version => c[:script_version],
+            :repository => c[:repository],
+            :nondeterministic => c[:nondeterministic],
+            :output_is_persistent => c[:output_is_persistent] || false,
+            # TODO: Delete the following three attributes when
+            # supporting pre-20140418 API servers is no longer
+            # important. New API servers take these as flags that
+            # control behavior of create, rather than job attributes.
+            :minimum_script_version => c[:minimum_script_version],
+            :exclude_script_versions => c[:exclude_minimum_script_versions],
+            :no_reuse => @options[:no_reuse] || c[:nondeterministic],
+          }, {
+            # This is the right place to put these attributes when
+            # dealing with new API servers.
+            :minimum_script_version => c[:minimum_script_version],
+            :exclude_script_versions => c[:exclude_minimum_script_versions],
+            :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
+          })
+          if job
+            debuglog "component #{cname} new job #{job[:uuid]}"
             c[:job] = job
+          else
+            debuglog "component #{cname} new job failed"
           end
-          if not c[:job]
-            debuglog "component #{cname} not satisfied by any existing job."
-            if !@options[:dry_run]
-              debuglog "component #{cname} new job."
-              job = JobCache.create(:script => c[:script],
-                                    :script_parameters => c[:script_parameters],
-                                    :resource_limits => c[:resource_limits] || {},
-                                    :script_version => c[:script_version] || 'master')
-              if job
-                debuglog "component #{cname} new job #{job[:uuid]}"
-                c[:job] = job
-              else
-                debuglog "component #{cname} new job failed: #{job[:errors]}"
-              end
-            end
-          end
-        else
-          c[:wait] = true
         end
+
         if c[:job] and c[:job][:uuid]
-          if not c[:job][:finished_at] and not c[:job][:cancelled_at]
+          if (c[:job][:running] or
+              not (c[:job][:finished_at] or c[:job][:cancelled_at]))
+            # Job is running so update copy of job record
             c[:job] = JobCache.get(c[:job][:uuid])
           end
+
           if c[:job][:success]
             # Populate script_parameters of other components waiting for
             # this job
@@ -448,29 +490,114 @@ class WhRunPipelineInstance
                 if p.is_a? Hash and p[:output_of] == cname.to_s
                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
                   c2[:script_parameters][pname] = c[:job][:output]
+                  moretodo = true
+                end
+              end
+            end
+            unless c_already_finished
+              # This is my first time discovering that the job
+              # succeeded. (At the top of this loop, I was still
+              # waiting for it to finish.)
+              if c[:output_is_persistent]
+                # I need to make sure a resources/wants link is in
+                # place to protect the output from garbage
+                # collection. (Normally Crunch does this for me, but
+                # here I might be reusing the output of someone else's
+                # job and I need to make sure it's understood that the
+                # output is valuable to me, too.)
+                wanted = c[:job][:output]
+                debuglog "checking for existing persistence link for #{wanted}"
+                @my_user_uuid ||= $arv.user.current[:uuid]
+                links = $arv.link.list(limit: 1,
+                                       filters:
+                                       [%w(link_class = resources),
+                                        %w(name = wants),
+                                        %w(tail_uuid =) + [@my_user_uuid],
+                                        %w(head_uuid =) + [wanted]
+                                       ])[:items]
+                if links.any?
+                  debuglog "link already exists, uuid #{links.first[:uuid]}"
+                else
+                  newlink = $arv.link.create link: \
+                  {
+                    link_class: 'resources',
+                    name: 'wants',
+                    tail_kind: 'arvados#user',
+                    tail_uuid: @my_user_uuid,
+                    head_kind: 'arvados#collection',
+                    head_uuid: wanted
+                  }
+                  debuglog "added link, uuid #{newlink[:uuid]}"
                 end
               end
             end
           elsif c[:job][:running] ||
               (!c[:job][:started_at] && !c[:job][:cancelled_at])
-            moretodo ||= !@options[:no_wait]
+            # Job is still running
+            moretodo = true
           elsif c[:job][:cancelled_at]
             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
           end
         end
       end
       @instance[:components] = @components
-      @instance[:active] = moretodo
       report_status
-      sleep 10 if moretodo
+
+      if @options[:no_wait]
+        moretodo = false
+      end
+
+      if moretodo
+        begin
+          sleep 10
+        rescue Interrupt
+          debuglog "interrupt", 0
+          interrupted = true
+          break
+        end
+      end
+    end
+
+    ended = 0
+    succeeded = 0
+    failed = 0
+    @components.each do |cname, c|
+      if c[:job]
+        if c[:job][:finished_at]
+          ended += 1
+          if c[:job][:success] == true
+            succeeded += 1
+          elsif c[:job][:success] == false
+            failed += 1
+          end
+        end
+      end
     end
-    @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
+
+    success = (succeeded == @components.length)
+
+    if interrupted
+     if success
+        @instance[:state] = 'Complete'
+     else
+        @instance[:state] = 'Paused'
+      end
+    else
+      if ended == @components.length or failed > 0
+        @instance[:state] = success ? 'Complete' : 'Failed'
+      end
+    end
+
+    # set components_summary
+    components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
+    @instance[:components_summary] = components_summary
+
     @instance.save
   end
 
   def cleanup
-    if @instance
-      @instance[:active] = false
+    if @instance and @instance[:state] == 'RunningOnClient'
+      @instance[:state] = 'Paused'
       @instance.save
     end
   end
@@ -492,6 +619,7 @@ class WhRunPipelineInstance
 
     if @options[:status_text] != '/dev/null'
       File.open(@options[:status_text], 'w') do |f|
+        f.puts ""
         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
         namewidth = @components.collect { |cname, c| cname.size }.max
         @components.each do |cname, c|
@@ -526,7 +654,7 @@ begin
   end
   runner.apply_parameters(p.leftovers)
   runner.setup_instance
-  if $options[:create_only]
+  if $options[:submit]
     runner.instance.save
     puts runner.instance[:uuid]
   else