Put back missing parentheses.
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index 56e655f34fe98e1bb4ab46cbd6ec467629c805c0..0b0553cb843e559a8f12af37fa1c2a9181acb6cc 100755 (executable)
@@ -74,6 +74,7 @@ $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
 
 begin
+  require 'arvados'
   require 'rubygems'
   require 'json'
   require 'pp'
@@ -84,7 +85,7 @@ rescue LoadError => l
   abort <<-EOS
 #{$0}: fatal: #{l.message}
 Some runtime dependencies may be missing.
-Try: gem install pp google-api-client json trollop
+Try: gem install arvados pp google-api-client json trollop
   EOS
 end
 
@@ -216,6 +217,7 @@ $client ||= Google::APIClient.
       :application_name => File.split($0).last,
       :application_version => $application_version.to_s)
 $arvados = $client.discovered_api('arvados', $arvados_api_version)
+$arv = Arvados.new api_version: 'v1'
 
 
 class PipelineInstance
@@ -310,19 +312,19 @@ class JobCache
       []
     end
   end
-  def self.create(attributes)
+  def self.create(job, create_params)
     @cache ||= {}
     result = $client.execute(:api_method => $arvados.jobs.create,
                              :parameters => {
                                :api_token => ENV['ARVADOS_API_TOKEN'],
-                               :job => attributes.to_json
-                             },
+                               :job => job.to_json
+                             }.merge(create_params),
                              :authenticated => false)
     j = JSON.parse result.body, :symbolize_names => true
     if j.is_a? Hash and j[:uuid]
       @cache[j[:uuid]] = j
     else
-      debuglog "create job: #{j[:errors] rescue nil} with attribute #{attributes}", 0
+      debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0
       nil
     end
   end
@@ -424,19 +426,38 @@ class WhRunPipelineInstance
       moretodo = false
       @components.each do |cname, c|
         job = nil
-
+        # Is the job satisfying this component already known to be
+        # finished? (Already meaning "before we query API server about
+        # the job's current state")
+        c_already_finished = (c[:job] &&
+                              c[:job][:uuid] &&
+                              !c[:job][:success].nil?)
         if !c[:job] and
             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
           # No job yet associated with this component and is component inputs
           # are fully specified (any output_of script_parameters are resolved
           # to real value)
-          job = JobCache.create({:script => c[:script],
-                            :script_parameters => c[:script_parameters],
-                            :script_version => c[:script_version],
-                            :minimum_script_version => c[:minimum_script_version],
-                            :exclude_script_versions => c[:exclude_minimum_script_versions],
-                            :nondeterministic => c[:nondeterministic],
-                            :no_reuse => @options[:no_reuse]})
+          job = JobCache.create({
+            :script => c[:script],
+            :script_parameters => c[:script_parameters],
+            :script_version => c[:script_version],
+            :repository => c[:repository],
+            :nondeterministic => c[:nondeterministic],
+            :output_is_persistent => c[:output_is_persistent] || false,
+            # TODO: Delete the following three attributes when
+            # supporting pre-20140418 API servers is no longer
+            # important. New API servers take these as flags that
+            # control behavior of create, rather than job attributes.
+            :minimum_script_version => c[:minimum_script_version],
+            :exclude_script_versions => c[:exclude_minimum_script_versions],
+            :no_reuse => @options[:no_reuse] || c[:nondeterministic],
+          }, {
+            # This is the right place to put these attributes when
+            # dealing with new API servers.
+            :minimum_script_version => c[:minimum_script_version],
+            :exclude_script_versions => c[:exclude_minimum_script_versions],
+            :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
+          })
           if job
             debuglog "component #{cname} new job #{job[:uuid]}"
             c[:job] = job
@@ -449,7 +470,7 @@ class WhRunPipelineInstance
           if (c[:job][:running] or
               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
             # Job is running so update copy of job record
-            c[:job] = JobCache.get(c[:job][:uuid])            
+            c[:job] = JobCache.get(c[:job][:uuid])
           end
 
           if c[:job][:success]
@@ -464,6 +485,43 @@ class WhRunPipelineInstance
                 end
               end
             end
+            unless c_already_finished
+              # This is my first time discovering that the job
+              # succeeded. (At the top of this loop, I was still
+              # waiting for it to finish.)
+              if c[:output_is_persistent]
+                # I need to make sure a resources/wants link is in
+                # place to protect the output from garbage
+                # collection. (Normally Crunch does this for me, but
+                # here I might be reusing the output of someone else's
+                # job and I need to make sure it's understood that the
+                # output is valuable to me, too.)
+                wanted = c[:job][:output]
+                debuglog "checking for existing persistence link for #{wanted}"
+                @my_user_uuid ||= $arv.user.current[:uuid]
+                links = $arv.link.list(limit: 1,
+                                       filters:
+                                       [%w(link_class = resources),
+                                        %w(name = wants),
+                                        %w(tail_uuid =) + [@my_user_uuid],
+                                        %w(head_uuid =) + [wanted]
+                                       ])[:items]
+                if links.any?
+                  debuglog "link already exists, uuid #{links.first[:uuid]}"
+                else
+                  newlink = $arv.link.create link: \
+                  {
+                    link_class: 'resources',
+                    name: 'wants',
+                    tail_kind: 'arvados#user',
+                    tail_uuid: @my_user_uuid,
+                    head_kind: 'arvados#collection',
+                    head_uuid: wanted
+                  }
+                  debuglog "added link, uuid #{newlink[:uuid]}"
+                end
+              end
+            end
           elsif c[:job][:running] ||
               (!c[:job][:started_at] && !c[:job][:cancelled_at])
             # Job is still running
@@ -500,14 +558,13 @@ class WhRunPipelineInstance
           ended += 1
           if c[:job][:success] == true
             succeeded += 1
-          end
-          if c[:job][:success] == false
+          elsif c[:job][:success] == false
             failed += 1
           end
         end
       end
     end
-    
+
     if ended == @components.length or failed > 0
       @instance[:active] = false
       @instance[:success] = (succeeded == @components.length)