refs #3889: Merge monkey patch code updates to detect tests that are reusing ActionCo...
[arvados.git] / services / api / app / models / job.rb
# A Job describes a run of `script` at `script_version` from `repository`,
# with `script_parameters` and `runtime_constraints`. Its lifecycle is
# tracked by the `state` attribute (see States), while the legacy
# `running`/`success` flags are kept in sync for older clients.
class Job < ArvadosModel
  include HasUuid
  include KindAndEtag
  include CommonApiTemplate
  attr_protected :docker_image_locator
  serialize :script_parameters, Hash
  serialize :runtime_constraints, Hash
  serialize :tasks_summary, Hash
  before_create :ensure_unique_submit_id
  after_commit :trigger_crunch_dispatch_if_cancelled, :on => :update
  # Callbacks of the same kind run in declaration order: priority default,
  # then state-driven timestamps/legacy flags, then legacy-flag-driven state.
  before_validation :set_priority
  before_validation :update_timestamps_when_state_changes
  before_validation :update_state_from_old_state_attrs
  validate :ensure_script_version_is_commit
  validate :find_docker_image_locator
  validate :validate_status

  has_many :commit_ancestors, :foreign_key => :descendant, :primary_key => :script_version
  has_many(:nodes, foreign_key: :job_uuid, primary_key: :uuid)

  # Raised by ensure_unique_submit_id when another Job already has the
  # submit_id supplied for a new record.
  class SubmitIdReused < StandardError
  end

  # Attributes exposed through the :user API template (on top of :common).
  api_accessible :user, extend: :common do |t|
    t.add :submit_id
    t.add :priority
    t.add :script
    t.add :script_parameters
    t.add :script_version
    t.add :cancelled_at
    t.add :cancelled_by_client_uuid
    t.add :cancelled_by_user_uuid
    t.add :started_at
    t.add :finished_at
    t.add :output
    t.add :success
    t.add :running
    t.add :state
    t.add :is_locked_by_uuid
    t.add :log
    t.add :runtime_constraints
    t.add :tasks_summary
    t.add :dependencies
    t.add :nondeterministic
    t.add :repository
    t.add :supplied_script_version
    t.add :docker_image_locator
    t.add :queue_position
    t.add :node_uuids
    t.add :description
  end

  # Supported states for a job. Each parenthesized assignment also defines
  # a constant (Job::Queued, Job::Running, ...) holding the state string.
  States = [
            (Queued = 'Queued'),
            (Running = 'Running'),
            (Cancelled = 'Cancelled'),
            (Failed = 'Failed'),
            (Complete = 'Complete'),
           ]

  # Mark this job as no longer running and save it via update_attributes.
  #
  # An existing finished_at is kept (otherwise the current time is used),
  # an unset success is recorded as false, and running is cleared.
  # Returns the result of update_attributes.
  def assert_finished
    update_attributes(finished_at: finished_at || Time.now,
                      success: success || false,
                      running: false)
  end

  # UUIDs of the nodes associated with this job (nodes.job_uuid == uuid).
  def node_uuids
    nodes.map(&:uuid)
  end

  # Jobs waiting to run: not started, not locked, not cancelled and with no
  # success value; highest priority first, then by created_at ascending.
  def self.queue
    self.where('started_at is ? and is_locked_by_uuid is ? and cancelled_at is ? and success is ?',
               nil, nil, nil, nil).
      order('priority desc, created_at')
  end

  # Zero-based position of this job in Job.queue, or nil if it is not in
  # the queue. Iterates over the queued jobs, so cost grows with queue size.
  def queue_position
    Job.queue.each_with_index do |job, position|
      return position if job[:uuid] == self.uuid
    end
    nil
  end

  # Jobs whose legacy running flag is true, in the same order as Job.queue.
  def self.running
    self.where('running = ?', true).
      order('priority desc, created_at')
  end

  protected

  # Extends the inherited list with output and log.
  # NOTE(review): presumably the attributes treated as references to other
  # objects; the list's meaning is defined in ArvadosModel.
  def foreign_key_attributes
    super + %w(output log)
  end

  # Extends the inherited list so cancelled_by_client_uuid is exempt from
  # the read-permission check implemented in ArvadosModel.
  def skip_uuid_read_permission_check
    super + %w(cancelled_by_client_uuid)
  end

  # Extends the inherited list so output and log are exempt from the
  # existence check implemented in ArvadosModel.
  def skip_uuid_existence_check
    super + %w(output log)
  end

  # Default priority to 0. Returns true explicitly: a before_* callback that
  # returns false halts the callback chain (Rails versions before 5).
  def set_priority
    self.priority = 0 if self.priority.nil?
    true
  end

  # Validation: resolve script_version to a commit sha1 when the record is
  # new or script_version changed, remembering what the client originally
  # supplied in supplied_script_version (unless that is already set).
  # Adds an error when the version cannot be resolved.
  def ensure_script_version_is_commit
    if self.is_locked_by_uuid && self.started_at
      # Apparently client has already decided to go for it. This is
      # needed to run a local job using a local working directory
      # instead of a commit-ish.
      return true
    end
    if new_record? || script_version_changed?
      # Any error raised while resolving is treated as "does not resolve".
      sha1 = Commit.find_commit_range(current_user, self.repository, nil, self.script_version, nil)[0] rescue nil
      if sha1
        if self.supplied_script_version.nil? || self.supplied_script_version.empty?
          self.supplied_script_version = self.script_version
        end
        self.script_version = sha1
      else
        self.errors.add :script_version, "#{self.script_version} does not resolve to a commit"
        return false
      end
    end
  end

  # before_create: refuse to create a job whose non-nil submit_id is already
  # used by another Job.
  #
  # Raises SubmitIdReused on a duplicate; otherwise returns true so the
  # callback chain continues.
  def ensure_unique_submit_id
    if !submit_id.nil? && Job.where('submit_id=?', self.submit_id).exists?
      raise SubmitIdReused.new
    end
    true
  end

  # Validation: find the Collection that holds the Docker image named in
  # runtime_constraints['docker_image'] (optionally with
  # runtime_constraints['docker_image_tag']) and store its portable data
  # hash in docker_image_locator. With no image requested the locator is
  # cleared; an image that cannot be found adds an error.
  def find_docker_image_locator
    unless runtime_constraints.is_a? Hash
      # We're still in validation stage, so we can't assume
      # runtime_constraints isn't something horrible like an array or
      # a string. Treat those cases as "no docker image supplied";
      # other validations will fail anyway.
      self.docker_image_locator = nil
      return true
    end
    image_search = runtime_constraints['docker_image']
    image_tag = runtime_constraints['docker_image_tag']
    if image_search.nil?
      self.docker_image_locator = nil
      true
    elsif (coll = Collection.for_latest_docker_image(image_search, image_tag))
      self.docker_image_locator = coll.portable_data_hash
      true
    else
      errors.add(:docker_image_locator, "not found for #{image_search}")
      false
    end
  end

  # Distinct strings in script_parameters (searched through nested hashes
  # and arrays) that match one or more comma-separated 32-hex-digit
  # locators, each optionally followed by "+..." hints.
  #
  # The pattern is anchored with ^/$, so it is tested line by line, and its
  # `*` also matches an empty line. Such empty matches (e.g. from "" or a
  # trailing newline) are not dependencies and are skipped.
  def dependencies
    deps = {}
    pending = self.script_parameters.values
    until pending.empty?
      # Hashes contribute their values to the next round; strings are
      # scanned and dropped; anything else becomes nil and is compacted away.
      pending = pending.flatten.compact.map do |v|
        if v.is_a? Hash
          v.values
        elsif v.is_a? String
          v.match(/^(([0-9a-f]{32})\b(\+[^,]+)?,?)*$/) do |locator|
            match = locator.to_s
            deps[match] = true unless match.empty?
          end
          nil
        end
      end
    end
    deps.keys
  end

  # Update permission check, layered on top of ArvadosModel's.
  #
  # While the job is locked, only the lock holder or the system user may
  # change the script, its parameters and version, the run/outcome fields,
  # or the cancellation fields of an already-cancelled job. Changing
  # is_locked_by_uuid requires a logged-in user who neither steals an
  # existing lock nor locks the job on behalf of someone else.
  # Returns false (with a warning logged) when denied.
  def permission_to_update
    if is_locked_by_uuid_was && !(current_user &&
                                  (current_user.uuid == is_locked_by_uuid_was ||
                                   current_user.uuid == system_user.uuid))
      if script_changed? ||
          script_parameters_changed? ||
          script_version_changed? ||
          (!cancelled_at_was.nil? &&
           (cancelled_by_client_uuid_changed? ||
            cancelled_by_user_uuid_changed? ||
            cancelled_at_changed?)) ||
          started_at_changed? ||
          finished_at_changed? ||
          running_changed? ||
          success_changed? ||
          output_changed? ||
          log_changed? ||
          tasks_summary_changed?
        logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}"
        return false
      end
    end
    return super unless is_locked_by_uuid_changed?

    if !current_user
      logger.warn "Anonymous user tried to change lock on #{self.class.to_s} #{uuid_was}"
      false
    elsif is_locked_by_uuid_was && is_locked_by_uuid_was != current_user.uuid
      logger.warn "User #{current_user.uuid} tried to steal lock on #{self.class.to_s} #{uuid_was} from #{is_locked_by_uuid_was}"
      false
    elsif !is_locked_by_uuid.nil? && is_locked_by_uuid != current_user.uuid
      logger.warn "User #{current_user.uuid} tried to lock #{self.class.to_s} #{uuid_was} with uuid #{is_locked_by_uuid}"
      false
    else
      super
    end
  end

  # Ensure cancelled_at cannot be set to arbitrary non-now times, or
  # changed once it is set. The first cancellation is stamped with the
  # current time, the cancelling user and API client, and flagged for the
  # after_commit crunch-dispatch trigger. Any other change to cancelled_at
  # is reverted together with the cancelled_by_* fields.
  def update_modified_by_fields
    if self.cancelled_at_changed?
      if self.cancelled_at && !self.cancelled_at_was
        self.cancelled_at = Time.now
        self.cancelled_by_user_uuid = current_user.uuid
        self.cancelled_by_client_uuid = current_api_client.andand.uuid
        @need_crunch_dispatch_trigger = true
      else
        self.cancelled_at = self.cancelled_at_was
        self.cancelled_by_user_uuid = self.cancelled_by_user_uuid_was
        self.cancelled_by_client_uuid = self.cancelled_by_client_uuid_was
      end
    end
    super
  end

  # after_commit (on update): when this save cancelled the job, create or
  # touch the file at Rails.configuration.crunch_refresh_trigger.
  def trigger_crunch_dispatch_if_cancelled
    return unless @need_crunch_dispatch_trigger
    File.open(Rails.configuration.crunch_refresh_trigger, 'wb') do
      # That's all, just create/touch a file for crunch-job to see.
    end
    # Clear the flag only after the touch succeeded, so later saves of this
    # same object do not touch the trigger file again.
    @need_crunch_dispatch_trigger = false
  end

  # before_validation: when state changes (or on create), fill in the
  # matching timestamp if it is still unset, and derive the legacy
  # running/success flags from state.
  def update_timestamps_when_state_changes
    return unless state_changed? || new_record?
    case state
    when Running
      self.started_at ||= Time.now
    when Failed, Complete
      self.finished_at ||= Time.now
    when Cancelled
      self.cancelled_at ||= Time.now
    end

    # TODO: Remove the following case block when old "success" and
    # "running" attrs go away. Until then, this ensures we still
    # expose correct success/running flags to older clients, even if
    # some new clients are writing only the new state attribute.
    case state
    when Queued
      self.running = false
      self.success = nil
    when Running
      self.running = true
      self.success = nil
    when Cancelled, Failed
      self.running = false
      self.success = false
    when Complete
      self.running = false
      self.success = true
    end
    self.running ||= false # Default to false instead of nil.

    # The assignment above can evaluate to false; return true explicitly so
    # this before_validation callback never halts the chain.
    true
  end

  # If a client has touched the legacy state attrs (or state is unset),
  # set the "state" attr to agree with the updated values of the legacy
  # attrs. Precedence: cancelled_at, then success, then running.
  #
  # TODO: Remove this method when old "success" and "running" attrs
  # go away.
  def update_state_from_old_state_attrs
    if cancelled_at_changed? ||
        success_changed? ||
        running_changed? ||
        state.nil?
      self.state =
        if cancelled_at
          Cancelled
        elsif success == false
          Failed
        elsif success == true
          Complete
        elsif running == true
          Running
        else
          Queued
        end
    end
    true
  end

  # Validation: state must be one of Job::States.
  def validate_status
    if self.state.in?(States)
      true
    else
      errors.add :state, "#{state.inspect} must be one of: #{States.inspect}"
      false
    end
  end

end