Touch the "crunch_refresh_trigger" file when the state changes. This notifies
[arvados.git] / services / api / app / models / pipeline_instance.rb
1 class PipelineInstance < ArvadosModel
2   include HasUuid
3   include KindAndEtag
4   include CommonApiTemplate
5   serialize :components, Hash
6   serialize :properties, Hash
7   serialize :components_summary, Hash
8   belongs_to :pipeline_template, :foreign_key => :pipeline_template_uuid, :primary_key => :uuid
9
10   before_validation :bootstrap_components
11   before_validation :update_state
12   before_validation :verify_status
13   before_create :set_state_before_save
14   before_save :set_state_before_save
15
16   api_accessible :user, extend: :common do |t|
17     t.add :pipeline_template_uuid
18     t.add :pipeline_template, :if => :pipeline_template
19     t.add :name
20     t.add :components
21     t.add :dependencies
22     t.add :properties
23     t.add :state
24     t.add :components_summary
25     t.add :description
26     t.add :started_at
27     t.add :finished_at
28   end
29
30   # Supported states for a pipeline instance
31   States =
32     [
33      (New = 'New'),
34      (Ready = 'Ready'),
35      (RunningOnServer = 'RunningOnServer'),
36      (RunningOnClient = 'RunningOnClient'),
37      (Paused = 'Paused'),
38      (Failed = 'Failed'),
39      (Complete = 'Complete'),
40     ]
41
42   def dependencies
43     dependency_search(self.components).keys
44   end
45
46   # if all components have input, the pipeline is Ready
47   def components_look_ready?
48     if !self.components || self.components.empty?
49       return false
50     end
51
52     all_components_have_input = true
53     self.components.each do |name, component|
54       component['script_parameters'].andand.each do |parametername, parameter|
55         parameter = { 'value' => parameter } unless parameter.is_a? Hash
56         if parameter['value'].nil? and parameter['required']
57           if parameter['output_of']
58             next
59           end
60           all_components_have_input = false
61           break
62         end
63       end
64     end
65     return all_components_have_input
66   end
67
68   def progress_table
69     begin
70       # v0 pipeline format
71       nrow = -1
72       components['steps'].collect do |step|
73         nrow += 1
74         row = [nrow, step['name']]
75         if step['complete'] and step['complete'] != 0
76           if step['output_data_locator']
77             row << 1.0
78           else
79             row << 0.0
80           end
81         else
82           row << 0.0
83           if step['failed']
84             self.state = Failed
85           end
86         end
87         row << (step['warehousejob']['id'] rescue nil)
88         row << (step['warehousejob']['revision'] rescue nil)
89         row << step['output_data_locator']
90         row << (Time.parse(step['warehousejob']['finishtime']) rescue nil)
91         row
92       end
93     rescue
94       []
95     end
96   end
97
98   def progress_ratio
99     t = progress_table
100     return 0 if t.size < 1
101     t.collect { |r| r[2] }.inject(0.0) { |sum,a| sum += a } / t.size
102   end
103
104   def self.queue
105     self.where("state = 'RunningOnServer'")
106   end
107
108   protected
109   def bootstrap_components
110     if pipeline_template and (!components or components.empty?)
111       self.components = pipeline_template.components.deep_dup
112     end
113   end
114
115   def update_state
116     if components and progress_ratio == 1.0
117       self.state = Complete
118     end
119   end
120
121   def dependency_search(haystack)
122     if haystack.is_a? String
123       if (re = haystack.match /^([0-9a-f]{32}(\+[^,]+)*)+/)
124         {re[1] => true}
125       else
126         {}
127       end
128     elsif haystack.is_a? Array
129       deps = {}
130       haystack.each do |value|
131         deps.merge! dependency_search(value)
132       end
133       deps
134     elsif haystack.respond_to? :keys
135       deps = {}
136       haystack.each do |key, value|
137         deps.merge! dependency_search(value)
138       end
139       deps
140     else
141       {}
142     end
143   end
144
145   def verify_status
146     changed_attributes = self.changed
147
148     if new_record? or 'components'.in? changed_attributes
149       self.state ||= New
150       if (self.state == New) and self.components_look_ready?
151         self.state = Ready
152       end
153     end
154
155     if self.state.in?(States)
156       true
157     else
158       errors.add :state, "'#{state.inspect} must be one of: [#{States.join ', '}]"
159       false
160     end
161   end
162
163   def set_state_before_save
164     if self.components_look_ready? && (!self.state || self.state == New)
165       self.state = Ready
166     end
167   end
168
169 end