651f36d30f16350f2dee5fde0b47bca824184d8f
[arvados.git] / services / api / app / models / pipeline_instance.rb
# A pipeline instance: a Hash of named components plus a lifecycle state
# drawn from States. When created with a pipeline_template and no components
# of its own, the components are copied from the template.
class PipelineInstance < ArvadosModel
  include HasUuid
  include KindAndEtag
  include CommonApiTemplate
  serialize :components, Hash
  serialize :properties, Hash
  serialize :components_summary, Hash
  belongs_to :pipeline_template, :foreign_key => :pipeline_template_uuid, :primary_key => :uuid

  # The order matters: components are bootstrapped first, then the state is
  # derived from them, validated, and finally used to stamp the timestamps.
  before_validation :bootstrap_components
  before_validation :update_state
  before_validation :verify_status
  before_validation :update_timestamps_when_state_changes
  # NOTE(review): before_save also fires when a record is created, so the
  # before_create registration looks redundant. It is kept so the callback
  # chain stays exactly as it was.
  before_create :set_state_before_save
  before_save :set_state_before_save

  api_accessible :user, extend: :common do |t|
    t.add :pipeline_template_uuid
    t.add :name
    t.add :components
    t.add :properties
    t.add :state
    t.add :components_summary
    t.add :description
    t.add :started_at
    t.add :finished_at
  end

  # Supported states for a pipeline instance
  States =
    [
     (New = 'New'),
     (Ready = 'Ready'),
     (RunningOnServer = 'RunningOnServer'),
     (RunningOnClient = 'RunningOnClient'),
     (Paused = 'Paused'),
     (Failed = 'Failed'),
     (Complete = 'Complete'),
    ]

  # Returns true when every component has the input it needs, i.e. no
  # script parameter is marked 'required' while having a nil 'value' and no
  # 'output_of'. Returns false when there are no components at all.
  def components_look_ready?
    return false if !components || components.empty?

    components.each do |_name, component|
      component['script_parameters'].andand.each do |_parametername, parameter|
        # A bare (non-Hash) parameter is shorthand for its own value.
        parameter = { 'value' => parameter } unless parameter.is_a? Hash
        next unless parameter['value'].nil? && parameter['required']
        # A missing value that is fed by another component's output is fine.
        next if parameter['output_of']
        # One unsatisfied required parameter settles the answer.
        return false
      end
    end
    true
  end

  # Builds one row per step of a v0-format pipeline (components['steps']):
  #   [index, name, progress (0.0 or 1.0), warehousejob id,
  #    warehousejob revision, output_data_locator, warehousejob finish time]
  #
  # Side effect: sets self.state to Failed when an incomplete step is
  # flagged 'failed'.
  #
  # Returns [] when the components are not in v0 format (any error raised
  # while walking the steps is swallowed).
  def progress_table
    # v0 pipeline format
    components['steps'].each_with_index.map do |step, nrow|
      row = [nrow, step['name']]
      if step['complete'] && step['complete'] != 0
        # A completed step only counts once it has produced output.
        row << (step['output_data_locator'] ? 1.0 : 0.0)
      else
        row << 0.0
        self.state = Failed if step['failed']
      end
      # The warehousejob details are optional; missing or malformed entries
      # become nil rather than invalidating the whole table.
      row << (step['warehousejob']['id'] rescue nil)
      row << (step['warehousejob']['revision'] rescue nil)
      row << step['output_data_locator']
      row << (Time.parse(step['warehousejob']['finishtime']) rescue nil)
      row
    end
  rescue StandardError
    []
  end

  # Mean of the per-step progress values from progress_table, or 0 when the
  # table is empty.
  def progress_ratio
    table = progress_table
    return 0 if table.empty?
    table.inject(0.0) { |total, row| total + row[2] } / table.size
  end

  # Scope of the pipeline instances whose state is RunningOnServer.
  def self.queue
    where(state: RunningOnServer)
  end

  # Pauses a running pipeline instance and saves it.
  #
  # cascade - when truthy, also cancels every Queued or Running job that is
  #           referenced by the components, passing cascade along.
  #
  # Raises InvalidStateTransitionError unless the instance is running
  # (RunningOnServer / RunningOnClient) or already Paused.
  def cancel(cascade = nil)
    if state.in?([RunningOnServer, RunningOnClient])
      self.state = Paused
      save!
    elsif state != Paused
      raise InvalidStateTransitionError
    end

    return unless cascade

    # cancel all child jobs
    children = child_job_uuids
    return if children.empty?

    Job.where(uuid: children).each do |job|
      job.cancel cascade if job.state.in?([Job::Queued, Job::Running])
    end
  end

  protected

  # UUIDs of the jobs attached to the components. Components that are not
  # Hashes (for example the v0 'steps' Array) and components without a Hash
  # 'job' entry are skipped instead of raising.
  def child_job_uuids
    (components || {}).values.map do |component|
      job = component.is_a?(Hash) ? component['job'] : nil
      job.is_a?(Hash) ? job['uuid'] : nil
    end.compact
  end

  # Copies the template's components into an instance that has none.
  def bootstrap_components
    if pipeline_template && (!components || components.empty?)
      self.components = pipeline_template.components.deep_dup
    end
  end

  # Marks the instance Complete once every v0 step reports full progress.
  def update_state
    if components && progress_ratio == 1.0
      self.state = Complete
    end
  end

  # Defaults the state for new records or changed components (New, or Ready
  # when the components look ready), then checks that the state is one of
  # States. Returns true when it is; otherwise records a validation error
  # on :state and returns false.
  def verify_status
    changed_attribute_names = changed

    if new_record? || 'components'.in?(changed_attribute_names)
      self.state ||= New
      if state == New && components_look_ready?
        self.state = Ready
      end
    end

    if state.in?(States)
      true
    else
      errors.add :state, "#{state.inspect} must be one of: [#{States.join ', '}]"
      false
    end
  end

  # Promotes an unset or New state to Ready when the components look ready.
  def set_state_before_save
    if components_look_ready? && (!state || state == New)
      self.state = Ready
    end
  end

  # Stamps started_at / finished_at when the state changes (or on a new
  # record). Timestamps that are already set are never overwritten.
  def update_timestamps_when_state_changes
    return unless state_changed? || new_record?

    case state
    when RunningOnServer, RunningOnClient
      self.started_at ||= db_current_time
    when Failed, Complete
      # Use a single timestamp so a pipeline that finishes without ever
      # being seen running gets identical start and finish times.
      current_time = db_current_time
      self.started_at ||= current_time
      self.finished_at ||= current_time
    end
  end

end