+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
class PipelineInstance < ArvadosModel
include HasUuid
include KindAndEtag
before_validation :bootstrap_components
before_validation :update_state
before_validation :verify_status
+ before_validation :update_timestamps_when_state_changes
before_create :set_state_before_save
before_save :set_state_before_save
api_accessible :user, extend: :common do |t|
t.add :pipeline_template_uuid
- t.add :pipeline_template, :if => :pipeline_template
t.add :name
t.add :components
- t.add :dependencies
t.add :properties
t.add :state
t.add :components_summary
+ t.add :description
t.add :started_at
t.add :finished_at
end
(Complete = 'Complete'),
]
- def dependencies
- dependency_search(self.components).keys
+  # Class-level list of column names, currently just "components".
+  # NOTE(review): presumably consulted by index queries to restrict how
+  # much of these columns is read -- confirm against the caller.
+  def self.limit_index_columns_read
+    %w(components)
   end
# if all components have input, the pipeline is Ready
self.where("state = 'RunningOnServer'")
end
+ def cancel(cascade: false, need_transaction: true)
+ if need_transaction
+ ActiveRecord::Base.transaction do
+ cancel(cascade: cascade, need_transaction: false)
+ end
+ return
+ end
+
+ if self.state.in?([RunningOnServer, RunningOnClient])
+ self.state = Paused
+ self.save!
+ elsif self.state != Paused
+ raise InvalidStateTransitionError
+ end
+
+ return if !cascade
+
+ # cancel all child jobs
+ children = self.components.andand.collect{|_, c| c['job']}.compact.collect{|j| j['uuid']}.compact
+
+ return if children.empty?
+
+ Job.where(uuid: children, state: [Job::Queued, Job::Running]).each do |job|
+ job.cancel(cascade: cascade, need_transaction: false)
+ end
+ end
+
protected
def bootstrap_components
if pipeline_template and (!components or components.empty?)
end
end
- def dependency_search(haystack)
- if haystack.is_a? String
- if (re = haystack.match /^([0-9a-f]{32}(\+[^,]+)*)+/)
- {re[1] => true}
- else
- {}
- end
- elsif haystack.is_a? Array
- deps = {}
- haystack.each do |value|
- deps.merge! dependency_search(value)
- end
- deps
- elsif haystack.respond_to? :keys
- deps = {}
- haystack.each do |key, value|
- deps.merge! dependency_search(value)
- end
- deps
- else
- {}
- end
- end
-
def verify_status
changed_attributes = self.changed
end
end
+ def update_timestamps_when_state_changes
+ return if not (state_changed? or new_record?)
+
+ case state
+ when RunningOnServer, RunningOnClient
+ self.started_at ||= db_current_time
+ when Failed, Complete
+ current_time = db_current_time
+ self.started_at ||= current_time
+ self.finished_at ||= current_time
+ end
+ end
+
end