# services/api/app/models/job.rb (arvados.git)
require 'safe_json'

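# Job is the API server's record of a Crunch batch job: a script at a
# specific git commit, run with the given parameters and runtime
# constraints inside a Docker image. The model enforces the job state
# machine (Queued, Running, Cancelled, Failed, Complete) and supports
# reusing the output of an equivalent, previously submitted job.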
class Job < ArvadosModel
  include HasUuid
  include KindAndEtag
  include CommonApiTemplate
  extend CurrentApiClient
  serialize :components, Hash
  attr_protected :arvados_sdk_version, :docker_image_locator
  serialize :script_parameters, Hash
  serialize :runtime_constraints, Hash
  serialize :tasks_summary, Hash
  before_create :ensure_unique_submit_id
  after_commit :trigger_crunch_dispatch_if_cancelled, :on => :update
  before_validation :set_priority
  before_validation :update_state_from_old_state_attrs
  before_validation :update_script_parameters_digest
  validate :ensure_script_version_is_commit
  validate :find_docker_image_locator
  validate :find_arvados_sdk_version
  validate :validate_status
  validate :validate_state_change
  validate :ensure_no_collection_uuids_in_script_params
  before_save :tag_version_in_internal_repository
  before_save :update_timestamps_when_state_changes

  has_many :commit_ancestors, :foreign_key => :descendant, :primary_key => :script_version
  has_many(:nodes, foreign_key: :job_uuid, primary_key: :uuid)

  class SubmitIdReused < StandardError
  end

  api_accessible :user, extend: :common do |t|
    t.add :submit_id
    t.add :priority
    t.add :script
    t.add :script_parameters
    t.add :script_version
    t.add :cancelled_at
    t.add :cancelled_by_client_uuid
    t.add :cancelled_by_user_uuid
    t.add :started_at
    t.add :finished_at
    t.add :output
    t.add :success
    t.add :running
    t.add :state
    t.add :is_locked_by_uuid
    t.add :log
    t.add :runtime_constraints
    t.add :tasks_summary
    t.add :nondeterministic
    t.add :repository
    t.add :supplied_script_version
    t.add :arvados_sdk_version
    t.add :docker_image_locator
    t.add :queue_position
    t.add :node_uuids
    t.add :description
    t.add :components
  end

  # Supported states for a job
  States = [
            (Queued = 'Queued'),
            (Running = 'Running'),
            (Cancelled = 'Cancelled'),
            (Failed = 'Failed'),
            (Complete = 'Complete'),
           ]

  after_initialize do
    @need_crunch_dispatch_trigger = false
  end

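  # The components hash can be large, so keep it out of the default
  # column set read by index (list) queries unless a client selects it
  # explicitly.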
  def self.limit_index_columns_read
    ["components"]
  end

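  # Force the job into a finished state: fill in finished_at, default
  # success to false if it was never set, and clear the running flag.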
  def assert_finished
    update_attributes(finished_at: finished_at || db_current_time,
                      success: success.nil? ? false : success,
                      running: false)
  end

  def node_uuids
    nodes.map(&:uuid)
  end

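  # All queued jobs, highest priority first, oldest first within the
  # same priority.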
  def self.queue
    self.where('state = ?', Queued).order('priority desc, created_at')
  end

  def queue_position
    # We used to report this accurately, but the implementation made queue
    # API requests O(n**2) for the size of the queue.  See #8800.
    # We've soft-disabled it because it's not clear we even want this
    # functionality: now that we have Node Manager with support for multiple
    # node sizes, "queue position" tells you very little about when a job will
    # run.
    state == Queued ? 0 : nil
  end

  def self.running
    self.where('running = ?', true).
      order('priority desc, created_at')
  end

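  # Atomically transition a Queued, unlocked job to Running and record
  # locked_by_uuid as the lock holder. Raises AlreadyLockedError if the
  # job is not in a lockable state.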
  def lock locked_by_uuid
    with_lock do
      unless self.state == Queued and self.is_locked_by_uuid.nil?
        raise AlreadyLockedError
      end
      self.state = Running
      self.is_locked_by_uuid = locked_by_uuid
      self.save!
    end
  end

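  # Cache an order-insensitive digest of script_parameters so
  # find_reusable can match candidate jobs with a simple equality query.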
  def update_script_parameters_digest
    self.script_parameters_digest = self.class.sorted_hash_digest(script_parameters)
  end

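  # The parameters digest is an internal cache; leave it out of generic
  # and full-text search.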
  def self.searchable_columns operator
    super - ["script_parameters_digest"]
  end

  def self.full_text_searchable_columns
    super - ["script_parameters_digest"]
  end

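  # Translate Job-specific filter operators into plain SQL-able filters:
  # "in git"/"not in git" on script_version or arvados_sdk_version
  # become "in" filters over the matching commit range, and
  # "in docker"/"not in docker" become filters over the portable data
  # hashes of matching Docker image collections.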
  def self.load_job_specific_filters attrs, orig_filters, read_users
    # Convert Job-specific @filters entries into general SQL filters.
    script_info = {"repository" => nil, "script" => nil}
    git_filters = Hash.new do |hash, key|
      hash[key] = {"max_version" => "HEAD", "exclude_versions" => []}
    end
    filters = []
    orig_filters.each do |attr, operator, operand|
      if (script_info.has_key? attr) and (operator == "=")
        if script_info[attr].nil?
          script_info[attr] = operand
        elsif script_info[attr] != operand
          raise ArgumentError.new("incompatible #{attr} filters")
        end
      end
      case operator
      when "in git"
        git_filters[attr]["min_version"] = operand
      when "not in git"
        git_filters[attr]["exclude_versions"] += Array.wrap(operand)
      when "in docker", "not in docker"
        image_hashes = Array.wrap(operand).flat_map do |search_term|
          image_search, image_tag = search_term.split(':', 2)
          Collection.
            find_all_for_docker_image(image_search, image_tag, read_users, filter_compatible_format: false).
            map(&:portable_data_hash)
        end
        filters << [attr, operator.sub(/ docker$/, ""), image_hashes]
      else
        filters << [attr, operator, operand]
      end
    end

    # Build a real script_version filter from any "not? in git" filters.
    git_filters.each_pair do |attr, filter|
      case attr
      when "script_version"
        script_info.each_pair do |key, value|
          if value.nil?
            raise ArgumentError.new("script_version filter needs #{key} filter")
          end
        end
        filter["repository"] = script_info["repository"]
        if attrs[:script_version]
          filter["max_version"] = attrs[:script_version]
        else
          # Using HEAD, set earlier by the hash default, is fine.
        end
      when "arvados_sdk_version"
        filter["repository"] = "arvados"
      else
        raise ArgumentError.new("unknown attribute for git filter: #{attr}")
      end
      revisions = Commit.find_commit_range(filter["repository"],
                                           filter["min_version"],
                                           filter["max_version"],
                                           filter["exclude_versions"])
      if revisions.empty?
        raise ArgumentError.
          new("error searching #{filter['repository']} from " +
              "'#{filter['min_version']}' to '#{filter['max_version']}', " +
              "excluding #{filter['exclude_versions']}")
      end
      filters.append([attr, "in", revisions])
    end

    filters
  end

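  # Find an existing job the current user can reuse instead of running
  # a new one: same script parameters, deterministic, and matching the
  # given filters (or filters derived from the creation attributes).
  # Prefers a Complete job whose output is readable and consistent
  # across all completed candidates; otherwise falls back to a
  # Queued/Running job owned by the caller, if any.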
  def self.find_reusable attrs, params, filters, read_users
    if filters.empty?  # Translate older creation parameters into filters.
      filters =
        [["repository", "=", attrs[:repository]],
         ["script", "=", attrs[:script]],
         ["script_version", "not in git", params[:exclude_script_versions]],
        ].reject { |filter| filter.last.nil? or filter.last.empty? }
      if !params[:minimum_script_version].blank?
        filters << ["script_version", "in git",
                     params[:minimum_script_version]]
      else
        filters += default_git_filters("script_version", attrs[:repository],
                                       attrs[:script_version])
      end
      if image_search = attrs[:runtime_constraints].andand["docker_image"]
        if image_tag = attrs[:runtime_constraints]["docker_image_tag"]
          image_search += ":#{image_tag}"
        end
        image_locator = Collection.
          for_latest_docker_image(image_search).andand.portable_data_hash
      else
        image_locator = nil
      end
      filters << ["docker_image_locator", "=", image_locator]
      if sdk_version = attrs[:runtime_constraints].andand["arvados_sdk_version"]
        filters += default_git_filters("arvados_sdk_version", "arvados", sdk_version)
      end
      filters = load_job_specific_filters(attrs, filters, read_users)
    end

    # Check specified filters for some reasonableness.
    filter_names = filters.map { |f| f.first }.uniq
    ["repository", "script"].each do |req_filter|
      if not filter_names.include?(req_filter)
        return send_error("#{req_filter} filter required")
      end
    end

    # Search for a reusable Job, and return it if found.
    candidates = Job.
      readable_by(current_user).
      where('state = ? or (owner_uuid = ? and state in (?))',
            Job::Complete, current_user.uuid, [Job::Queued, Job::Running]).
      where('script_parameters_digest = ?', Job.sorted_hash_digest(attrs[:script_parameters])).
      where('nondeterministic is distinct from ?', true).
      order('state desc, created_at') # prefer Running jobs over Queued
    candidates = apply_filters candidates, filters
    chosen = nil
    incomplete_job = nil
    candidates.each do |j|
      if j.state != Job::Complete
        # We'll use this if we don't find a job that has completed
        incomplete_job ||= j
        next
      end

      if chosen == false
        # We have already decided not to reuse any completed job
        next
      elsif chosen
        if chosen.output != j.output
          # If two matching jobs produced different outputs, run a new
          # job (or use one that's already running/queued) instead of
          # choosing one arbitrarily.
          chosen = false
        end
        # ...and that's the only thing we need to do once we've chosen
        # a job to reuse.
      elsif !Collection.readable_by(current_user).find_by_portable_data_hash(j.output)
        # As soon as the output we will end up returning (if any) is
        # decided, check whether it will be visible to the user; if
        # not, any further investigation of reusable jobs is futile.
        chosen = false
      else
        chosen = j
      end
    end
    chosen || incomplete_job
  end

  def self.default_git_filters(attr_name, repo_name, refspec)
    # Return a filter (to be appended to @filters) matching `attr_name`
    # against the latest commit available in `repo_name` at `refspec`.
    # Returns an empty list if refspec can't be resolved.
    commits = Commit.find_commit_range(repo_name, nil, refspec, nil)
    if commit_hash = commits.first
      [[attr_name, "=", commit_hash]]
    else
      []
    end
  end

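  # Cancel this job: move Queued/Running to Cancelled, leave an
  # already-Cancelled job alone, and raise InvalidStateTransitionError
  # for any other state. With cascade: true, also cancel any queued or
  # running child jobs and pipeline instances listed in components.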
  def cancel(cascade: false, need_transaction: true)
    if need_transaction
      ActiveRecord::Base.transaction do
        cancel(cascade: cascade, need_transaction: false)
      end
      return
    end

    if self.state.in?([Queued, Running])
      self.state = Cancelled
      self.save!
    elsif self.state != Cancelled
      raise InvalidStateTransitionError
    end

    return if !cascade

    # cancel all children; they could be jobs or pipeline instances
    children = self.components.andand.collect{|_, u| u}.compact

    return if children.empty?

    # cancel any child jobs
    Job.where(uuid: children, state: [Queued, Running]).each do |job|
      job.cancel(cascade: cascade, need_transaction: false)
    end

    # cancel any child pipelines
    PipelineInstance.where(uuid: children, state: [PipelineInstance::RunningOnServer, PipelineInstance::RunningOnClient]).each do |pi|
      pi.cancel(cascade: cascade, need_transaction: false)
    end
  end

  protected

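  # MD5 digest of a hash serialized with its keys deep-sorted, so two
  # logically equal parameter hashes always produce the same digest.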
  def self.sorted_hash_digest h
    Digest::MD5.hexdigest(Oj.dump(deep_sort_hash(h)))
  end

  def foreign_key_attributes
    super + %w(output log)
  end

  def skip_uuid_read_permission_check
    super + %w(cancelled_by_client_uuid)
  end

  def skip_uuid_existence_check
    super + %w(output log)
  end

  def set_priority
    if self.priority.nil?
      self.priority = 0
    end
    true
  end

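  # Resolve script_version to a full commit SHA1, preserving the
  # original value in supplied_script_version. Skipped for jobs already
  # in the Running state (e.g. local runs against a working directory).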
  def ensure_script_version_is_commit
    if state == Running
      # Apparently client has already decided to go for it. This is
      # needed to run a local job using a local working directory
      # instead of a commit-ish.
      return true
    end
    if new_record? or repository_changed? or script_version_changed?
      sha1 = Commit.find_commit_range(repository,
                                      nil, script_version, nil).first
      if not sha1
        errors.add :script_version, "#{script_version} does not resolve to a commit"
        return false
      end
      if supplied_script_version.nil? or supplied_script_version.empty?
        self.supplied_script_version = script_version
      end
      self.script_version = sha1
    end
    true
  end

  def tag_version_in_internal_repository
    if state == Running
      # No point now. See ensure_script_version_is_commit.
      true
    elsif errors.any?
      # Won't be saved, and script_version might not even be valid.
      true
    elsif new_record? or repository_changed? or script_version_changed?
      uuid_was = uuid
      begin
        assign_uuid
        Commit.tag_in_internal_repository repository, script_version, uuid
      rescue
        self.uuid = uuid_was
        raise
      end
    end
  end

  def ensure_unique_submit_id
    if !submit_id.nil?
      if Job.where('submit_id=?',self.submit_id).first
        raise SubmitIdReused.new
      end
    end
    true
  end

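  # Helper for the find_* validations below: look up
  # runtime_constraints[key] and yield it to the block, which returns
  # [ok, result]. On success the result is stored in attr_sym; on
  # failure it is recorded as a validation error on that attribute.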
  def resolve_runtime_constraint(key, attr_sym)
    if ((runtime_constraints.is_a? Hash) and
        (search = runtime_constraints[key]))
      ok, result = yield search
    else
      ok, result = true, nil
    end
    if ok
      send("#{attr_sym}=".to_sym, result)
    else
      errors.add(attr_sym, result)
    end
    ok
  end

  def find_arvados_sdk_version
    resolve_runtime_constraint("arvados_sdk_version",
                               :arvados_sdk_version) do |git_search|
      commits = Commit.find_commit_range("arvados",
                                         nil, git_search, nil)
      if commits.empty?
        [false, "#{git_search} does not resolve to a commit"]
      elsif not runtime_constraints["docker_image"]
        [false, "cannot be specified without a Docker image constraint"]
      else
        [true, commits.first]
      end
    end
  end

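  # Fill in docker_image_locator from the "docker_image" runtime
  # constraint (falling back to the configured default image), storing
  # the portable data hash of the matching image collection.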
  def find_docker_image_locator
    if runtime_constraints.is_a? Hash
      runtime_constraints['docker_image'] ||=
        Rails.configuration.default_docker_image_for_jobs
    end

    resolve_runtime_constraint("docker_image",
                               :docker_image_locator) do |image_search|
      image_tag = runtime_constraints['docker_image_tag']
      if coll = Collection.for_latest_docker_image(image_search, image_tag)
        [true, coll.portable_data_hash]
      else
        [false, "not found for #{image_search}"]
      end
    end
  end

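  # While a job is locked, changes to its protected attributes are only
  # accepted from the lock holder or the system user. The lock itself
  # may only be set to the requesting user's own UUID, and only the
  # current holder may release or reassign it.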
  def permission_to_update
    if is_locked_by_uuid_was and !(current_user and
                                   (current_user.uuid == is_locked_by_uuid_was or
                                    current_user.uuid == system_user.uuid))
      if script_changed? or
          script_parameters_changed? or
          script_version_changed? or
          (!cancelled_at_was.nil? and
           (cancelled_by_client_uuid_changed? or
            cancelled_by_user_uuid_changed? or
            cancelled_at_changed?)) or
          started_at_changed? or
          finished_at_changed? or
          running_changed? or
          success_changed? or
          output_changed? or
          log_changed? or
          tasks_summary_changed? or
          (state_changed? && state != Cancelled) or
          components_changed?
        logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}"
        return false
      end
    end
    if !is_locked_by_uuid_changed?
      super
    else
      if !current_user
        logger.warn "Anonymous user tried to change lock on #{self.class.to_s} #{uuid_was}"
        false
      elsif is_locked_by_uuid_was and is_locked_by_uuid_was != current_user.uuid
        logger.warn "User #{current_user.uuid} tried to steal lock on #{self.class.to_s} #{uuid_was} from #{is_locked_by_uuid_was}"
        false
      elsif !is_locked_by_uuid.nil? and is_locked_by_uuid != current_user.uuid
        logger.warn "User #{current_user.uuid} tried to lock #{self.class.to_s} #{uuid_was} with uuid #{is_locked_by_uuid}"
        false
      else
        super
      end
    end
  end

  def update_modified_by_fields
    if self.cancelled_at_changed?
      # Ensure cancelled_at cannot be set to arbitrary non-now times,
      # or changed once it is set.
      if self.cancelled_at and not self.cancelled_at_was
        self.cancelled_at = db_current_time
        self.cancelled_by_user_uuid = current_user.uuid
        self.cancelled_by_client_uuid = current_api_client.andand.uuid
        @need_crunch_dispatch_trigger = true
      else
        self.cancelled_at = self.cancelled_at_was
        self.cancelled_by_user_uuid = self.cancelled_by_user_uuid_was
        self.cancelled_by_client_uuid = self.cancelled_by_client_uuid_was
      end
    end
    super
  end

  def trigger_crunch_dispatch_if_cancelled
    if @need_crunch_dispatch_trigger
      File.open(Rails.configuration.crunch_refresh_trigger, 'wb') do
        # That's all, just create/touch a file for crunch-job to see.
      end
    end
  end

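  # Keep started_at / finished_at / cancelled_at, and the legacy
  # running/success flags, consistent with the state attribute whenever
  # it changes.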
  def update_timestamps_when_state_changes
    return if not (state_changed? or new_record?)

    case state
    when Running
      self.started_at ||= db_current_time
    when Failed, Complete
      self.finished_at ||= db_current_time
    when Cancelled
      self.cancelled_at ||= db_current_time
    end

    # TODO: Remove the following case block when old "success" and
    # "running" attrs go away. Until then, this ensures we still
    # expose correct success/running flags to older clients, even if
    # some new clients are writing only the new state attribute.
    case state
    when Queued
      self.running = false
      self.success = nil
    when Running
      self.running = true
      self.success = nil
    when Cancelled, Failed
      self.running = false
      self.success = false
    when Complete
      self.running = false
      self.success = true
    end
    self.running ||= false # Default to false instead of nil.

    @need_crunch_dispatch_trigger = true

    true
  end

  def update_state_from_old_state_attrs
    # If a client has touched the legacy state attrs, update the
    # "state" attr to agree with the updated values of the legacy
    # attrs.
    #
    # TODO: Remove this method when old "success" and "running" attrs
    # go away.
    if cancelled_at_changed? or
        success_changed? or
        running_changed? or
        state.nil?
      if cancelled_at
        self.state = Cancelled
      elsif success == false
        self.state = Failed
      elsif success == true
        self.state = Complete
      elsif running == true
        self.state = Running
      else
        self.state = Queued
      end
    end
    true
  end

  def validate_status
    if self.state.in?(States)
      true
    else
      errors.add :state, "#{state.inspect} must be one of: #{States.inspect}"
      false
    end
  end

  def validate_state_change
    ok = true
    if self.state_changed?
      ok = case self.state_was
           when nil
             # state isn't set yet
             true
           when Queued
             # Permit going from queued to any state
             true
           when Running
             # From running, may only transition to a finished state
             [Complete, Failed, Cancelled].include? self.state
           when Complete, Failed, Cancelled
             # Once in a finished state, don't permit any more state changes
             false
           else
             # Any other state transition is also invalid
             false
           end
      if not ok
        errors.add :state, "invalid change from #{self.state_was} to #{self.state}"
      end
    end
    ok
  end

  def ensure_no_collection_uuids_in_script_params
    # Fail validation if any script_parameters field includes a string containing a
    # collection uuid pattern.
    if self.script_parameters_changed?
      if recursive_hash_search(self.script_parameters, Collection.uuid_regex)
        self.errors.add :script_parameters, "must use portable_data_hash instead of collection uuid"
        return false
      end
    end
    true
  end

  # recursive_hash_search searches recursively through hashes and
  # arrays in 'thing' for string fields matching regular expression
  # 'pattern'.  Returns true if pattern is found, false otherwise.
  def recursive_hash_search thing, pattern
    if thing.is_a? Hash
      thing.each do |k, v|
        return true if recursive_hash_search v, pattern
      end
    elsif thing.is_a? Array
      thing.each do |k|
        return true if recursive_hash_search k, pattern
      end
    elsif thing.is_a? String
      return true if thing.match pattern
    end
    false
  end
end