Merge branch '2492-docker-crunch-jobs'
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index 7578abc7b0099f8cbba81aaf4775e0ac15741ec0..0b0553cb843e559a8f12af37fa1c2a9181acb6cc 100755 (executable)
 # [--no-wait] Make only as much progress as possible without entering
 #             a sleep/poll loop.
 #
-# [--no-reuse-finished] Do not reuse existing outputs to satisfy
-#                       pipeline components. Always submit a new job
-#                       or use an existing job which has not yet
-#                       finished.
-#
 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
 #              components. Submit a new job for every component.
 #
@@ -153,10 +148,6 @@ p = Trollop::Parser.new do
       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
       :short => :none,
       :type => :boolean)
-  opt(:no_reuse_finished,
-      "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
-      :short => :none,
-      :type => :boolean)
   opt(:no_reuse,
       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
       :short => :none,
@@ -321,19 +312,19 @@ class JobCache
       []
     end
   end
-  def self.create(attributes)
+  def self.create(job, create_params)
     @cache ||= {}
     result = $client.execute(:api_method => $arvados.jobs.create,
                              :parameters => {
                                :api_token => ENV['ARVADOS_API_TOKEN'],
-                               :job => attributes.to_json
-                             },
+                               :job => job.to_json
+                             }.merge(create_params),
                              :authenticated => false)
     j = JSON.parse result.body, :symbolize_names => true
     if j.is_a? Hash and j[:uuid]
       @cache[j[:uuid]] = j
     else
-      debuglog "create job: #{j[:errors] rescue nil}", 0
+      debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0
       nil
     end
   end
@@ -435,88 +426,53 @@ class WhRunPipelineInstance
       moretodo = false
       @components.each do |cname, c|
         job = nil
+        # Is the job satisfying this component already known to be
+        # finished? ("Already" means before we query the API server
+        # for the job's current state.)
         c_already_finished = (c[:job] &&
                               c[:job][:uuid] &&
                               !c[:job][:success].nil?)
         if !c[:job] and
-            c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
-          # Job is fully specified (all parameter values are present) but
-          # no particular job has been found.
-
-          debuglog "component #{cname} ready to satisfy."
-
-          c.delete :wait
-          second_place_job = nil # satisfies component, but not finished yet
-
-          (@options[:no_reuse] ? [] : JobCache.
-           where(script: c[:script],
-                 script_parameters: c[:script_parameters],
-                 script_version_descends_from: c[:script_version])
-           ).each do |candidate_job|
-            candidate_params_downcase = Hash[candidate_job[:script_parameters].
-                                             map { |k,v| [k.downcase,v] }]
-            c_params_downcase = Hash[c[:script_parameters].
-                                     map { |k,v| [k.downcase,v] }]
-
-            debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
-
-            unless candidate_params_downcase == c_params_downcase
-              next
-            end
-
-            if c[:script_version] !=
-                candidate_job[:script_version][0,c[:script_version].length]
-              debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
-              next
-            end
-
-            unless candidate_job[:success] || candidate_job[:running] ||
-                (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
-              debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
-              next
-            end
-
-            if candidate_job[:success]
-              unless @options[:no_reuse_finished]
-                job = candidate_job
-                $stderr.puts "using #{job[:uuid]} (finished at #{job[:finished_at]}) for component #{cname}"
-                c[:job] = job
-              end
-            else
-              second_place_job ||= candidate_job
-            end
-            break
-          end
-          if not c[:job] and second_place_job
-            job = second_place_job
-            $stderr.puts "using #{job[:uuid]} (running since #{job[:started_at]}) for component #{cname}"
+            c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
+          # No job is associated with this component yet, and its inputs
+          # are fully specified (every output_of script_parameter has been
+          # resolved to a real value).
+          job = JobCache.create({
+            :script => c[:script],
+            :script_parameters => c[:script_parameters],
+            :script_version => c[:script_version],
+            :repository => c[:repository],
+            :nondeterministic => c[:nondeterministic],
+            :output_is_persistent => c[:output_is_persistent] || false,
+            # TODO: Delete the following three attributes when
+            # supporting pre-20140418 API servers is no longer
+            # important. New API servers take these as flags that
+            # control behavior of create, rather than job attributes.
+            :minimum_script_version => c[:minimum_script_version],
+            :exclude_script_versions => c[:exclude_minimum_script_versions],
+            :no_reuse => @options[:no_reuse] || c[:nondeterministic],
+          }, {
+            # This is the right place to put these attributes when
+            # dealing with new API servers.
+            :minimum_script_version => c[:minimum_script_version],
+            :exclude_script_versions => c[:exclude_minimum_script_versions],
+            :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
+          })
+          if job
+            debuglog "component #{cname} new job #{job[:uuid]}"
             c[:job] = job
+          else
+            debuglog "component #{cname} new job failed"
           end
-          if not c[:job]
-            debuglog "component #{cname} not satisfied by any existing job."
-            if !@options[:dry_run]
-              debuglog "component #{cname} new job."
-              job = JobCache.create(:script => c[:script],
-                                    :script_parameters => c[:script_parameters],
-                                    :runtime_constraints => c[:runtime_constraints] || {},
-                                    :script_version => c[:script_version] || 'master',
-                                    :output_is_persistent => c[:output_is_persistent] || false)
-              if job
-                debuglog "component #{cname} new job #{job[:uuid]}"
-                c[:job] = job
-              else
-                debuglog "component #{cname} new job failed"
-              end
-            end
-          end
-        else
-          c[:wait] = true
         end
+
         if c[:job] and c[:job][:uuid]
           if (c[:job][:running] or
               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
+            # Job has not finished yet (running or queued), so refresh our copy of the job record
             c[:job] = JobCache.get(c[:job][:uuid])
           end
+
           if c[:job][:success]
             # Populate script_parameters of other components waiting for
             # this job
@@ -530,10 +486,12 @@ class WhRunPipelineInstance
               end
             end
             unless c_already_finished
+              # This is my first time discovering that the job
+              # succeeded. (At the top of this loop, I was still
+              # waiting for it to finish.)
               if c[:output_is_persistent]
-                # This is my first time discovering that the job
-                # succeeded. I need to make sure a resources/wants
-                # link is in place to protect the output from garbage
+                # I need to make sure a resources/wants link is in
+                # place to protect the output from garbage
                 # collection. (Normally Crunch does this for me, but
                 # here I might be reusing the output of someone else's
                 # job and I need to make sure it's understood that the
@@ -566,6 +524,7 @@ class WhRunPipelineInstance
             end
           elsif c[:job][:running] ||
               (!c[:job][:started_at] && !c[:job][:cancelled_at])
+            # Job is still running or queued (not started, not cancelled)
             moretodo = true
           elsif c[:job][:cancelled_at]
             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
@@ -605,7 +564,7 @@ class WhRunPipelineInstance
         end
       end
     end
-    
+
     if ended == @components.length or failed > 0
       @instance[:active] = false
       @instance[:success] = (succeeded == @components.length)