Merge branch '10181-incremental-log'
[arvados.git] / services / api / app / models / pipeline_instance.rb
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 class PipelineInstance < ArvadosModel
6   include HasUuid
7   include KindAndEtag
8   include CommonApiTemplate
9   serialize :components, Hash
10   serialize :properties, Hash
11   serialize :components_summary, Hash
12   belongs_to :pipeline_template, :foreign_key => :pipeline_template_uuid, :primary_key => :uuid
13
14   before_validation :bootstrap_components
15   before_validation :update_state
16   before_validation :verify_status
17   before_validation :update_timestamps_when_state_changes
18   before_create :set_state_before_save
19   before_save :set_state_before_save
20
21   api_accessible :user, extend: :common do |t|
22     t.add :pipeline_template_uuid
23     t.add :name
24     t.add :components
25     t.add :properties
26     t.add :state
27     t.add :components_summary
28     t.add :description
29     t.add :started_at
30     t.add :finished_at
31   end
32
33   # Supported states for a pipeline instance
34   States =
35     [
36      (New = 'New'),
37      (Ready = 'Ready'),
38      (RunningOnServer = 'RunningOnServer'),
39      (RunningOnClient = 'RunningOnClient'),
40      (Paused = 'Paused'),
41      (Failed = 'Failed'),
42      (Complete = 'Complete'),
43     ]
44
45   def self.limit_index_columns_read
46     ["components"]
47   end
48
49   # if all components have input, the pipeline is Ready
50   def components_look_ready?
51     if !self.components || self.components.empty?
52       return false
53     end
54
55     all_components_have_input = true
56     self.components.each do |name, component|
57       component['script_parameters'].andand.each do |parametername, parameter|
58         parameter = { 'value' => parameter } unless parameter.is_a? Hash
59         if parameter['value'].nil? and parameter['required']
60           if parameter['output_of']
61             next
62           end
63           all_components_have_input = false
64           break
65         end
66       end
67     end
68     return all_components_have_input
69   end
70
71   def progress_table
72     begin
73       # v0 pipeline format
74       nrow = -1
75       components['steps'].collect do |step|
76         nrow += 1
77         row = [nrow, step['name']]
78         if step['complete'] and step['complete'] != 0
79           if step['output_data_locator']
80             row << 1.0
81           else
82             row << 0.0
83           end
84         else
85           row << 0.0
86           if step['failed']
87             self.state = Failed
88           end
89         end
90         row << (step['warehousejob']['id'] rescue nil)
91         row << (step['warehousejob']['revision'] rescue nil)
92         row << step['output_data_locator']
93         row << (Time.parse(step['warehousejob']['finishtime']) rescue nil)
94         row
95       end
96     rescue
97       []
98     end
99   end
100
101   def progress_ratio
102     t = progress_table
103     return 0 if t.size < 1
104     t.collect { |r| r[2] }.inject(0.0) { |sum,a| sum += a } / t.size
105   end
106
107   def self.queue
108     self.where("state = 'RunningOnServer'")
109   end
110
111   def cancel(cascade: false, need_transaction: true)
112     if need_transaction
113       ActiveRecord::Base.transaction do
114         cancel(cascade: cascade, need_transaction: false)
115       end
116       return
117     end
118
119     if self.state.in?([RunningOnServer, RunningOnClient])
120       self.state = Paused
121       self.save!
122     elsif self.state != Paused
123       raise InvalidStateTransitionError
124     end
125
126     return if !cascade
127
128     # cancel all child jobs
129     children = self.components.andand.collect{|_, c| c['job']}.compact.collect{|j| j['uuid']}.compact
130
131     return if children.empty?
132
133     Job.where(uuid: children, state: [Job::Queued, Job::Running]).each do |job|
134       job.cancel(cascade: cascade, need_transaction: false)
135     end
136   end
137
138   protected
139   def bootstrap_components
140     if pipeline_template and (!components or components.empty?)
141       self.components = pipeline_template.components.deep_dup
142     end
143   end
144
145   def update_state
146     if components and progress_ratio == 1.0
147       self.state = Complete
148     end
149   end
150
151   def verify_status
152     changed_attributes = self.changed
153
154     if new_record? or 'components'.in? changed_attributes
155       self.state ||= New
156       if (self.state == New) and self.components_look_ready?
157         self.state = Ready
158       end
159     end
160
161     if self.state.in?(States)
162       true
163     else
164       errors.add :state, "'#{state.inspect} must be one of: [#{States.join ', '}]"
165       false
166     end
167   end
168
169   def set_state_before_save
170     if self.components_look_ready? && (!self.state || self.state == New)
171       self.state = Ready
172     end
173   end
174
175   def update_timestamps_when_state_changes
176     return if not (state_changed? or new_record?)
177
178     case state
179     when RunningOnServer, RunningOnClient
180       self.started_at ||= db_current_time
181     when Failed, Complete
182       current_time = db_current_time
183       self.started_at ||= current_time
184       self.finished_at ||= current_time
185     end
186   end
187
188 end