Merge pull request #1 from arvados/master
[arvados.git] / services / api / app / models / pipeline_instance.rb
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 class PipelineInstance < ArvadosModel
6   include HasUuid
7   include KindAndEtag
8   include CommonApiTemplate
9   serialize :components, Hash
10   serialize :properties, Hash
11   serialize :components_summary, Hash
12   belongs_to :pipeline_template, :foreign_key => :pipeline_template_uuid, :primary_key => :uuid
13
14   before_validation :bootstrap_components
15   before_validation :update_state
16   before_validation :verify_status
17   before_validation :update_timestamps_when_state_changes
18   before_create :set_state_before_save
19   before_save :set_state_before_save
20   before_create :create_disabled
21   before_update :update_disabled
22
23   api_accessible :user, extend: :common do |t|
24     t.add :pipeline_template_uuid
25     t.add :name
26     t.add :components
27     t.add :properties
28     t.add :state
29     t.add :components_summary
30     t.add :description
31     t.add :started_at
32     t.add :finished_at
33   end
34
35   # Supported states for a pipeline instance
36   States =
37     [
38      (New = 'New'),
39      (Ready = 'Ready'),
40      (RunningOnServer = 'RunningOnServer'),
41      (RunningOnClient = 'RunningOnClient'),
42      (Paused = 'Paused'),
43      (Failed = 'Failed'),
44      (Complete = 'Complete'),
45     ]
46
47   def self.limit_index_columns_read
48     ["components"]
49   end
50
51   # if all components have input, the pipeline is Ready
52   def components_look_ready?
53     if !self.components || self.components.empty?
54       return false
55     end
56
57     all_components_have_input = true
58     self.components.each do |name, component|
59       component['script_parameters'].andand.each do |parametername, parameter|
60         parameter = { 'value' => parameter } unless parameter.is_a? Hash
61         if parameter['value'].nil? and parameter['required']
62           if parameter['output_of']
63             next
64           end
65           all_components_have_input = false
66           break
67         end
68       end
69     end
70     return all_components_have_input
71   end
72
73   def progress_table
74     begin
75       # v0 pipeline format
76       nrow = -1
77       components['steps'].collect do |step|
78         nrow += 1
79         row = [nrow, step['name']]
80         if step['complete'] and step['complete'] != 0
81           if step['output_data_locator']
82             row << 1.0
83           else
84             row << 0.0
85           end
86         else
87           row << 0.0
88           if step['failed']
89             self.state = Failed
90           end
91         end
92         row << (step['warehousejob']['id'] rescue nil)
93         row << (step['warehousejob']['revision'] rescue nil)
94         row << step['output_data_locator']
95         row << (Time.parse(step['warehousejob']['finishtime']) rescue nil)
96         row
97       end
98     rescue
99       []
100     end
101   end
102
103   def progress_ratio
104     t = progress_table
105     return 0 if t.size < 1
106     t.collect { |r| r[2] }.inject(0.0) { |sum,a| sum += a } / t.size
107   end
108
109   def self.queue
110     self.where("state = 'RunningOnServer'")
111   end
112
113   def cancel(cascade: false, need_transaction: true)
114     raise "No longer supported"
115   end
116
117   protected
118   def bootstrap_components
119     if pipeline_template and (!components or components.empty?)
120       self.components = pipeline_template.components.deep_dup
121     end
122   end
123
124   def update_state
125     if components and progress_ratio == 1.0
126       self.state = Complete
127     end
128   end
129
130   def verify_status
131     changed_attributes = self.changed
132
133     if new_record? or 'components'.in? changed_attributes
134       self.state ||= New
135       if (self.state == New) and self.components_look_ready?
136         self.state = Ready
137       end
138     end
139
140     if !self.state.in?(States)
141       errors.add :state, "'#{state.inspect} must be one of: [#{States.join ', '}]"
142       throw(:abort)
143     end
144   end
145
146   def set_state_before_save
147     if self.components_look_ready? && (!self.state || self.state == New)
148       self.state = Ready
149     end
150   end
151
152   def update_timestamps_when_state_changes
153     return if not (state_changed? or new_record?)
154
155     case state
156     when RunningOnServer, RunningOnClient
157       self.started_at ||= db_current_time
158     when Failed, Complete
159       current_time = db_current_time
160       self.started_at ||= current_time
161       self.finished_at ||= current_time
162     end
163   end
164
165
166   def create_disabled
167     raise "Disabled"
168   end
169
170   def update_disabled
171     raise "Disabled"
172   end
173 end