# Model for a pipeline instance; inherits from ArvadosModel and mixes in
# CommonApiTemplate.
1 class PipelineInstance < ArvadosModel
4 include CommonApiTemplate
# components, properties and components_summary are each serialized as a Hash.
5 serialize :components, Hash
6 serialize :properties, Hash
7 serialize :components_summary, Hash
# The template is looked up by matching this record's pipeline_template_uuid
# against the template's uuid column (not the default id primary key).
8 belongs_to :pipeline_template, :foreign_key => :pipeline_template_uuid, :primary_key => :uuid
# Callbacks registered to run before validation, in this order.
10 before_validation :bootstrap_components
11 before_validation :update_state
12 before_validation :verify_status
13 before_validation :update_timestamps_when_state_changes
# NOTE(review): set_state_before_save is registered for both before_create and
# before_save. before_save callbacks normally also fire when a record is
# created, so the before_create registration looks redundant — confirm before
# removing.
14 before_create :set_state_before_save
15 before_save :set_state_before_save
# Declares the :user API template as an extension of :common; the t.add calls
# below add pipeline_template_uuid and components_summary to it.
17 api_accessible :user, extend: :common do |t|
18 t.add :pipeline_template_uuid
23 t.add :components_summary
29 # Supported states for a pipeline instance
# Each parenthesized entry assigns a constant whose value is the state name as
# a string, so the constant and its string value always match.
34 (RunningOnServer = 'RunningOnServer'),
35 (RunningOnClient = 'RunningOnClient'),
38 (Complete = 'Complete'),
41 # if all components have input, the pipeline is Ready
# Scans every component's 'script_parameters' and returns the
# all_components_have_input flag, which starts out true.
42 def components_look_ready?
# Special-cases a nil or empty components collection.
43 if !self.components || self.components.empty?
47 all_components_have_input = true
48 self.components.each do |name, component|
# andand makes the iteration a no-op when 'script_parameters' is nil.
49 component['script_parameters'].andand.each do |parametername, parameter|
# A bare (non-Hash) parameter is wrapped so it can be read through 'value'
# like the Hash form.
50 parameter = { 'value' => parameter } unless parameter.is_a? Hash
# Only parameters that are marked 'required' and have no value are examined.
51 if parameter['value'].nil? and parameter['required']
# Parameters wired to another component via 'output_of' get their own branch.
# NOTE(review): presumably the flag is cleared only when that branch does not
# apply — confirm against the full method body.
52 if parameter['output_of']
55 all_components_have_input = false
60 return all_components_have_input
67 components['steps'].collect do |step|
69 row = [nrow, step['name']]
70 if step['complete'] and step['complete'] != 0
71 if step['output_data_locator']
82 row << (step['warehousejob']['id'] rescue nil)
83 row << (step['warehousejob']['revision'] rescue nil)
84 row << step['output_data_locator']
85 row << (Time.parse(step['warehousejob']['finishtime']) rescue nil)
95 return 0 if t.size < 1
96 t.collect { |r| r[2] }.inject(0.0) { |sum,a| sum += a } / t.size
100 self.where("state = 'RunningOnServer'")
# Cancel handling for this pipeline instance and its child jobs.
#
# cascade - passed through unchanged to each child job's cancel call
#           (defaults to nil).
#
# Raises InvalidStateTransitionError when the state is neither
# RunningOnServer/RunningOnClient nor Paused.
103 def cancel cascade=nil
104 if self.state.in?([RunningOnServer, RunningOnClient])
107 elsif self.state != Paused
108 raise InvalidStateTransitionError
113 # cancel all child jobs
# Collects the 'uuid' of every component's 'job' entry; components without a
# job, and jobs without a uuid, are dropped by the two compact calls.
# NOTE(review): andand only guards the collect call — if components were nil,
# the following .compact would be sent to nil. Confirm components cannot be
# nil here.
114 children = self.components.andand.collect{|_, c| c['job']}.compact.collect{|j| j['uuid']}.compact
116 return if children.empty?
# Only jobs still in the Queued or Running state are cancelled.
118 Job.where(uuid: children).each do |job|
119 job.cancel cascade if job.state.in?([Job::Queued, Job::Running])
# When a pipeline template is linked and this instance has nil or empty
# components, seeds components from the template.
124 def bootstrap_components
125 if pipeline_template and (!components or components.empty?)
# deep_dup so the instance gets its own copy rather than sharing the
# template's nested objects.
126 self.components = pipeline_template.components.deep_dup
131 if components and progress_ratio == 1.0
132 self.state = Complete
137 changed_attributes = self.changed
139 if new_record? or 'components'.in? changed_attributes
141 if (self.state == New) and self.components_look_ready?
# State must be one of the values listed in States; otherwise a validation
# error is attached to :state.
146 if self.state.in?(States)
# NOTE(review): the message opens a single quote before the inspected state
# but never closes it — looks like a typo. Confirm nothing matches on the
# exact text before changing it.
149 errors.add :state, "'#{state.inspect} must be one of: [#{States.join ', '}]"
# Save-time callback. Its guarded action applies only when the components look
# ready and the state is either unset or still New.
154 def set_state_before_save
155 if self.components_look_ready? && (!self.state || self.state == New)
# Fills in started_at / finished_at when the state changes or the record is
# new. Timestamps that are already set are never overwritten (||=).
160 def update_timestamps_when_state_changes
# Nothing to do unless the state changed or this is a new record.
161 return if not (state_changed? or new_record?)
# Running states: record the start time if it is not set yet.
164 when RunningOnServer, RunningOnClient
165 self.started_at ||= db_current_time
# Finished states (Failed or Complete): make sure both timestamps are present.
166 when Failed, Complete
# Read the clock once so that started_at and finished_at receive the same
# value when both were unset.
167 current_time = db_current_time
168 self.started_at ||= current_time
169 self.finished_at ||= current_time