Merge branch 'master' into 2681-new-inactive-user-notification
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index 3e8bdbfcaef6030afaa8e33b7751c0443c8f88d4..fc636dff507213ac09bdfc2a9c90c4d7fd62a126 100755 (executable)
@@ -226,10 +226,10 @@ class PipelineInstance
                              :parameters => {
                                :uuid => uuid
                              },
-                             :body => {
-                               :api_token => ENV['ARVADOS_API_TOKEN']
-                             },
-                             :authenticated => false)
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
@@ -242,10 +242,12 @@ class PipelineInstance
   def self.create(attributes)
     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
                              :body => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
                                :pipeline_instance => attributes
                              },
-                             :authenticated => false)
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
       abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
@@ -259,10 +261,12 @@ class PipelineInstance
                                :uuid => @pi[:uuid]
                              },
                              :body => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
                                :pipeline_instance => @attributes_to_update.to_json
                              },
-                             :authenticated => false)
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
@@ -291,20 +295,24 @@ class JobCache
     @cache ||= {}
     result = $client.execute(:api_method => $arvados.jobs.get,
                              :parameters => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
                                :uuid => uuid
                              },
-                             :authenticated => false)
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
   end
   def self.where(conditions)
     result = $client.execute(:api_method => $arvados.jobs.list,
                              :parameters => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
                                :limit => 10000,
                                :where => conditions.to_json
                              },
-                             :authenticated => false)
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     list = JSON.parse result.body, :symbolize_names => true
     if list and list[:items].is_a? Array
       list[:items]
@@ -315,11 +323,13 @@ class JobCache
   def self.create(job, create_params)
     @cache ||= {}
     result = $client.execute(:api_method => $arvados.jobs.create,
-                             :parameters => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
+                             :body => {
                                :job => job.to_json
                              }.merge(create_params),
-                             :authenticated => false)
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     j = JSON.parse result.body, :symbolize_names => true
     if j.is_a? Hash and j[:uuid]
       @cache[j[:uuid]] = j
@@ -348,10 +358,12 @@ class WhRunPipelineInstance
     else
       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
                                :parameters => {
-                                 :api_token => ENV['ARVADOS_API_TOKEN'],
                                  :uuid => template
                                },
-                               :authenticated => false)
+                               :authenticated => false,
+                               :headers => {
+                                 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               })
       @template = JSON.parse result.body, :symbolize_names => true
       if !@template[:uuid]
         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
@@ -413,19 +425,29 @@ class WhRunPipelineInstance
   end
 
   def setup_instance
-    @instance ||= PipelineInstance.
-      create(:components => @components,
+    if $options[:submit]
+      @instance ||= PipelineInstance.
+        create(:components => @components,
+              :pipeline_template_uuid => @template[:uuid],
+              :state => 'New')
+    else
+      @instance ||= PipelineInstance.
+        create(:components => @components,
              :pipeline_template_uuid => @template[:uuid],
-             :active => true)
+             :state => 'RunningOnClient')
+    end
     self
   end
 
   def run
     moretodo = true
+    interrupted = false
+
     while moretodo
       moretodo = false
       @components.each do |cname, c|
         job = nil
+        owner_uuid = @instance[:owner_uuid]
         # Is the job satisfying this component already known to be
         # finished? (Already meaning "before we query API server about
         # the job's current state")
@@ -437,13 +459,14 @@ class WhRunPipelineInstance
           # No job yet associated with this component and is component inputs
           # are fully specified (any output_of script_parameters are resolved
           # to real value)
-          job = JobCache.create {
+          job = JobCache.create({
             :script => c[:script],
             :script_parameters => c[:script_parameters],
             :script_version => c[:script_version],
             :repository => c[:repository],
             :nondeterministic => c[:nondeterministic],
             :output_is_persistent => c[:output_is_persistent] || false,
+            :owner_uuid => owner_uuid,
             # TODO: Delete the following three attributes when
             # supporting pre-20140418 API servers is no longer
             # important. New API servers take these as flags that
@@ -457,7 +480,7 @@ class WhRunPipelineInstance
             :minimum_script_version => c[:minimum_script_version],
             :exclude_script_versions => c[:exclude_minimum_script_versions],
             :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
-          }
+          })
           if job
             debuglog "component #{cname} new job #{job[:uuid]}"
             c[:job] = job
@@ -516,7 +539,8 @@ class WhRunPipelineInstance
                     tail_kind: 'arvados#user',
                     tail_uuid: @my_user_uuid,
                     head_kind: 'arvados#collection',
-                    head_uuid: wanted
+                    head_uuid: wanted,
+                    owner_uuid: owner_uuid
                   }
                   debuglog "added link, uuid #{newlink[:uuid]}"
                 end
@@ -532,7 +556,6 @@ class WhRunPipelineInstance
         end
       end
       @instance[:components] = @components
-      @instance[:active] = moretodo
       report_status
 
       if @options[:no_wait]
@@ -544,7 +567,8 @@ class WhRunPipelineInstance
           sleep 10
         rescue Interrupt
           debuglog "interrupt", 0
-          abort
+          interrupted = true
+          break
         end
       end
     end
@@ -565,17 +589,30 @@ class WhRunPipelineInstance
       end
     end
 
-    if ended == @components.length or failed > 0
-      @instance[:active] = false
-      @instance[:success] = (succeeded == @components.length)
+    success = (succeeded == @components.length)
+
+    if interrupted
+     if success
+        @instance[:state] = 'Complete'
+     else
+        @instance[:state] = 'Paused'
+      end
+    else
+      if ended == @components.length or failed > 0
+        @instance[:state] = success ? 'Complete' : 'Failed'
+      end
     end
 
+    # set components_summary
+    components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
+    @instance[:components_summary] = components_summary
+
     @instance.save
   end
 
   def cleanup
-    if @instance
-      @instance[:active] = false
+    if @instance and @instance[:state] == 'RunningOnClient'
+      @instance[:state] = 'Paused'
       @instance.save
     end
   end