8784: Fix test for latest firefox.
[arvados.git] / services / api / app / models / pipeline_instance.rb
1 class PipelineInstance < ArvadosModel
2   include HasUuid
3   include KindAndEtag
4   include CommonApiTemplate
5   serialize :components, Hash
6   serialize :properties, Hash
7   serialize :components_summary, Hash
8   belongs_to :pipeline_template, :foreign_key => :pipeline_template_uuid, :primary_key => :uuid
9
10   before_validation :bootstrap_components
11   before_validation :update_state
12   before_validation :verify_status
13   before_validation :update_timestamps_when_state_changes
14   before_create :set_state_before_save
15   before_save :set_state_before_save
16
17   api_accessible :user, extend: :common do |t|
18     t.add :pipeline_template_uuid
19     t.add :name
20     t.add :components
21     t.add :properties
22     t.add :state
23     t.add :components_summary
24     t.add :description
25     t.add :started_at
26     t.add :finished_at
27   end
28
29   # Supported states for a pipeline instance
30   States =
31     [
32      (New = 'New'),
33      (Ready = 'Ready'),
34      (RunningOnServer = 'RunningOnServer'),
35      (RunningOnClient = 'RunningOnClient'),
36      (Paused = 'Paused'),
37      (Failed = 'Failed'),
38      (Complete = 'Complete'),
39     ]
40
41   def self.limit_index_columns_read
42     ["components"]
43   end
44
45   # if all components have input, the pipeline is Ready
46   def components_look_ready?
47     if !self.components || self.components.empty?
48       return false
49     end
50
51     all_components_have_input = true
52     self.components.each do |name, component|
53       component['script_parameters'].andand.each do |parametername, parameter|
54         parameter = { 'value' => parameter } unless parameter.is_a? Hash
55         if parameter['value'].nil? and parameter['required']
56           if parameter['output_of']
57             next
58           end
59           all_components_have_input = false
60           break
61         end
62       end
63     end
64     return all_components_have_input
65   end
66
67   def progress_table
68     begin
69       # v0 pipeline format
70       nrow = -1
71       components['steps'].collect do |step|
72         nrow += 1
73         row = [nrow, step['name']]
74         if step['complete'] and step['complete'] != 0
75           if step['output_data_locator']
76             row << 1.0
77           else
78             row << 0.0
79           end
80         else
81           row << 0.0
82           if step['failed']
83             self.state = Failed
84           end
85         end
86         row << (step['warehousejob']['id'] rescue nil)
87         row << (step['warehousejob']['revision'] rescue nil)
88         row << step['output_data_locator']
89         row << (Time.parse(step['warehousejob']['finishtime']) rescue nil)
90         row
91       end
92     rescue
93       []
94     end
95   end
96
97   def progress_ratio
98     t = progress_table
99     return 0 if t.size < 1
100     t.collect { |r| r[2] }.inject(0.0) { |sum,a| sum += a } / t.size
101   end
102
103   def self.queue
104     self.where("state = 'RunningOnServer'")
105   end
106
107   def cancel(cascade: false, need_transaction: true)
108     if need_transaction
109       ActiveRecord::Base.transaction do
110         cancel(cascade: cascade, need_transaction: false)
111       end
112       return
113     end
114
115     if self.state.in?([RunningOnServer, RunningOnClient])
116       self.state = Paused
117       self.save!
118     elsif self.state != Paused
119       raise InvalidStateTransitionError
120     end
121
122     return if !cascade
123
124     # cancel all child jobs
125     children = self.components.andand.collect{|_, c| c['job']}.compact.collect{|j| j['uuid']}.compact
126
127     return if children.empty?
128
129     Job.where(uuid: children, state: [Job::Queued, Job::Running]).each do |job|
130       job.cancel(cascade: cascade, need_transaction: false)
131     end
132   end
133
134   protected
135   def bootstrap_components
136     if pipeline_template and (!components or components.empty?)
137       self.components = pipeline_template.components.deep_dup
138     end
139   end
140
141   def update_state
142     if components and progress_ratio == 1.0
143       self.state = Complete
144     end
145   end
146
147   def verify_status
148     changed_attributes = self.changed
149
150     if new_record? or 'components'.in? changed_attributes
151       self.state ||= New
152       if (self.state == New) and self.components_look_ready?
153         self.state = Ready
154       end
155     end
156
157     if self.state.in?(States)
158       true
159     else
160       errors.add :state, "'#{state.inspect} must be one of: [#{States.join ', '}]"
161       false
162     end
163   end
164
165   def set_state_before_save
166     if self.components_look_ready? && (!self.state || self.state == New)
167       self.state = Ready
168     end
169   end
170
171   def update_timestamps_when_state_changes
172     return if not (state_changed? or new_record?)
173
174     case state
175     when RunningOnServer, RunningOnClient
176       self.started_at ||= db_current_time
177     when Failed, Complete
178       current_time = db_current_time
179       self.started_at ||= current_time
180       self.finished_at ||= current_time
181     end
182   end
183
184 end