Merge remote-tracking branch 'refs/remotes/origin/2939-create-params' into origin...
[arvados.git] / services / api / app / models / pipeline_instance.rb
1 class PipelineInstance < ArvadosModel
2   include HasUuid
3   include KindAndEtag
4   include CommonApiTemplate
5   serialize :components, Hash
6   serialize :properties, Hash
7   serialize :components_summary, Hash
8   belongs_to :pipeline_template, :foreign_key => :pipeline_template_uuid, :primary_key => :uuid
9
10   before_validation :bootstrap_components
11   before_validation :update_success
12   before_validation :verify_status
13   before_create :set_state_before_save
14   before_save :set_state_before_save
15
16   api_accessible :user, extend: :common do |t|
17     t.add :pipeline_template_uuid
18     t.add :pipeline_template, :if => :pipeline_template
19     t.add :name
20     t.add :components
21     t.add :success
22     t.add :active
23     t.add :dependencies
24     t.add :properties
25     t.add :state
26     t.add :components_summary
27   end
28
29   # Supported states for a pipeline instance
30   States =
31     [
32      (New = 'New'),
33      (Ready = 'Ready'),
34      (RunningOnServer = 'RunningOnServer'),
35      (RunningOnClient = 'RunningOnClient'),
36      (Paused = 'Paused'),
37      (Failed = 'Failed'),
38      (Complete = 'Complete'),
39     ]
40
41   def dependencies
42     dependency_search(self.components).keys
43   end
44
45   # if all components have input, the pipeline is Ready
46   def components_look_ready?
47     if !self.components || self.components.empty?
48       return false
49     end
50
51     all_components_have_input = true
52     self.components.each do |name, component|
53       component['script_parameters'].andand.each do |parametername, parameter|
54         parameter = { 'value' => parameter } unless parameter.is_a? Hash
55         if parameter['value'].nil? and parameter['required']
56           if parameter['output_of']
57             next
58           end
59           all_components_have_input = false
60           break
61         end
62       end
63     end
64     return all_components_have_input
65   end
66
67   def progress_table
68     begin
69       # v0 pipeline format
70       nrow = -1
71       components['steps'].collect do |step|
72         nrow += 1
73         row = [nrow, step['name']]
74         if step['complete'] and step['complete'] != 0
75           if step['output_data_locator']
76             row << 1.0
77           else
78             row << 0.0
79           end
80         else
81           row << 0.0
82           if step['failed']
83             self.success = false
84           end
85         end
86         row << (step['warehousejob']['id'] rescue nil)
87         row << (step['warehousejob']['revision'] rescue nil)
88         row << step['output_data_locator']
89         row << (Time.parse(step['warehousejob']['finishtime']) rescue nil)
90         row
91       end
92     rescue
93       []
94     end
95   end
96
97   def progress_ratio
98     t = progress_table
99     return 0 if t.size < 1
100     t.collect { |r| r[2] }.inject(0.0) { |sum,a| sum += a } / t.size
101   end
102
103   def self.queue
104     self.where("state = 'RunningOnServer'")
105   end
106
107   protected
108   def bootstrap_components
109     if pipeline_template and (!components or components.empty?)
110       self.components = pipeline_template.components.deep_dup
111     end
112   end
113
114   def update_success
115     if components and progress_ratio == 1.0
116       self.success = true
117     end
118   end
119
120   def dependency_search(haystack)
121     if haystack.is_a? String
122       if (re = haystack.match /^([0-9a-f]{32}(\+[^,]+)*)+/)
123         {re[1] => true}
124       else
125         {}
126       end
127     elsif haystack.is_a? Array
128       deps = {}
129       haystack.each do |value|
130         deps.merge! dependency_search(value)
131       end
132       deps
133     elsif haystack.respond_to? :keys
134       deps = {}
135       haystack.each do |key, value|
136         deps.merge! dependency_search(value)
137       end
138       deps
139     else
140       {}
141     end
142   end
143
144   def verify_status
145     changed_attributes = self.changed
146
147     if 'state'.in? changed_attributes
148       case self.state
149       when New, Ready, Paused
150         self.active = nil
151         self.success = nil
152       when RunningOnServer
153         self.active = true
154         self.success = nil
155       when RunningOnClient
156         self.active = nil
157         self.success = nil
158       when Failed
159         self.active = false
160         self.success = false
161         self.state = Failed   # before_validation will fail if false is returned in the previous line
162       when Complete
163         self.active = false
164         self.success = true
165       else
166         return false
167       end
168     elsif 'success'.in? changed_attributes
169       logger.info "pipeline_instance changed_attributes has success for #{self.uuid}"
170       if self.success
171         self.active = false
172         self.state = Complete
173       else
174         self.active = false
175         self.state = Failed
176       end
177     elsif 'active'.in? changed_attributes
178       logger.info "pipeline_instance changed_attributes has active for #{self.uuid}"
179       if self.active
180         if self.state.in? [New, Ready, Paused]
181           self.state = RunningOnServer
182         end
183       else
184         if self.state == RunningOnServer # state was RunningOnServer
185           self.active = nil
186           self.state = Paused
187         elsif self.components_look_ready?
188           self.state = Ready
189         else
190           self.state = New
191         end
192       end
193     elsif new_record? and self.state.nil?
194       # No state, active, or success given
195       self.state = New
196     end
197
198     if new_record? or 'components'.in? changed_attributes
199       self.state ||= New
200       if self.state == New and self.components_look_ready?
201         self.state = Ready
202       end
203     end
204
205     if self.state.in?(States)
206       true
207     else
208       errors.add :state, "'#{state.inspect} must be one of: [#{States.join ', '}]"
209       false
210     end
211   end
212
213   def set_state_before_save
214     if !self.state || self.state == New || self.state == Ready || self.state == Paused
215       if self.active
216         self.state = RunningOnServer
217       elsif self.components_look_ready? && (!self.state || self.state == New)
218         self.state = Ready
219       end
220     end
221   end
222
223 end