3187: Added algorithm to calculate wall clock run time from a set of overlapping...
[arvados.git] / apps / workbench / app / helpers / pipeline_instances_helper.rb
1 module PipelineInstancesHelper
2
3   def pipeline_jobs object=nil
4     object ||= @object
5     if object.components[:steps].is_a? Array
6       pipeline_jobs_oldschool object
7     elsif object.components.is_a? Hash
8       pipeline_jobs_newschool object
9     end
10   end
11
12   def render_pipeline_jobs
13     pipeline_jobs.collect do |pj|
14       render_pipeline_job pj
15     end
16   end
17
18   def render_pipeline_job pj
19     pj[:progress_bar] = render partial: 'job_progress', locals: {:j => pj[:job]}
20     pj[:output_link] = link_to_if_arvados_object pj[:output]
21     pj[:job_link] = link_to_if_arvados_object pj[:job][:uuid]
22     pj
23   end
24
25   protected
26
27   def pipeline_jobs_newschool object
28     ret = []
29     i = -1
30
31     jobuuids = object.components.values.map { |c|
32       c[:job][:uuid] if c.is_a?(Hash) and c[:job].is_a?(Hash)
33     }.compact
34     job = {}
35     Job.where(uuid: jobuuids).each do |j|
36       job[j[:uuid]] = j
37     end
38
39     object.components.each do |cname, c|
40       i += 1
41       pj = {index: i, name: cname}
42       if not c.is_a?(Hash)
43         ret << pj
44         next
45       end
46       if c[:job] and c[:job][:uuid] and job[c[:job][:uuid]]
47         pj[:job] = job[c[:job][:uuid]]
48       else
49         pj[:job] = c[:job].is_a?(Hash) ? c[:job] : {}
50       end
51       pj[:percent_done] = 0
52       pj[:percent_running] = 0
53       if pj[:job][:success]
54         if pj[:job][:output]
55           pj[:progress] = 1.0
56           pj[:percent_done] = 100
57         else
58           pj[:progress] = 0.0
59         end
60       else
61         if pj[:job][:tasks_summary]
62           begin
63             ts = pj[:job][:tasks_summary]
64             denom = ts[:done].to_f + ts[:running].to_f + ts[:todo].to_f
65             pj[:progress] = (ts[:done].to_f + ts[:running].to_f/2) / denom
66             pj[:percent_done] = 100.0 * ts[:done].to_f / denom
67             pj[:percent_running] = 100.0 * ts[:running].to_f / denom
68             pj[:progress_detail] = "#{ts[:done]} done #{ts[:running]} run #{ts[:todo]} todo"
69           rescue
70             pj[:progress] = 0.5
71             pj[:percent_done] = 0.0
72             pj[:percent_running] = 100.0
73           end
74         else
75           pj[:progress] = 0.0
76         end
77       end
78       if pj[:job][:success]
79         pj[:result] = 'complete'
80         pj[:labeltype] = 'success'
81         pj[:complete] = true
82         pj[:progress] = 1.0
83       elsif pj[:job][:finished_at]
84         pj[:result] = 'failed'
85         pj[:labeltype] = 'danger'
86         pj[:failed] = true
87       elsif pj[:job][:started_at]
88         pj[:result] = 'running'
89         pj[:labeltype] = 'primary'
90       elsif pj[:job][:uuid]
91         pj[:result] = 'queued'
92         pj[:labeltype] = 'default'
93       else
94         pj[:result] = 'none'
95         pj[:labeltype] = 'default'
96       end
97
98       pj[:job_id] = pj[:job][:uuid]
99       pj[:script] = pj[:job][:script] || c[:script]
100       pj[:repository] = pj[:job][:script] || c[:repository]
101       pj[:script_parameters] = pj[:job][:script_parameters] || c[:script_parameters]
102       pj[:script_version] = pj[:job][:script_version] || c[:script_version]
103       pj[:nondeterministic] = pj[:job][:nondeterministic] || c[:nondeterministic]
104       pj[:output] = pj[:job][:output]
105       pj[:output_uuid] = c[:output_uuid]
106       pj[:finished_at] = (Time.parse(pj[:job][:finished_at]) rescue nil)
107       ret << pj
108     end
109     ret
110   end
111
112   def pipeline_jobs_oldschool object
113     ret = []
114     object.components[:steps].each_with_index do |step, i|
115       pj = {index: i, name: step[:name]}
116       if step[:complete] and step[:complete] != 0
117         if step[:output_data_locator]
118           pj[:progress] = 1.0
119         else
120           pj[:progress] = 0.0
121         end
122       else
123         if step[:progress] and
124             (re = step[:progress].match /^(\d+)\+(\d+)\/(\d+)$/)
125           pj[:progress] = (((re[1].to_f + re[2].to_f/2) / re[3].to_f) rescue 0.5)
126         else
127           pj[:progress] = 0.0
128         end
129         if step[:failed]
130           pj[:result] = 'failed'
131           pj[:failed] = true
132         end
133       end
134       if step[:warehousejob]
135         if step[:complete]
136           pj[:result] = 'complete'
137           pj[:complete] = true
138           pj[:progress] = 1.0
139         elsif step[:warehousejob][:finishtime]
140           pj[:result] = 'failed'
141           pj[:failed] = true
142         elsif step[:warehousejob][:starttime]
143           pj[:result] = 'running'
144         else
145           pj[:result] = 'queued'
146         end
147       end
148       pj[:progress_detail] = (step[:progress] rescue nil)
149       pj[:job_id] = (step[:warehousejob][:id] rescue nil)
150       pj[:job_link] = pj[:job_id]
151       pj[:script] = step[:function]
152       pj[:script_version] = (step[:warehousejob][:revision] rescue nil)
153       pj[:output] = step[:output_data_locator]
154       pj[:finished_at] = (Time.parse(step[:warehousejob][:finishtime]) rescue nil)
155       ret << pj
156     end
157     ret
158   end
159
160   def runtime duration, long
161     hours = 0
162     minutes = 0
163     seconds = 0
164     if duration >= 3600
165       hours = (duration / 3600).floor
166       duration -= hours * 3600
167     end
168     if duration >= 60
169       minutes = (duration / 60).floor
170       duration -= minutes * 60
171     end
172     duration = duration.floor
173
174     if long
175       s = ""
176       if hours > 0 then
177         s += "#{hours} hour#{'s' if hours != 1} "
178       end
179       if minutes > 0 then
180         s += "#{minutes} minute#{'s' if minutes != 1} "
181       end
182       s += "#{duration} second#{'s' if duration != 1}"
183     else
184       s = "#{hours}:#{minutes.to_s.rjust(2, '0')}:#{duration.to_s.rjust(2, '0')}"
185     end
186     s
187   end
188
189   def determine_wallclock_runtime jobs
190     timestamps = []
191     jobs.each do |j|
192       insert_at = 0
193       timestamps.each_index do |i|
194         if started_at = j[:started_at]
195           finished_at = (if j[:finished_at] then j[:finished_at] else Time.now end)
196
197           if started_at >= timestamps[i][0] and finished_at <= timestamps[i][1]
198             # 'j' started and ended during 'i'
199             insert_at = -1
200             break
201           end
202
203           if started_at < timestamps[i][0] and finished_at >= timestamps[i][0] and finished_at <= timestamps[i][1]
204             # 'j' started before 'i' and ended during 'i'
205             # move start time of 'i' back
206             timestamps[i][0] = started_at
207             insert_at = -1
208             break
209           end
210
211           if started_at >= timestamps[i][0] and started_at <= timestamps[i][1]
212             # 'j' started during 'i'
213             # move end time of 'i' back
214             timestamps[i][1] = finished_at
215             insert_at = -1
216             break
217           end
218
219           if finished_at < timestamps[i][0]
220             # 'j' finished before 'i' started, so insert before 'i'
221             insert_at = i
222             break
223           end
224
225           if started_at > timestamps[i][1]
226             # 'j' started after 'i' finished, so insert after 'i'
227             insert_at = i
228             break
229           end
230
231         end
232       end
233       if insert_at > -1
234         timestamps.insert insert_at, [started_at, finished_at]
235       end
236     end
237     timestamps.map { |t| t[1] - t[0] }.reduce(:+)
238   end
239 end