11167: Merge branch 'master' into 11167-wb-remove-arvget
[arvados.git] / services / api / app / models / pipeline_instance.rb
index fbe36906373c5bbfd93b9e65cf001ab33ca5f8de..55efa0ae85b4e79d8a88d2d5d99ead15abb6b5b7 100644 (file)
@@ -1,5 +1,9 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 class PipelineInstance < ArvadosModel
-  include AssignUuid
+  include HasUuid
   include KindAndEtag
   include CommonApiTemplate
   serialize :components, Hash
@@ -8,35 +12,38 @@ class PipelineInstance < ArvadosModel
   belongs_to :pipeline_template, :foreign_key => :pipeline_template_uuid, :primary_key => :uuid
 
   before_validation :bootstrap_components
-  before_validation :update_success
+  before_validation :update_state
   before_validation :verify_status
+  before_validation :update_timestamps_when_state_changes
   before_create :set_state_before_save
   before_save :set_state_before_save
 
   api_accessible :user, extend: :common do |t|
     t.add :pipeline_template_uuid
-    t.add :pipeline_template, :if => :pipeline_template
     t.add :name
     t.add :components
-    t.add :success
-    t.add :active
-    t.add :dependencies
     t.add :properties
     t.add :state
     t.add :components_summary
+    t.add :description
+    t.add :started_at
+    t.add :finished_at
   end
 
   # Supported states for a pipeline instance
-  New = 'New'
-  Ready = 'Ready'
-  RunningOnServer = 'RunningOnServer'
-  RunningOnClient = 'RunningOnClient'
-  Paused = 'Paused'
-  Failed = 'Failed'
-  Complete = 'Complete'
-
-  def dependencies
-    dependency_search(self.components).keys
+  States =
+    [
+     (New = 'New'),
+     (Ready = 'Ready'),
+     (RunningOnServer = 'RunningOnServer'),
+     (RunningOnClient = 'RunningOnClient'),
+     (Paused = 'Paused'),
+     (Failed = 'Failed'),
+     (Complete = 'Complete'),
+    ]
+
+  def self.limit_index_columns_read
+    ["components"]
   end
 
   # if all components have input, the pipeline is Ready
@@ -47,7 +54,7 @@ class PipelineInstance < ArvadosModel
 
     all_components_have_input = true
     self.components.each do |name, component|
-      component['script_parameters'].each do |parametername, parameter|
+      component['script_parameters'].andand.each do |parametername, parameter|
         parameter = { 'value' => parameter } unless parameter.is_a? Hash
         if parameter['value'].nil? and parameter['required']
           if parameter['output_of']
@@ -77,7 +84,7 @@ class PipelineInstance < ArvadosModel
         else
           row << 0.0
           if step['failed']
-            self.success = false
+            self.state = Failed
           end
         end
         row << (step['warehousejob']['id'] rescue nil)
@@ -98,7 +105,34 @@ class PipelineInstance < ArvadosModel
   end
 
   def self.queue
-    self.where('active = true')
+    self.where("state = 'RunningOnServer'")
+  end
+
+  def cancel(cascade: false, need_transaction: true)
+    if need_transaction
+      ActiveRecord::Base.transaction do
+        cancel(cascade: cascade, need_transaction: false)
+      end
+      return
+    end
+
+    if self.state.in?([RunningOnServer, RunningOnClient])
+      self.state = Paused
+      self.save!
+    elsif self.state != Paused
+      raise InvalidStateTransitionError
+    end
+
+    return if !cascade
+
+    # cancel all child jobs
+    children = self.components.andand.collect{|_, c| c['job']}.compact.collect{|j| j['uuid']}.compact
+
+    return if children.empty?
+
+    Job.where(uuid: children, state: [Job::Queued, Job::Running]).each do |job|
+      job.cancel(cascade: cascade, need_transaction: false)
+    end
   end
 
   protected
@@ -108,96 +142,46 @@ class PipelineInstance < ArvadosModel
     end
   end
 
-  def update_success
+  def update_state
     if components and progress_ratio == 1.0
-      self.success = true
+      self.state = Complete
     end
   end
 
-  def dependency_search(haystack)
-    if haystack.is_a? String
-      if (re = haystack.match /^([0-9a-f]{32}(\+[^,]+)*)+/)
-        {re[1] => true}
-      else
-        {}
-      end
-    elsif haystack.is_a? Array
-      deps = {}
-      haystack.each do |value|
-        deps.merge! dependency_search(value)
-      end
-      deps
-    elsif haystack.respond_to? :keys
-      deps = {}
-      haystack.each do |key, value|
-        deps.merge! dependency_search(value)
+  def verify_status
+    changed_attributes = self.changed
+
+    if new_record? or 'components'.in? changed_attributes
+      self.state ||= New
+      if (self.state == New) and self.components_look_ready?
+        self.state = Ready
       end
-      deps
+    end
+
+    if self.state.in?(States)
+      true
     else
-      {}
+      errors.add :state, "'#{state.inspect} must be one of: [#{States.join ', '}]"
+      false
     end
   end
 
-  def verify_status
-    if active_changed?
-      if self.active
-        self.state = RunningOnServer
-      else
-        if self.components_look_ready?
-          self.state = Ready
-        else
-          self.state = New
-        end
-      end
-    elsif success_changed?
-      if self.success
-        self.active = false
-        self.state = Complete
-      else
-        self.active = false
-        self.state = Failed
-      end
-    elsif state_changed?
-      case self.state
-      when New, Ready, Paused
-        self.active = false
-        self.success = nil
-      when RunningOnServer
-        self.active = true
-        self.success = nil
-      when RunningOnClient
-        self.active = false
-        self.success = nil
-      when Failed
-        self.active = false
-        self.success = false
-        self.state = Failed   # before_validation will fail if false is returned in the previous line
-      when Complete
-        self.active = false
-        self.success = true
-      else
-        return false
-      end
-    elsif components_changed? 
-      if !self.state || self.state == New || !self.active
-        if self.components_look_ready?
-          self.state = Ready
-        else
-          self.state = New
-        end
-      end
+  def set_state_before_save
+    if self.components_look_ready? && (!self.state || self.state == New)
+      self.state = Ready
     end
   end
 
-  def set_state_before_save
-    if !self.state || self.state == New
-      if self.active
-        self.state = RunningOnServer
-      elsif self.components_look_ready?
-        self.state = Ready
-      else
-        self.state = New
-      end
+  def update_timestamps_when_state_changes
+    return if not (state_changed? or new_record?)
+
+    case state
+    when RunningOnServer, RunningOnClient
+      self.started_at ||= db_current_time
+    when Failed, Complete
+      current_time = db_current_time
+      self.started_at ||= current_time
+      self.finished_at ||= current_time
     end
   end