+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
class PipelineInstance < ArvadosModel
- include AssignUuid
+ include HasUuid
include KindAndEtag
include CommonApiTemplate
serialize :components, Hash
belongs_to :pipeline_template, :foreign_key => :pipeline_template_uuid, :primary_key => :uuid
before_validation :bootstrap_components
- before_validation :update_success
- before_create :verify_status
- before_save :verify_status
+ before_validation :update_state
+ before_validation :verify_status
+ before_validation :update_timestamps_when_state_changes
+ before_create :set_state_before_save
+ before_save :set_state_before_save
api_accessible :user, extend: :common do |t|
t.add :pipeline_template_uuid
- t.add :pipeline_template, :if => :pipeline_template
t.add :name
t.add :components
- t.add :success
- t.add :active
- t.add :dependencies
t.add :properties
t.add :state
t.add :components_summary
+ t.add :description
+ t.add :started_at
+ t.add :finished_at
end
# Supported states for a pipeline instance
- New = 'New'
- Ready = 'Ready'
- RunningOnServer = 'RunningOnServer'
- RunningOnClient = 'RunningOnClient'
- Paused = 'Paused'
- Failed = 'Failed'
- Complete = 'Complete'
-
- def dependencies
- dependency_search(self.components).keys
+ States =
+ [
+ (New = 'New'),
+ (Ready = 'Ready'),
+ (RunningOnServer = 'RunningOnServer'),
+ (RunningOnClient = 'RunningOnClient'),
+ (Paused = 'Paused'),
+ (Failed = 'Failed'),
+ (Complete = 'Complete'),
+ ]
+
+ def self.limit_index_columns_read
+ ["components"]
end
# if all components have input, the pipeline is Ready
- def self.is_ready components
- if !components || components.empty? # is this correct?
- return true
+ def components_look_ready?
+ if !self.components || self.components.empty?
+ return false
end
all_components_have_input = true
- components.each do |name, component|
- component['script_parameters'].each do |parametername, parameter|
+ self.components.each do |name, component|
+ component['script_parameters'].andand.each do |parametername, parameter|
parameter = { 'value' => parameter } unless parameter.is_a? Hash
- if parameter['value'].nil? and
- ![false,'false',0,'0'].index parameter['required']
+ if parameter['value'].nil? and parameter['required']
if parameter['output_of']
next
end
else
row << 0.0
if step['failed']
- self.success = false
+ self.state = Failed
end
end
row << (step['warehousejob']['id'] rescue nil)
end
def self.queue
- self.where('active = true')
+ self.where("state = 'RunningOnServer'")
+ end
+
+ def cancel(cascade: false, need_transaction: true)
+ if need_transaction
+ ActiveRecord::Base.transaction do
+ cancel(cascade: cascade, need_transaction: false)
+ end
+ return
+ end
+
+ if self.state.in?([RunningOnServer, RunningOnClient])
+ self.state = Paused
+ self.save!
+ elsif self.state != Paused
+ raise InvalidStateTransitionError
+ end
+
+ return if !cascade
+
+ # cancel all child jobs
+ children = self.components.andand.collect{|_, c| c['job']}.compact.collect{|j| j['uuid']}.compact
+
+ return if children.empty?
+
+ Job.where(uuid: children, state: [Job::Queued, Job::Running]).each do |job|
+ job.cancel(cascade: cascade, need_transaction: false)
+ end
end
protected
end
end
- def update_success
+ def update_state
if components and progress_ratio == 1.0
- self.success = true
+ self.state = Complete
end
end
- def dependency_search(haystack)
- if haystack.is_a? String
- if (re = haystack.match /^([0-9a-f]{32}(\+[^,]+)*)+/)
- {re[1] => true}
- else
- {}
- end
- elsif haystack.is_a? Array
- deps = {}
- haystack.each do |value|
- deps.merge! dependency_search(value)
- end
- deps
- elsif haystack.respond_to? :keys
- deps = {}
- haystack.each do |key, value|
- deps.merge! dependency_search(value)
+ def verify_status
+ changed_attributes = self.changed
+
+ if new_record? or 'components'.in? changed_attributes
+ self.state ||= New
+ if (self.state == New) and self.components_look_ready?
+ self.state = Ready
end
- deps
+ end
+
+ if self.state.in?(States)
+ true
else
- {}
+ errors.add :state, "'#{state.inspect} must be one of: [#{States.join ', '}]"
+ false
end
end
- def verify_status
- if active_changed?
- if self.active
- self.state = RunningOnServer
- else
- if PipelineInstance.is_ready self.components
- self.state = Ready
- else
- self.state = New
- end
- end
- elsif success_changed?
- if self.success
- self.active = false
- self.state = Complete
- else
- self.active = false
- self.state = Failed
- end
- elsif state_changed?
- case self.state
- when New, Ready
- self.active = false
- self.success = nil
- when RunningOnServer
- self.active = true
- self.success = nil
- when RunningOnClient
- self.active = false
- self.success = nil
- when Failed
- self.active = false
- self.success = false
- when Complete
- self.active = false
- self.success = true
- end
- else # new object create or save
- if !self.state || self.state == New || !self.active
- if PipelineInstance.is_ready self.components
- self.state = Ready
- else
- self.state = New
- end
- end
+ def set_state_before_save
+ if self.components_look_ready? && (!self.state || self.state == New)
+ self.state = Ready
+ end
+ end
+
+ def update_timestamps_when_state_changes
+ return if not (state_changed? or new_record?)
+
+ case state
+ when RunningOnServer, RunningOnClient
+ self.started_at ||= db_current_time
+ when Failed, Complete
+ current_time = db_current_time
+ self.started_at ||= current_time
+ self.finished_at ||= current_time
end
end