X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/288ea0d2e35bf94d8a9f0bc609bac1467c1c5b9e..9997ada67ce36d2fbe831bce473aa61250727aff:/services/api/app/models/pipeline_instance.rb diff --git a/services/api/app/models/pipeline_instance.rb b/services/api/app/models/pipeline_instance.rb index ca4b69c62a..55efa0ae85 100644 --- a/services/api/app/models/pipeline_instance.rb +++ b/services/api/app/models/pipeline_instance.rb @@ -1,5 +1,9 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + class PipelineInstance < ArvadosModel - include AssignUuid + include HasUuid include KindAndEtag include CommonApiTemplate serialize :components, Hash @@ -8,35 +12,38 @@ class PipelineInstance < ArvadosModel belongs_to :pipeline_template, :foreign_key => :pipeline_template_uuid, :primary_key => :uuid before_validation :bootstrap_components - before_validation :update_success + before_validation :update_state before_validation :verify_status + before_validation :update_timestamps_when_state_changes before_create :set_state_before_save before_save :set_state_before_save api_accessible :user, extend: :common do |t| t.add :pipeline_template_uuid - t.add :pipeline_template, :if => :pipeline_template t.add :name t.add :components - t.add :success - t.add :active - t.add :dependencies t.add :properties t.add :state t.add :components_summary + t.add :description + t.add :started_at + t.add :finished_at end # Supported states for a pipeline instance - New = 'New' - Ready = 'Ready' - RunningOnServer = 'RunningOnServer' - RunningOnClient = 'RunningOnClient' - Paused = 'Paused' - Failed = 'Failed' - Complete = 'Complete' - - def dependencies - dependency_search(self.components).keys + States = + [ + (New = 'New'), + (Ready = 'Ready'), + (RunningOnServer = 'RunningOnServer'), + (RunningOnClient = 'RunningOnClient'), + (Paused = 'Paused'), + (Failed = 'Failed'), + (Complete = 'Complete'), + ] + + def self.limit_index_columns_read + ["components"] end # if all components have input, the pipeline is Ready @@ -77,7 +84,7 @@ class PipelineInstance < ArvadosModel else row << 0.0 if step['failed'] - self.success = false + self.state = Failed end end row << (step['warehousejob']['id'] rescue nil) @@ -98,7 +105,34 @@ class PipelineInstance < ArvadosModel end def self.queue - self.where('active = true') + self.where("state = 'RunningOnServer'") + end + + def cancel(cascade: false, need_transaction: true) + if need_transaction + ActiveRecord::Base.transaction do + cancel(cascade: cascade, need_transaction: false) + end + return + end + + if self.state.in?([RunningOnServer, RunningOnClient]) + self.state = Paused + self.save! + elsif self.state != Paused + raise InvalidStateTransitionError + end + + return if !cascade + + # cancel all child jobs + children = self.components.andand.collect{|_, c| c['job']}.compact.collect{|j| j['uuid']}.compact + + return if children.empty? + + Job.where(uuid: children, state: [Job::Queued, Job::Running]).each do |job| + job.cancel(cascade: cascade, need_transaction: false) + end end protected @@ -108,96 +142,46 @@ class PipelineInstance < ArvadosModel end end - def update_success + def update_state if components and progress_ratio == 1.0 - self.success = true + self.state = Complete end end - def dependency_search(haystack) - if haystack.is_a? String - if (re = haystack.match /^([0-9a-f]{32}(\+[^,]+)*)+/) - {re[1] => true} - else - {} - end - elsif haystack.is_a? Array - deps = {} - haystack.each do |value| - deps.merge! dependency_search(value) - end - deps - elsif haystack.respond_to? :keys - deps = {} - haystack.each do |key, value| - deps.merge! dependency_search(value) + def verify_status + changed_attributes = self.changed + + if new_record? or 'components'.in? changed_attributes + self.state ||= New + if (self.state == New) and self.components_look_ready? + self.state = Ready end - deps + end + + if self.state.in?(States) + true else - {} + errors.add :state, "'#{state.inspect} must be one of: [#{States.join ', '}]" + false end end - def verify_status - if active_changed? - if self.active - self.state = RunningOnServer - else - if self.components_look_ready? - self.state = Ready - else - self.state = New - end - end - elsif success_changed? - if self.success - self.active = false - self.state = Complete - else - self.active = false - self.state = Failed - end - elsif state_changed? - case self.state - when New, Ready, Paused - self.active = false - self.success = nil - when RunningOnServer - self.active = true - self.success = nil - when RunningOnClient - self.active = false - self.success = nil - when Failed - self.active = false - self.success = false - self.state = Failed # before_validation will fail if false is returned in the previous line - when Complete - self.active = false - self.success = true - else - return false - end - elsif components_changed? - if !self.state || self.state == New || !self.active - if self.components_look_ready? - self.state = Ready - else - self.state = New - end - end + def set_state_before_save + if self.components_look_ready? && (!self.state || self.state == New) + self.state = Ready end end - def set_state_before_save - if !self.state || self.state == New - if self.active - self.state = RunningOnServer - elsif self.components_look_ready? - self.state = Ready - else - self.state = New - end + def update_timestamps_when_state_changes + return if not (state_changed? or new_record?) + + case state + when RunningOnServer, RunningOnClient + self.started_at ||= db_current_time + when Failed, Complete + current_time = db_current_time + self.started_at ||= current_time + self.finished_at ||= current_time end end