Merge branch '8800-queue-query'
[arvados.git] / services / api / app / models / job.rb
# Job model: a script (resolved to a commit in a repository) together
# with its script_parameters and runtime_constraints, tracked through
# the states listed in States.
#
# The legacy "running"/"success"/"cancelled_at" attributes and the newer
# "state" attribute are kept in agreement by the callbacks
# update_state_from_old_state_attrs and
# update_timestamps_when_state_changes.
class Job < ArvadosModel
  include HasUuid
  include KindAndEtag
  include CommonApiTemplate
  # These two attributes are filled in during validation from
  # runtime_constraints (see find_arvados_sdk_version and
  # find_docker_image_locator), so they are excluded from mass assignment.
  attr_protected :arvados_sdk_version, :docker_image_locator
  serialize :script_parameters, Hash
  serialize :runtime_constraints, Hash
  serialize :tasks_summary, Hash
  before_create :ensure_unique_submit_id
  after_commit :trigger_crunch_dispatch_if_cancelled, :on => :update
  before_validation :set_priority
  before_validation :update_state_from_old_state_attrs
  validate :ensure_script_version_is_commit
  validate :find_docker_image_locator
  validate :find_arvados_sdk_version
  validate :validate_status
  validate :validate_state_change
  validate :ensure_no_collection_uuids_in_script_params
  before_save :tag_version_in_internal_repository
  before_save :update_timestamps_when_state_changes

  has_many :commit_ancestors, :foreign_key => :descendant, :primary_key => :script_version
  has_many(:nodes, foreign_key: :job_uuid, primary_key: :uuid)

  # Raised by ensure_unique_submit_id when another job already uses the
  # same submit_id.
  class SubmitIdReused < StandardError
  end

  # Attributes exposed through the :user API template, in addition to
  # those of the :common template it extends.
  api_accessible :user, extend: :common do |t|
    t.add :submit_id
    t.add :priority
    t.add :script
    t.add :script_parameters
    t.add :script_version
    t.add :cancelled_at
    t.add :cancelled_by_client_uuid
    t.add :cancelled_by_user_uuid
    t.add :started_at
    t.add :finished_at
    t.add :output
    t.add :success
    t.add :running
    t.add :state
    t.add :is_locked_by_uuid
    t.add :log
    t.add :runtime_constraints
    t.add :tasks_summary
    t.add :nondeterministic
    t.add :repository
    t.add :supplied_script_version
    t.add :arvados_sdk_version
    t.add :docker_image_locator
    t.add :queue_position
    t.add :node_uuids
    t.add :description
  end

  # Supported states for a job
  States = [
            (Queued = 'Queued'),
            (Running = 'Running'),
            (Cancelled = 'Cancelled'),
            (Failed = 'Failed'),
            (Complete = 'Complete'),
           ]

  # Record that the job is over: keep finished_at if already set
  # (otherwise use db_current_time), turn a nil success into false, and
  # clear the running flag.  Returns the result of update_attributes.
  def assert_finished
    update_attributes(finished_at: finished_at || db_current_time,
                      success: success.nil? ? false : success,
                      running: false)
  end

  # UUIDs of the nodes associated with this job through the :nodes
  # association.
  def node_uuids
    nodes.map(&:uuid)
  end

  # Relation of jobs in the Queued state, highest priority first, then
  # by created_at.
  def self.queue
    self.where('state = ?', Queued).order('priority desc, created_at')
  end

  # Returns 0 for a queued job and nil for any other state.
  def queue_position
    # We used to report this accurately, but the implementation made queue
    # API requests O(n**2) for the size of the queue.  See #8800.
    # We've soft-disabled it because it's not clear we even want this
    # functionality: now that we have Node Manager with support for multiple
    # node sizes, "queue position" tells you very little about when a job will
    # run.
    state == Queued ? 0 : nil
  end

  # Relation of jobs whose legacy "running" flag is true, highest
  # priority first, then by created_at.
  def self.running
    self.where('running = ?', true).
      order('priority desc, created_at')
  end

  # Claim a queued job on behalf of locked_by_uuid.
  #
  # Inside a transaction the record is reloaded; if it is still Queued
  # and not locked, it is moved to Running and the lock holder is
  # recorded.  Raises AlreadyLockedError otherwise.  The save uses
  # save!, so a failed save raises as well.
  def lock(locked_by_uuid)
    transaction do
      self.reload
      unless self.state == Queued and self.is_locked_by_uuid.nil?
        raise AlreadyLockedError
      end
      self.state = Running
      self.is_locked_by_uuid = locked_by_uuid
      self.save!
    end
  end

  protected

  # Inherited list extended with the "output" and "log" attributes.
  def foreign_key_attributes
    super + %w(output log)
  end

  # Inherited list extended with "cancelled_by_client_uuid".
  def skip_uuid_read_permission_check
    super + %w(cancelled_by_client_uuid)
  end

  # Inherited list extended with the "output" and "log" attributes.
  def skip_uuid_existence_check
    super + %w(output log)
  end

  # before_validation callback: default a missing priority to 0.
  # Always returns true so the callback chain continues.
  def set_priority
    self.priority = 0 if self.priority.nil?
    true
  end

  # Validation: resolve script_version to a commit in the repository.
  #
  # Skipped when the job is already Running.  On a new record, or when
  # repository or script_version changed, script_version is replaced
  # by the first commit returned by Commit.find_commit_range, and the
  # value originally supplied is preserved in supplied_script_version
  # (unless the client already set one).  Adds an error and returns
  # false when nothing is found.
  def ensure_script_version_is_commit
    if state == Running
      # Apparently client has already decided to go for it. This is
      # needed to run a local job using a local working directory
      # instead of a commit-ish.
      return true
    end
    if new_record? or repository_changed? or script_version_changed?
      sha1 = Commit.find_commit_range(repository,
                                      nil, script_version, nil).first
      if not sha1
        errors.add :script_version, "#{script_version} does not resolve to a commit"
        return false
      end
      if supplied_script_version.nil? or supplied_script_version.empty?
        self.supplied_script_version = script_version
      end
      self.script_version = sha1
    end
    true
  end

  # before_save callback: tag script_version in the internal repository
  # under this job's uuid, for new records or when repository or
  # script_version changed.
  #
  # A uuid is assigned first (assign_uuid) because the tag is named
  # after it.  If tagging raises, the uuid is put back to the value it
  # had on entry and the exception is re-raised.
  def tag_version_in_internal_repository
    if state == Running
      # No point now. See ensure_script_version_is_commit.
      true
    elsif errors.any?
      # Won't be saved, and script_version might not even be valid.
      true
    elsif new_record? or repository_changed? or script_version_changed?
      # Deliberately not called "uuid_was": that name would shadow the
      # dirty-tracking reader used elsewhere in this class.
      previous_uuid = uuid
      begin
        assign_uuid
        Commit.tag_in_internal_repository repository, script_version, uuid
      rescue
        # Must go through the attribute writer; a bare "uuid = ..."
        # would only create a local variable and leave the new uuid in
        # place.
        self.uuid = previous_uuid
        raise
      end
    end
  end

  # before_create callback: raise SubmitIdReused if another job already
  # has this submit_id.  A nil submit_id is not checked.
  def ensure_unique_submit_id
    if !submit_id.nil? and Job.where('submit_id=?', self.submit_id).exists?
      raise SubmitIdReused.new
    end
    true
  end

  # Shared helper for validations that derive an attribute from an
  # entry of runtime_constraints.
  #
  # key      - runtime_constraints key to look up.
  # attr_sym - attribute that receives the resolved value, and that
  #            errors are attached to.
  #
  # When the constraint is present, the block is called with its value
  # and must return [ok, result]: on success result is assigned to the
  # attribute, on failure result is used as the error message.  When
  # the constraint is absent, the attribute is set to nil.  Returns ok.
  def resolve_runtime_constraint(key, attr_sym)
    if ((runtime_constraints.is_a? Hash) and
        (search = runtime_constraints[key]))
      ok, result = yield search
    else
      ok, result = true, nil
    end
    if ok
      send("#{attr_sym}=".to_sym, result)
    else
      errors.add(attr_sym, result)
    end
    ok
  end

  # Validation: resolve the "arvados_sdk_version" runtime constraint to
  # a commit in the "arvados" repository and store it in
  # arvados_sdk_version.  Fails if it does not resolve, or if no
  # "docker_image" constraint is present.
  def find_arvados_sdk_version
    resolve_runtime_constraint("arvados_sdk_version",
                               :arvados_sdk_version) do |git_search|
      commits = Commit.find_commit_range("arvados",
                                         nil, git_search, nil)
      if commits.empty?
        [false, "#{git_search} does not resolve to a commit"]
      elsif not runtime_constraints["docker_image"]
        [false, "cannot be specified without a Docker image constraint"]
      else
        [true, commits.first]
      end
    end
  end

  # Validation: resolve the "docker_image" runtime constraint (with the
  # optional "docker_image_tag") to a collection and store its
  # portable_data_hash in docker_image_locator.
  #
  # If the job does not name an image and the site configures
  # default_docker_image_for_jobs, that default is written into
  # runtime_constraints first.
  def find_docker_image_locator
    if (runtime_constraints.is_a? Hash) and
        runtime_constraints['docker_image'].nil? and
        Rails.configuration.default_docker_image_for_jobs
      runtime_constraints['docker_image'] =
        Rails.configuration.default_docker_image_for_jobs
    end
    resolve_runtime_constraint("docker_image",
                               :docker_image_locator) do |image_search|
      image_tag = runtime_constraints['docker_image_tag']
      if coll = Collection.for_latest_docker_image(image_search, image_tag)
        [true, coll.portable_data_hash]
      else
        [false, "not found for #{image_search}"]
      end
    end
  end

  # Permission check for updates, layered on top of the inherited one.
  #
  # 1. While the job is locked, only the lock holder or the system user
  #    may change the protected attributes listed below.  The
  #    cancellation fields are protected only once cancelled_at has
  #    already been set.
  # 2. A change to is_locked_by_uuid is rejected for anonymous users,
  #    for users trying to take over someone else's lock, and for users
  #    trying to lock on behalf of a different uuid.
  #
  # Anything not rejected here is delegated to super.
  def permission_to_update
    if is_locked_by_uuid_was and !(current_user and
                                   (current_user.uuid == is_locked_by_uuid_was or
                                    current_user.uuid == system_user.uuid))
      if script_changed? or
          script_parameters_changed? or
          script_version_changed? or
          (!cancelled_at_was.nil? and
           (cancelled_by_client_uuid_changed? or
            cancelled_by_user_uuid_changed? or
            cancelled_at_changed?)) or
          started_at_changed? or
          finished_at_changed? or
          running_changed? or
          success_changed? or
          output_changed? or
          log_changed? or
          tasks_summary_changed? or
          state_changed?
        logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}"
        return false
      end
    end
    if !is_locked_by_uuid_changed?
      super
    else
      if !current_user
        logger.warn "Anonymous user tried to change lock on #{self.class.to_s} #{uuid_was}"
        false
      elsif is_locked_by_uuid_was and is_locked_by_uuid_was != current_user.uuid
        logger.warn "User #{current_user.uuid} tried to steal lock on #{self.class.to_s} #{uuid_was} from #{is_locked_by_uuid_was}"
        false
      elsif !is_locked_by_uuid.nil? and is_locked_by_uuid != current_user.uuid
        logger.warn "User #{current_user.uuid} tried to lock #{self.class.to_s} #{uuid_was} with uuid #{is_locked_by_uuid}"
        false
      else
        super
      end
    end
  end

  # Overrides the inherited hook to keep the cancellation fields
  # trustworthy before delegating to super.
  #
  # The first time cancelled_at is set, it is overwritten with
  # db_current_time, the cancelling user and API client are recorded,
  # and a crunch-dispatch trigger is requested.  Any other change to
  # cancelled_at is reverted to the previous values.
  def update_modified_by_fields
    if self.cancelled_at_changed?
      # Ensure cancelled_at cannot be set to arbitrary non-now times,
      # or changed once it is set.
      if self.cancelled_at and not self.cancelled_at_was
        self.cancelled_at = db_current_time
        self.cancelled_by_user_uuid = current_user.uuid
        self.cancelled_by_client_uuid = current_api_client.andand.uuid
        @need_crunch_dispatch_trigger = true
      else
        self.cancelled_at = self.cancelled_at_was
        self.cancelled_by_user_uuid = self.cancelled_by_user_uuid_was
        self.cancelled_by_client_uuid = self.cancelled_by_client_uuid_was
      end
    end
    super
  end

  # after_commit (on update) callback: if a trigger was requested
  # during this save (@need_crunch_dispatch_trigger), create/truncate
  # the file named by Rails.configuration.crunch_refresh_trigger.
  def trigger_crunch_dispatch_if_cancelled
    if @need_crunch_dispatch_trigger
      File.open(Rails.configuration.crunch_refresh_trigger, 'wb') do
        # That's all, just create/touch a file for crunch-job to see.
      end
    end
  end

  # before_save callback: when state changes (or on a new record), fill
  # in the timestamp that matches the new state, mirror the state into
  # the legacy running/success flags, and request a crunch-dispatch
  # trigger.  Returns nil when there is nothing to do, true otherwise.
  def update_timestamps_when_state_changes
    return if not (state_changed? or new_record?)

    # Timestamps are only filled in if not already set.
    case state
    when Running
      self.started_at ||= db_current_time
    when Failed, Complete
      self.finished_at ||= db_current_time
    when Cancelled
      self.cancelled_at ||= db_current_time
    end

    # TODO: Remove the following case block when old "success" and
    # "running" attrs go away. Until then, this ensures we still
    # expose correct success/running flags to older clients, even if
    # some new clients are writing only the new state attribute.
    case state
    when Queued
      self.running = false
      self.success = nil
    when Running
      self.running = true
      self.success = nil
    when Cancelled, Failed
      self.running = false
      self.success = false
    when Complete
      self.running = false
      self.success = true
    end
    self.running ||= false # Default to false instead of nil.

    @need_crunch_dispatch_trigger = true

    true
  end

  # before_validation callback: derive "state" from the legacy
  # attributes.  Always returns true.
  def update_state_from_old_state_attrs
    # If a client has touched the legacy state attrs, update the
    # "state" attr to agree with the updated values of the legacy
    # attrs.
    #
    # TODO: Remove this method when old "success" and "running" attrs
    # go away.
    if cancelled_at_changed? or
        success_changed? or
        running_changed? or
        state.nil?
      # Checked in priority order: cancellation wins over
      # success/failure, which win over running.
      if cancelled_at
        self.state = Cancelled
      elsif success == false
        self.state = Failed
      elsif success == true
        self.state = Complete
      elsif running == true
        self.state = Running
      else
        self.state = Queued
      end
    end
    true
  end

  # Validation: state must be one of States.
  def validate_status
    if self.state.in?(States)
      true
    else
      errors.add :state, "#{state.inspect} must be one of: #{States.inspect}"
      false
    end
  end

  # Validation: only allow legal state transitions.  Adds an error on
  # :state and returns false for an illegal change.
  def validate_state_change
    ok = true
    if self.state_changed?
      ok = case self.state_was
           when nil
             # state isn't set yet
             true
           when Queued
             # Permit going from queued to any state
             true
           when Running
             # From running, may only transition to a finished state
             [Complete, Failed, Cancelled].include? self.state
           when Complete, Failed, Cancelled
             # Once in a finished state, don't permit any more state changes
             false
           else
             # Any other state transition is also invalid
             false
           end
      if not ok
        errors.add :state, "invalid change from #{self.state_was} to #{self.state}"
      end
    end
    ok
  end

  # Validation: fail if any script_parameters field includes a string
  # containing a collection uuid pattern.  Only checked when
  # script_parameters changed.
  def ensure_no_collection_uuids_in_script_params
    if self.script_parameters_changed? and
        recursive_hash_search(self.script_parameters, Collection.uuid_regex)
      self.errors.add :script_parameters, "must use portable_data_hash instead of collection uuid"
      return false
    end
    true
  end

  # Search recursively through the hashes and arrays in 'thing' for a
  # string matching the regular expression 'pattern'.  Hash keys are
  # not inspected, only hash values.  Returns true if a match is
  # found, false otherwise (including for values that are not a Hash,
  # Array or String).
  def recursive_hash_search(thing, pattern)
    case thing
    when Hash
      thing.values.any? { |value| recursive_hash_search(value, pattern) }
    when Array
      thing.any? { |item| recursive_hash_search(item, pattern) }
    when String
      !thing.match(pattern).nil?
    else
      false
    end
  end
end