Merge branch '8800-queue-query'
[arvados.git] / services / api / app / models / pipeline_instance.rb
1 class PipelineInstance < ArvadosModel
2   include HasUuid
3   include KindAndEtag
4   include CommonApiTemplate
5   serialize :components, Hash
6   serialize :properties, Hash
7   serialize :components_summary, Hash
8   belongs_to :pipeline_template, :foreign_key => :pipeline_template_uuid, :primary_key => :uuid
9
10   before_validation :bootstrap_components
11   before_validation :update_state
12   before_validation :verify_status
13   before_create :set_state_before_save
14   before_save :set_state_before_save
15
16   api_accessible :user, extend: :common do |t|
17     t.add :pipeline_template_uuid
18     t.add :name
19     t.add :components
20     t.add :properties
21     t.add :state
22     t.add :components_summary
23     t.add :description
24     t.add :started_at
25     t.add :finished_at
26   end
27
28   # Supported states for a pipeline instance
29   States =
30     [
31      (New = 'New'),
32      (Ready = 'Ready'),
33      (RunningOnServer = 'RunningOnServer'),
34      (RunningOnClient = 'RunningOnClient'),
35      (Paused = 'Paused'),
36      (Failed = 'Failed'),
37      (Complete = 'Complete'),
38     ]
39
40   # if all components have input, the pipeline is Ready
41   def components_look_ready?
42     if !self.components || self.components.empty?
43       return false
44     end
45
46     all_components_have_input = true
47     self.components.each do |name, component|
48       component['script_parameters'].andand.each do |parametername, parameter|
49         parameter = { 'value' => parameter } unless parameter.is_a? Hash
50         if parameter['value'].nil? and parameter['required']
51           if parameter['output_of']
52             next
53           end
54           all_components_have_input = false
55           break
56         end
57       end
58     end
59     return all_components_have_input
60   end
61
62   def progress_table
63     begin
64       # v0 pipeline format
65       nrow = -1
66       components['steps'].collect do |step|
67         nrow += 1
68         row = [nrow, step['name']]
69         if step['complete'] and step['complete'] != 0
70           if step['output_data_locator']
71             row << 1.0
72           else
73             row << 0.0
74           end
75         else
76           row << 0.0
77           if step['failed']
78             self.state = Failed
79           end
80         end
81         row << (step['warehousejob']['id'] rescue nil)
82         row << (step['warehousejob']['revision'] rescue nil)
83         row << step['output_data_locator']
84         row << (Time.parse(step['warehousejob']['finishtime']) rescue nil)
85         row
86       end
87     rescue
88       []
89     end
90   end
91
92   def progress_ratio
93     t = progress_table
94     return 0 if t.size < 1
95     t.collect { |r| r[2] }.inject(0.0) { |sum,a| sum += a } / t.size
96   end
97
98   def self.queue
99     self.where("state = 'RunningOnServer'")
100   end
101
102   protected
103   def bootstrap_components
104     if pipeline_template and (!components or components.empty?)
105       self.components = pipeline_template.components.deep_dup
106     end
107   end
108
109   def update_state
110     if components and progress_ratio == 1.0
111       self.state = Complete
112     end
113   end
114
115   def verify_status
116     changed_attributes = self.changed
117
118     if new_record? or 'components'.in? changed_attributes
119       self.state ||= New
120       if (self.state == New) and self.components_look_ready?
121         self.state = Ready
122       end
123     end
124
125     if self.state.in?(States)
126       true
127     else
128       errors.add :state, "'#{state.inspect} must be one of: [#{States.join ', '}]"
129       false
130     end
131   end
132
133   def set_state_before_save
134     if self.components_look_ready? && (!self.state || self.state == New)
135       self.state = Ready
136     end
137   end
138
139 end