# 10053: Log timing metrics for Job model updates
# [arvados.git] / services / api / app / models / job.rb
# A Job is a request to run `script` from `repository` at `script_version`
# with `script_parameters`, together with the bookkeeping that tracks it
# through the states listed in States (see validate_state_change for the
# permitted transitions).
#
# The legacy `running`/`success` flags are kept in sync with `state` by the
# update_state_from_old_state_attrs and update_timestamps_when_state_changes
# callbacks. Several callbacks are wrapped in Job.benchmark so their run
# time is logged.
class Job < ArvadosModel
  include HasUuid
  include KindAndEtag
  include CommonApiTemplate
  extend CurrentApiClient
  serialize :components, Hash
  attr_protected :arvados_sdk_version, :docker_image_locator
  serialize :script_parameters, Hash
  serialize :runtime_constraints, Hash
  serialize :tasks_summary, Hash

  # Lifecycle callbacks. Within each phase they run in declaration order.
  before_create :ensure_unique_submit_id
  after_commit :trigger_crunch_dispatch_if_cancelled, :on => :update
  before_validation :set_priority
  before_validation :update_state_from_old_state_attrs
  before_validation :update_script_parameters_digest
  validate :ensure_script_version_is_commit
  validate :find_docker_image_locator
  validate :find_arvados_sdk_version
  validate :validate_status
  validate :validate_state_change
  validate :ensure_no_collection_uuids_in_script_params
  before_save :tag_version_in_internal_repository
  before_save :update_timestamps_when_state_changes

  has_many :commit_ancestors, :foreign_key => :descendant, :primary_key => :script_version
  has_many(:nodes, foreign_key: :job_uuid, primary_key: :uuid)

  # Raised by ensure_unique_submit_id when a new job reuses an existing
  # submit_id.
  class SubmitIdReused < StandardError
  end

  api_accessible :user, extend: :common do |t|
    t.add :submit_id
    t.add :priority
    t.add :script
    t.add :script_parameters
    t.add :script_version
    t.add :cancelled_at
    t.add :cancelled_by_client_uuid
    t.add :cancelled_by_user_uuid
    t.add :started_at
    t.add :finished_at
    t.add :output
    t.add :success
    t.add :running
    t.add :state
    t.add :is_locked_by_uuid
    t.add :log
    t.add :runtime_constraints
    t.add :tasks_summary
    t.add :nondeterministic
    t.add :repository
    t.add :supplied_script_version
    t.add :arvados_sdk_version
    t.add :docker_image_locator
    t.add :queue_position
    t.add :node_uuids
    t.add :description
    t.add :components
  end

  # Supported states for a job
  States = [
            (Queued = 'Queued'),
            (Running = 'Running'),
            (Cancelled = 'Cancelled'),
            (Failed = 'Failed'),
            (Complete = 'Complete'),
           ]

  # Mark this job as no longer running and persist that with
  # update_attributes. An unset finished_at defaults to the current database
  # time, and an unset success flag becomes false.
  def assert_finished
    update_attributes(finished_at: finished_at || db_current_time,
                      success: success.nil? ? false : success,
                      running: false)
  end

  # UUIDs of the Node records whose job_uuid refers to this job.
  def node_uuids
    nodes.map(&:uuid)
  end

  # Queued jobs, highest priority first; ties are broken by creation time.
  def self.queue
    self.where('state = ?', Queued).order('priority desc, created_at')
  end

  def queue_position
    # We used to report this accurately, but the implementation made queue
    # API requests O(n**2) for the size of the queue.  See #8800.
    # We've soft-disabled it because it's not clear we even want this
    # functionality: now that we have Node Manager with support for multiple
    # node sizes, "queue position" tells you very little about when a job will
    # run.
    state == Queued ? 0 : nil
  end

  # Jobs whose legacy `running` flag is true, in the same order as .queue.
  def self.running
    self.where('running = ?', true).
      order('priority desc, created_at')
  end

  # Inside with_lock, move a Queued job that nobody has locked to Running and
  # record locked_by_uuid as the lock holder, saving with save!.
  #
  # Raises AlreadyLockedError if the job is not Queued or is already locked.
  # NOTE(review): AlreadyLockedError is not defined in this class; presumably
  # it comes from ArvadosModel.
  def lock locked_by_uuid
    with_lock do
      unless self.state == Queued and self.is_locked_by_uuid.nil?
        raise AlreadyLockedError
      end
      self.state = Running
      self.is_locked_by_uuid = locked_by_uuid
      self.save!
    end
  end

  # before_validation: store sorted_hash_digest(script_parameters) in
  # script_parameters_digest. find_reusable uses this digest to match jobs
  # whose parameters are identical.
  def update_script_parameters_digest
    Job.benchmark("Job.update_script_parameters_digest #{self.uuid}") do
      self.script_parameters_digest = self.class.sorted_hash_digest(script_parameters)
    end
  end

  # Exclude the derived script_parameters_digest column from the columns
  # that generic search filters may target.
  def self.searchable_columns operator
    super - ["script_parameters_digest"]
  end

  # Translate Job-specific filter operators into plain filters:
  #
  # * "in git" / "not in git" on script_version or arvados_sdk_version are
  #   resolved through Commit.find_commit_range into an "in" filter that
  #   lists the matching commit hashes;
  # * "in docker" / "not in docker" become "in" / "not in" filters on the
  #   portable data hashes of matching Docker image collections;
  # * any other filter is passed through unchanged.
  #
  # attrs        - job attributes; attrs[:script_version], if given, is the
  #                upper bound of a script_version git range (else HEAD).
  # orig_filters - array of [attribute, operator, operand] triples.
  # read_users   - passed through to Collection.find_all_for_docker_image.
  #
  # Raises ArgumentError when:
  # * "=" filters on repository/script conflict;
  # * a script_version git filter lacks a repository or script "=" filter;
  # * a git filter targets any other attribute; or
  # * a git range matches no commits.
  def self.load_job_specific_filters attrs, orig_filters, read_users
    # Convert Job-specific @filters entries into general SQL filters.
    script_info = {"repository" => nil, "script" => nil}
    git_filters = Hash.new do |hash, key|
      hash[key] = {"max_version" => "HEAD", "exclude_versions" => []}
    end
    filters = []
    orig_filters.each do |attr, operator, operand|
      if (script_info.has_key? attr) and (operator == "=")
        if script_info[attr].nil?
          script_info[attr] = operand
        elsif script_info[attr] != operand
          raise ArgumentError.new("incompatible #{attr} filters")
        end
      end
      case operator
      when "in git"
        git_filters[attr]["min_version"] = operand
      when "not in git"
        git_filters[attr]["exclude_versions"] += Array.wrap(operand)
      when "in docker", "not in docker"
        image_hashes = Array.wrap(operand).flat_map do |search_term|
          image_search, image_tag = search_term.split(':', 2)
          Collection.
            find_all_for_docker_image(image_search, image_tag, read_users).
            map(&:portable_data_hash)
        end
        filters << [attr, operator.sub(/ docker$/, ""), image_hashes]
      else
        filters << [attr, operator, operand]
      end
    end

    # Build a real script_version filter from any "not? in git" filters.
    git_filters.each_pair do |attr, filter|
      case attr
      when "script_version"
        script_info.each_pair do |key, value|
          if value.nil?
            raise ArgumentError.new("script_version filter needs #{key} filter")
          end
        end
        filter["repository"] = script_info["repository"]
        if attrs[:script_version]
          filter["max_version"] = attrs[:script_version]
        else
          # Using HEAD, set earlier by the hash default, is fine.
        end
      when "arvados_sdk_version"
        filter["repository"] = "arvados"
      else
        raise ArgumentError.new("unknown attribute for git filter: #{attr}")
      end
      revisions = Commit.find_commit_range(filter["repository"],
                                           filter["min_version"],
                                           filter["max_version"],
                                           filter["exclude_versions"])
      if revisions.empty?
        raise ArgumentError.
          new("error searching #{filter['repository']} from " +
              "'#{filter['min_version']}' to '#{filter['max_version']}', " +
              "excluding #{filter['exclude_versions']}")
      end
      filters.append([attr, "in", revisions])
    end

    filters
  end

  # Find an existing job that can be reused instead of running a new job
  # with `attrs`.
  #
  # When `filters` is empty, filters are built from the older creation
  # parameters: attrs[:repository], attrs[:script], the
  # exclude_script_versions/minimum_script_version params, and the Docker
  # image / Arvados SDK runtime constraints.
  #
  # Candidates must:
  # * be readable by current_user;
  # * have the same script_parameters digest;
  # * not be nondeterministic; and
  # * be either Complete, or Queued/Running and owned by current_user.
  #
  # Returns:
  # * the oldest Complete candidate, if every Complete candidate has the
  #   same output and that output collection is readable by current_user;
  # * otherwise the first Queued/Running candidate (Running sorts before
  #   Queued under 'state desc');
  # * otherwise nil.
  def self.find_reusable attrs, params, filters, read_users
    if filters.empty?  # Translate older creation parameters into filters.
      filters =
        [["repository", "=", attrs[:repository]],
         ["script", "=", attrs[:script]],
         ["script_version", "not in git", params[:exclude_script_versions]],
        ].reject { |filter| filter.last.nil? or filter.last.empty? }
      if !params[:minimum_script_version].blank?
        filters << ["script_version", "in git",
                     params[:minimum_script_version]]
      else
        filters += default_git_filters("script_version", attrs[:repository],
                                       attrs[:script_version])
      end
      if image_search = attrs[:runtime_constraints].andand["docker_image"]
        if image_tag = attrs[:runtime_constraints]["docker_image_tag"]
          image_search += ":#{image_tag}"
        end
        image_locator = Collection.
          for_latest_docker_image(image_search).andand.portable_data_hash
      else
        image_locator = nil
      end
      filters << ["docker_image_locator", "=", image_locator]
      if sdk_version = attrs[:runtime_constraints].andand["arvados_sdk_version"]
        filters += default_git_filters("arvados_sdk_version", "arvados", sdk_version)
      end
      filters = load_job_specific_filters(attrs, filters, read_users)
    end

    # Check specified filters for some reasonableness.
    filter_names = filters.map { |f| f.first }.uniq
    ["repository", "script"].each do |req_filter|
      if not filter_names.include?(req_filter)
        # NOTE(review): send_error is not defined in this class; confirm it
        # exists on ArvadosModel, otherwise this path raises NoMethodError.
        return send_error("#{req_filter} filter required")
      end
    end

    # Search for a reusable Job, and return it if found.
    candidates = Job.
      readable_by(current_user).
      where('state = ? or (owner_uuid = ? and state in (?))',
            Job::Complete, current_user.uuid, [Job::Queued, Job::Running]).
      where('script_parameters_digest = ?', Job.sorted_hash_digest(attrs[:script_parameters])).
      where('nondeterministic is distinct from ?', true).
      order('state desc, created_at') # prefer Running jobs over Queued
    candidates = apply_filters candidates, filters
    chosen = nil
    incomplete_job = nil
    candidates.each do |j|
      if j.state != Job::Complete
        # We'll use this if we don't find a job that has completed
        incomplete_job ||= j
        next
      end

      if chosen == false
        # We have already decided not to reuse any completed job
        next
      elsif chosen
        if chosen.output != j.output
          # If two matching jobs produced different outputs, run a new
          # job (or use one that's already running/queued) instead of
          # choosing one arbitrarily.
          chosen = false
        end
        # ...and that's the only thing we need to do once we've chosen
        # a job to reuse.
      elsif !Collection.readable_by(current_user).find_by_portable_data_hash(j.output)
        # As soon as the output we will end up returning (if any) is
        # decided, check whether it will be visible to the user; if
        # not, any further investigation of reusable jobs is futile.
        chosen = false
      else
        chosen = j
      end
    end
    chosen || incomplete_job
  end

  # Return a one-element filter list [[attr_name, "=", commit]], where
  # `commit` is the first commit returned by Commit.find_commit_range for
  # `refspec` in `repo_name`. Returns [] if the refspec resolves to nothing.
  def self.default_git_filters(attr_name, repo_name, refspec)
    commits = Commit.find_commit_range(repo_name, nil, refspec, nil)
    if commit_hash = commits.first
      [[attr_name, "=", commit_hash]]
    else
      []
    end
  end

  protected

  # MD5 hex digest of Oj.dump(deep_sort_hash(h)). The deep sort presumably
  # makes the digest independent of hash key order.
  # NOTE: `protected` does not apply to singleton (`self.`) methods, so this
  # stays a public class method; find_reusable calls it as
  # Job.sorted_hash_digest.
  def self.sorted_hash_digest h
    Digest::MD5.hexdigest(Oj.dump(deep_sort_hash(h)))
  end

  # Extend the inherited list with the output and log attributes.
  def foreign_key_attributes
    super + %w(output log)
  end

  # Extend the inherited list with cancelled_by_client_uuid.
  def skip_uuid_read_permission_check
    super + %w(cancelled_by_client_uuid)
  end

  # Extend the inherited list with the output and log attributes.
  def skip_uuid_existence_check
    super + %w(output log)
  end

  # before_validation: default a missing priority to 0.
  def set_priority
    if self.priority.nil?
      self.priority = 0
    end
    true
  end

  # validate: for new records, or when repository/script_version changed,
  # resolve script_version to a commit hash via Commit.find_commit_range.
  # * Keeps the caller's original value in supplied_script_version, unless
  #   that is already set.
  # * Adds a validation error if script_version does not resolve.
  # * Skipped entirely for jobs already in the Running state.
  #
  # Uses `next` (not `return`) inside the benchmark block so early exits
  # still log the timing. Returns true, or false when an error was added.
  def ensure_script_version_is_commit
    Job.benchmark("Job.ensure_script_version_is_commit #{self.uuid}") do
      if state == Running
        # Apparently client has already decided to go for it. This is
        # needed to run a local job using a local working directory
        # instead of a commit-ish.
        next true
      end
      if new_record? or repository_changed? or script_version_changed?
        sha1 = Commit.find_commit_range(repository,
                                        nil, script_version, nil).first
        if not sha1
          errors.add :script_version, "#{script_version} does not resolve to a commit"
          next false
        end
        if supplied_script_version.nil? or supplied_script_version.empty?
          self.supplied_script_version = script_version
        end
        self.script_version = sha1
      end
      true
    end
  end

  # before_save: for new records, or when repository/script_version changed,
  # make sure the job has a uuid (assign_uuid) and tag script_version with
  # it via Commit.tag_in_internal_repository.
  #
  # If tagging raises, the uuid is restored to its value before this
  # callback ran, and the exception is re-raised.
  def tag_version_in_internal_repository
    Job.benchmark("Job.tag_version_in_internal_repository #{self.uuid}") do
      if state == Running
        # No point now. See ensure_script_version_is_commit.
        true
      elsif errors.any?
        # Won't be saved, and script_version might not even be valid.
        true
      elsif new_record? or repository_changed? or script_version_changed?
        # Named to avoid shadowing ActiveRecord's dirty-tracking uuid_was.
        original_uuid = uuid
        begin
          assign_uuid
          Commit.tag_in_internal_repository repository, script_version, uuid
        rescue
          # Must assign through self: a bare `uuid = ...` would only create
          # a local variable and leave the assigned uuid in place.
          self.uuid = original_uuid
          raise
        end
      end
    end
  end

  # before_create: raise SubmitIdReused if another job already has this
  # job's (non-nil) submit_id.
  def ensure_unique_submit_id
    if !submit_id.nil?
      if Job.where('submit_id=?',self.submit_id).first
        raise SubmitIdReused.new
      end
    end
    true
  end

  # If runtime_constraints is a Hash with a truthy value at `key`, yield
  # that value; the block must return [ok, result]. With no such
  # constraint, [true, nil] is used instead.
  #
  # On ok, assign result to the `attr_sym` attribute; otherwise add result
  # as a validation error on `attr_sym`. Returns ok.
  def resolve_runtime_constraint(key, attr_sym)
    if ((runtime_constraints.is_a? Hash) and
        (search = runtime_constraints[key]))
      ok, result = yield search
    else
      ok, result = true, nil
    end
    if ok
      send("#{attr_sym}=".to_sym, result)
    else
      errors.add(attr_sym, result)
    end
    ok
  end

  # validate: resolve the "arvados_sdk_version" runtime constraint to a
  # commit in the "arvados" repository and store it in arvados_sdk_version.
  # The constraint is only allowed together with a "docker_image"
  # constraint.
  def find_arvados_sdk_version
    Job.benchmark("Job.find_arvados_sdk_version #{self.uuid}") do
      resolve_runtime_constraint("arvados_sdk_version",
                                 :arvados_sdk_version) do |git_search|
        commits = Commit.find_commit_range("arvados",
                                           nil, git_search, nil)
        if commits.empty?
          [false, "#{git_search} does not resolve to a commit"]
        elsif not runtime_constraints["docker_image"]
          [false, "cannot be specified without a Docker image constraint"]
        else
          [true, commits.first]
        end
      end
    end
  end

  # validate: resolve the "docker_image" runtime constraint (plus optional
  # "docker_image_tag") to a collection's portable data hash and store it in
  # docker_image_locator.
  #
  # When no image is requested, first fills in
  # Rails.configuration.default_docker_image_for_jobs, if that is set.
  def find_docker_image_locator
    Job.benchmark("Job.find_docker_image_locator #{self.uuid}") do
      if ((runtime_constraints.is_a? Hash) and
          (runtime_constraints['docker_image']).nil? and
          Rails.configuration.default_docker_image_for_jobs)
        runtime_constraints['docker_image'] =
          Rails.configuration.default_docker_image_for_jobs
      end
      resolve_runtime_constraint("docker_image",
                                 :docker_image_locator) do |image_search|
        image_tag = runtime_constraints['docker_image_tag']
        if coll = Collection.for_latest_docker_image(image_search, image_tag)
          [true, coll.portable_data_hash]
        else
          [false, "not found for #{image_search}"]
        end
      end
    end
  end

  # Additional update checks on top of the inherited ones:
  # * While the job is locked, only the lock holder or the system user may
  #   change its protected execution attributes.
  # * A lock may only be taken or released by a logged-in user acting for
  #   themselves, and never stolen from another holder.
  def permission_to_update
    if is_locked_by_uuid_was and !(current_user and
                                   (current_user.uuid == is_locked_by_uuid_was or
                                    current_user.uuid == system_user.uuid))
      if script_changed? or
          script_parameters_changed? or
          script_version_changed? or
          (!cancelled_at_was.nil? and
           (cancelled_by_client_uuid_changed? or
            cancelled_by_user_uuid_changed? or
            cancelled_at_changed?)) or
          started_at_changed? or
          finished_at_changed? or
          running_changed? or
          success_changed? or
          output_changed? or
          log_changed? or
          tasks_summary_changed? or
          state_changed? or
          components_changed?
        logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}"
        return false
      end
    end
    if !is_locked_by_uuid_changed?
      super
    else
      if !current_user
        logger.warn "Anonymous user tried to change lock on #{self.class.to_s} #{uuid_was}"
        false
      elsif is_locked_by_uuid_was and is_locked_by_uuid_was != current_user.uuid
        logger.warn "User #{current_user.uuid} tried to steal lock on #{self.class.to_s} #{uuid_was} from #{is_locked_by_uuid_was}"
        false
      elsif !is_locked_by_uuid.nil? and is_locked_by_uuid != current_user.uuid
        logger.warn "User #{current_user.uuid} tried to lock #{self.class.to_s} #{uuid_was} with uuid #{is_locked_by_uuid}"
        false
      else
        super
      end
    end
  end

  # When cancelled_at is first set, stamp it with the database time and
  # record the cancelling user and API client. Any other change to the
  # cancellation fields is reverted. Then defer to the inherited behavior.
  def update_modified_by_fields
    if self.cancelled_at_changed?
      # Ensure cancelled_at cannot be set to arbitrary non-now times,
      # or changed once it is set.
      if self.cancelled_at and not self.cancelled_at_was
        self.cancelled_at = db_current_time
        self.cancelled_by_user_uuid = current_user.uuid
        self.cancelled_by_client_uuid = current_api_client.andand.uuid
        @need_crunch_dispatch_trigger = true
      else
        self.cancelled_at = self.cancelled_at_was
        self.cancelled_by_user_uuid = self.cancelled_by_user_uuid_was
        self.cancelled_by_client_uuid = self.cancelled_by_client_uuid_was
      end
    end
    super
  end

  # after_commit (on update): touch the file named by
  # Rails.configuration.crunch_refresh_trigger when
  # @need_crunch_dispatch_trigger is set. Despite the method name, the flag
  # is set both on a new cancellation (update_modified_by_fields) and on any
  # state change (update_timestamps_when_state_changes).
  def trigger_crunch_dispatch_if_cancelled
    if @need_crunch_dispatch_trigger
      File.open(Rails.configuration.crunch_refresh_trigger, 'wb') do
        # That's all, just create/touch a file for crunch-job to see.
      end
    end
  end

  # before_save: on creation or a state change, fill in the timestamp that
  # matches the new state (if not already set). It also derives the legacy
  # running/success flags from `state`.
  def update_timestamps_when_state_changes
    return if not (state_changed? or new_record?)

    case state
    when Running
      self.started_at ||= db_current_time
    when Failed, Complete
      self.finished_at ||= db_current_time
    when Cancelled
      self.cancelled_at ||= db_current_time
    end

    # TODO: Remove the following case block when old "success" and
    # "running" attrs go away. Until then, this ensures we still
    # expose correct success/running flags to older clients, even if
    # some new clients are writing only the new state attribute.
    case state
    when Queued
      self.running = false
      self.success = nil
    when Running
      self.running = true
      self.success = nil
    when Cancelled, Failed
      self.running = false
      self.success = false
    when Complete
      self.running = false
      self.success = true
    end
    self.running ||= false # Default to false instead of nil.

    @need_crunch_dispatch_trigger = true

    true
  end

  def update_state_from_old_state_attrs
    # If a client has touched the legacy state attrs, update the
    # "state" attr to agree with the updated values of the legacy
    # attrs.
    #
    # TODO: Remove this method when old "success" and "running" attrs
    # go away.
    if cancelled_at_changed? or
        success_changed? or
        running_changed? or
        state.nil?
      if cancelled_at
        self.state = Cancelled
      elsif success == false
        self.state = Failed
      elsif success == true
        self.state = Complete
      elsif running == true
        self.state = Running
      else
        self.state = Queued
      end
    end
    true
  end

  # validate: `state` must be one of States.
  def validate_status
    if self.state.in?(States)
      true
    else
      errors.add :state, "#{state.inspect} must be one of: #{States.inspect}"
      false
    end
  end

  # validate: allowed transitions are
  # * unset -> any state;
  # * Queued -> any state;
  # * Running -> Complete, Failed or Cancelled.
  # Finished states are terminal.
  def validate_state_change
    ok = true
    if self.state_changed?
      ok = case self.state_was
           when nil
             # state isn't set yet
             true
           when Queued
             # Permit going from queued to any state
             true
           when Running
             # From running, may only transition to a finished state
             [Complete, Failed, Cancelled].include? self.state
           when Complete, Failed, Cancelled
             # Once in a finished state, don't permit any more state changes
             false
           else
             # Any other state transition is also invalid
             false
           end
      if not ok
        errors.add :state, "invalid change from #{self.state_was} to #{self.state}"
      end
    end
    ok
  end

  # validate: when script_parameters changed, fail if any string nested
  # anywhere inside it matches Collection.uuid_regex. Returns false when an
  # error was added, true otherwise.
  def ensure_no_collection_uuids_in_script_params
    Job.benchmark("Job.ensure_no_collection_uuids_in_script_params #{self.uuid}") do
      # Fail validation if any script_parameters field includes a string containing a
      # collection uuid pattern.
      if self.script_parameters_changed? and
          recursive_hash_search(self.script_parameters, Collection.uuid_regex)
        self.errors.add :script_parameters, "must use portable_data_hash instead of collection uuid"
        next false
      end
      true
    end
  end

  # Search recursively through hashes (values only) and arrays in `thing`
  # for a String that matches `pattern`. Returns true if one is found, false
  # otherwise. Other types never match.
  def recursive_hash_search thing, pattern
    if thing.is_a? Hash
      thing.each_value.any? { |v| recursive_hash_search v, pattern }
    elsif thing.is_a? Array
      thing.any? { |v| recursive_hash_search v, pattern }
    elsif thing.is_a? String
      !!thing.match(pattern)
    else
      false
    end
  end
end