Merge branch '2488-jobs-pipeline-doc'
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index 9f1663d1a32656c688bcd057dedb9a074341912e..7ddea30cf76ce1ebd27c9083e6d6ecef1a1776a8 100755 (executable)
@@ -74,6 +74,7 @@ $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
 
 begin
+  require 'arvados'
   require 'rubygems'
   require 'json'
   require 'pp'
@@ -84,7 +85,7 @@ rescue LoadError => l
   abort <<-EOS
 #{$0}: fatal: #{l.message}
 Some runtime dependencies may be missing.
-Try: gem install pp google-api-client json trollop
+Try: gem install arvados pp google-api-client json trollop
   EOS
 end
 
@@ -216,6 +217,7 @@ $client ||= Google::APIClient.
       :application_name => File.split($0).last,
       :application_version => $application_version.to_s)
 $arvados = $client.discovered_api('arvados', $arvados_api_version)
+$arv = Arvados.new api_version: 'v1'
 
 
 class PipelineInstance
@@ -424,7 +426,12 @@ class WhRunPipelineInstance
       moretodo = false
       @components.each do |cname, c|
         job = nil
-
+        # Is the job satisfying this component already known to be
+        # finished? (Already meaning "before we query API server about
+        # the job's current state")
+        c_already_finished = (c[:job] &&
+                              c[:job][:uuid] &&
+                              !c[:job][:success].nil?)
         if !c[:job] and
             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
           # No job yet associated with this component and is component inputs
@@ -433,10 +440,12 @@ class WhRunPipelineInstance
           job = JobCache.create({:script => c[:script],
                             :script_parameters => c[:script_parameters],
                             :script_version => c[:script_version],
+                            :repository => c[:repository],
                             :minimum_script_version => c[:minimum_script_version],
                             :exclude_script_versions => c[:exclude_minimum_script_versions],
                             :nondeterministic => c[:nondeterministic],
-                            :no_reuse => @options[:no_reuse]})
+                            :no_reuse => @options[:no_reuse],
+                            :output_is_persistent => c[:output_is_persistent] || false})
           if job
             debuglog "component #{cname} new job #{job[:uuid]}"
             c[:job] = job
@@ -449,7 +458,7 @@ class WhRunPipelineInstance
           if (c[:job][:running] or
               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
             # Job is running so update copy of job record
-            c[:job] = JobCache.get(c[:job][:uuid])            
+            c[:job] = JobCache.get(c[:job][:uuid])
           end
 
           if c[:job][:success]
@@ -464,6 +473,43 @@ class WhRunPipelineInstance
                 end
               end
             end
+            unless c_already_finished
+              # This is my first time discovering that the job
+              # succeeded. (At the top of this loop, I was still
+              # waiting for it to finish.)
+              if c[:output_is_persistent]
+                # I need to make sure a resources/wants link is in
+                # place to protect the output from garbage
+                # collection. (Normally Crunch does this for me, but
+                # here I might be reusing the output of someone else's
+                # job and I need to make sure it's understood that the
+                # output is valuable to me, too.)
+                wanted = c[:job][:output]
+                debuglog "checking for existing persistence link for #{wanted}"
+                @my_user_uuid ||= $arv.user.current[:uuid]
+                links = $arv.link.list(limit: 1,
+                                       filters:
+                                       [%w(link_class = resources),
+                                        %w(name = wants),
+                                        %w(tail_uuid =) + [@my_user_uuid],
+                                        %w(head_uuid =) + [wanted]
+                                       ])[:items]
+                if links.any?
+                  debuglog "link already exists, uuid #{links.first[:uuid]}"
+                else
+                  newlink = $arv.link.create link: \
+                  {
+                    link_class: 'resources',
+                    name: 'wants',
+                    tail_kind: 'arvados#user',
+                    tail_uuid: @my_user_uuid,
+                    head_kind: 'arvados#collection',
+                    head_uuid: wanted
+                  }
+                  debuglog "added link, uuid #{newlink[:uuid]}"
+                end
+              end
+            end
           elsif c[:job][:running] ||
               (!c[:job][:started_at] && !c[:job][:cancelled_at])
             # Job is still running
@@ -506,7 +552,7 @@ class WhRunPipelineInstance
         end
       end
     end
-    
+
     if ended == @components.length or failed > 0
       @instance[:active] = false
       @instance[:success] = (succeeded == @components.length)