Merge branch '13493-document-federation' refs #13493
[arvados.git] / services / api / app / models / container.rb
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 require 'log_reuse_info'
6 require 'whitelist_update'
7 require 'safe_json'
8 require 'update_priority'
9
10 class Container < ArvadosModel
11   include ArvadosModelUpdates
12   include HasUuid
13   include KindAndEtag
14   include CommonApiTemplate
15   include WhitelistUpdate
16   extend CurrentApiClient
17   extend DbCurrentTime
18   extend LogReuseInfo
19
20   serialize :environment, Hash
21   serialize :mounts, Hash
22   serialize :runtime_constraints, Hash
23   serialize :command, Array
24   serialize :scheduling_parameters, Hash
25   serialize :secret_mounts, Hash
26   serialize :runtime_status, Hash
27
28   before_validation :fill_field_defaults, :if => :new_record?
29   before_validation :set_timestamps
30   validates :command, :container_image, :output_path, :cwd, :priority, { presence: true }
31   validates :priority, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
32   validate :validate_runtime_status
33   validate :validate_state_change
34   validate :validate_change
35   validate :validate_lock
36   validate :validate_output
37   after_validation :assign_auth
38   before_save :sort_serialized_attrs
39   before_save :update_secret_mounts_md5
40   before_save :scrub_secret_mounts
41   before_save :clear_runtime_status_when_queued
42   after_save :handle_completed
43   after_save :propagate_priority
44   after_commit { UpdatePriority.run_update_thread }
45
46   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
47   belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
48
49   api_accessible :user, extend: :common do |t|
50     t.add :command
51     t.add :container_image
52     t.add :cwd
53     t.add :environment
54     t.add :exit_code
55     t.add :finished_at
56     t.add :locked_by_uuid
57     t.add :log
58     t.add :mounts
59     t.add :output
60     t.add :output_path
61     t.add :priority
62     t.add :progress
63     t.add :runtime_constraints
64     t.add :runtime_status
65     t.add :started_at
66     t.add :state
67     t.add :auth_uuid
68     t.add :scheduling_parameters
69   end
70
71   # Supported states for a container
72   States =
73     [
74      (Queued = 'Queued'),
75      (Locked = 'Locked'),
76      (Running = 'Running'),
77      (Complete = 'Complete'),
78      (Cancelled = 'Cancelled')
79     ]
80
81   State_transitions = {
82     nil => [Queued],
83     Queued => [Locked, Cancelled],
84     Locked => [Queued, Running, Cancelled],
85     Running => [Complete, Cancelled]
86   }
87
88   def self.limit_index_columns_read
89     ["mounts"]
90   end
91
92   def self.full_text_searchable_columns
93     super - ["secret_mounts", "secret_mounts_md5"]
94   end
95
96   def self.searchable_columns *args
97     super - ["secret_mounts_md5"]
98   end
99
100   def logged_attributes
101     super.except('secret_mounts')
102   end
103
104   def state_transitions
105     State_transitions
106   end
107
108   # Container priority is the highest "computed priority" of any
109   # matching request. The computed priority of a container-submitted
110   # request is the priority of the submitting container. The computed
111   # priority of a user-submitted request is a function of
112   # user-assigned priority and request creation time.
113   def update_priority!
114     return if ![Queued, Locked, Running].include?(state)
115     p = ContainerRequest.
116         where('container_uuid=? and priority>0', uuid).
117         includes(:requesting_container).
118         lock(true).
119         map do |cr|
120       if cr.requesting_container
121         cr.requesting_container.priority
122       else
123         (cr.priority << 50) - (cr.created_at.to_time.to_f * 1000).to_i
124       end
125     end.max || 0
126     update_attributes!(priority: p)
127   end
128
129   def propagate_priority
130     return true unless priority_changed?
131     act_as_system_user do
132       # Update the priority of child container requests to match new
133       # priority of the parent container (ignoring requests with no
134       # container assigned, because their priority doesn't matter).
135       ContainerRequest.
136         where(requesting_container_uuid: self.uuid,
137               state: ContainerRequest::Committed).
138         where('container_uuid is not null').
139         includes(:container).
140         map(&:container).
141         map(&:update_priority!)
142     end
143   end
144
145   # Create a new container (or find an existing one) to satisfy the
146   # given container request.
147   def self.resolve(req)
148     c_attrs = {
149       command: req.command,
150       cwd: req.cwd,
151       environment: req.environment,
152       output_path: req.output_path,
153       container_image: resolve_container_image(req.container_image),
154       mounts: resolve_mounts(req.mounts),
155       runtime_constraints: resolve_runtime_constraints(req.runtime_constraints),
156       scheduling_parameters: req.scheduling_parameters,
157       secret_mounts: req.secret_mounts,
158     }
159     act_as_system_user do
160       if req.use_existing && (reusable = find_reusable(c_attrs))
161         reusable
162       else
163         Container.create!(c_attrs)
164       end
165     end
166   end
167
168   # Return a runtime_constraints hash that complies with requested but
169   # is suitable for saving in a container record, i.e., has specific
170   # values instead of ranges.
171   #
172   # Doing this as a step separate from other resolutions, like "git
173   # revision range to commit hash", makes sense only when there is no
174   # opportunity to reuse an existing container (e.g., container reuse
175   # is not implemented yet, or we have already found that no existing
176   # containers are suitable).
177   def self.resolve_runtime_constraints(runtime_constraints)
178     rc = {}
179     defaults = {
180       'keep_cache_ram' =>
181       Rails.configuration.container_default_keep_cache_ram,
182     }
183     defaults.merge(runtime_constraints).each do |k, v|
184       if v.is_a? Array
185         rc[k] = v[0]
186       else
187         rc[k] = v
188       end
189     end
190     rc
191   end
192
193   # Return a mounts hash suitable for a Container, i.e., with every
194   # readonly collection UUID resolved to a PDH.
195   def self.resolve_mounts(mounts)
196     c_mounts = {}
197     mounts.each do |k, mount|
198       mount = mount.dup
199       c_mounts[k] = mount
200       if mount['kind'] != 'collection'
201         next
202       end
203       if (uuid = mount.delete 'uuid')
204         c = Collection.
205           readable_by(current_user).
206           where(uuid: uuid).
207           select(:portable_data_hash).
208           first
209         if !c
210           raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found"
211         end
212         if mount['portable_data_hash'].nil?
213           # PDH not supplied by client
214           mount['portable_data_hash'] = c.portable_data_hash
215         elsif mount['portable_data_hash'] != c.portable_data_hash
216           # UUID and PDH supplied by client, but they don't agree
217           raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{c['portable_data_hash'].inspect} in request"
218         end
219       end
220     end
221     return c_mounts
222   end
223
224   # Return a container_image PDH suitable for a Container.
225   def self.resolve_container_image(container_image)
226     coll = Collection.for_latest_docker_image(container_image)
227     if !coll
228       raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found"
229     end
230     coll.portable_data_hash
231   end
232
233   def self.find_reusable(attrs)
234     log_reuse_info { "starting with #{Container.all.count} container records in database" }
235     candidates = Container.where_serialized(:command, attrs[:command], md5: true)
236     log_reuse_info(candidates) { "after filtering on command #{attrs[:command].inspect}" }
237
238     candidates = candidates.where('cwd = ?', attrs[:cwd])
239     log_reuse_info(candidates) { "after filtering on cwd #{attrs[:cwd].inspect}" }
240
241     candidates = candidates.where_serialized(:environment, attrs[:environment], md5: true)
242     log_reuse_info(candidates) { "after filtering on environment #{attrs[:environment].inspect}" }
243
244     candidates = candidates.where('output_path = ?', attrs[:output_path])
245     log_reuse_info(candidates) { "after filtering on output_path #{attrs[:output_path].inspect}" }
246
247     image = resolve_container_image(attrs[:container_image])
248     candidates = candidates.where('container_image = ?', image)
249     log_reuse_info(candidates) { "after filtering on container_image #{image.inspect} (resolved from #{attrs[:container_image].inspect})" }
250
251     candidates = candidates.where_serialized(:mounts, resolve_mounts(attrs[:mounts]), md5: true)
252     log_reuse_info(candidates) { "after filtering on mounts #{attrs[:mounts].inspect}" }
253
254     secret_mounts_md5 = Digest::MD5.hexdigest(SafeJSON.dump(self.deep_sort_hash(attrs[:secret_mounts])))
255     candidates = candidates.where('secret_mounts_md5 = ?', secret_mounts_md5)
256     log_reuse_info(candidates) { "after filtering on secret_mounts_md5 #{secret_mounts_md5.inspect}" }
257
258     candidates = candidates.where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints]), md5: true)
259     log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" }
260
261     log_reuse_info { "checking for state=Complete with readable output and log..." }
262
263     select_readable_pdh = Collection.
264       readable_by(current_user).
265       select(:portable_data_hash).
266       to_sql
267
268     usable = candidates.where(state: Complete, exit_code: 0)
269     log_reuse_info(usable) { "with state=Complete, exit_code=0" }
270
271     usable = usable.where("log IN (#{select_readable_pdh})")
272     log_reuse_info(usable) { "with readable log" }
273
274     usable = usable.where("output IN (#{select_readable_pdh})")
275     log_reuse_info(usable) { "with readable output" }
276
277     usable = usable.order('finished_at ASC').limit(1).first
278     if usable
279       log_reuse_info { "done, reusing container #{usable.uuid} with state=Complete" }
280       return usable
281     end
282
283     # Check for non-failing Running candidates and return the most likely to finish sooner.
284     log_reuse_info { "checking for state=Running..." }
285     running = candidates.where(state: Running).
286               where("(runtime_status->'error') is null").
287               order('progress desc, started_at asc').
288               limit(1).first
289     if running
290       log_reuse_info { "done, reusing container #{running.uuid} with state=Running" }
291       return running
292     else
293       log_reuse_info { "have no containers in Running state" }
294     end
295
296     # Check for Locked or Queued ones and return the most likely to start first.
297     locked_or_queued = candidates.
298                        where("state IN (?)", [Locked, Queued]).
299                        order('state asc, priority desc, created_at asc').
300                        limit(1).first
301     if locked_or_queued
302       log_reuse_info { "done, reusing container #{locked_or_queued.uuid} with state=#{locked_or_queued.state}" }
303       return locked_or_queued
304     else
305       log_reuse_info { "have no containers in Locked or Queued state" }
306     end
307
308     log_reuse_info { "done, no reusable container found" }
309     nil
310   end
311
312   def check_lock_fail
313     if self.state != Queued
314       raise LockFailedError.new("cannot lock when #{self.state}")
315     elsif self.priority <= 0
316       raise LockFailedError.new("cannot lock when priority<=0")
317     end
318   end
319
320   def lock
321     # Check invalid state transitions once before getting the lock
322     # (because it's cheaper that way) and once after getting the lock
323     # (because state might have changed while acquiring the lock).
324     check_lock_fail
325     transaction do
326       reload
327       check_lock_fail
328       update_attributes!(state: Locked)
329     end
330   end
331
332   def check_unlock_fail
333     if self.state != Locked
334       raise InvalidStateTransitionError.new("cannot unlock when #{self.state}")
335     elsif self.locked_by_uuid != current_api_client_authorization.uuid
336       raise InvalidStateTransitionError.new("locked by a different token")
337     end
338   end
339
340   def unlock
341     # Check invalid state transitions twice (see lock)
342     check_unlock_fail
343     transaction do
344       reload(lock: 'FOR UPDATE')
345       check_unlock_fail
346       update_attributes!(state: Queued)
347     end
348   end
349
350   def self.readable_by(*users_list)
351     # Load optional keyword arguments, if they exist.
352     if users_list.last.is_a? Hash
353       kwargs = users_list.pop
354     else
355       kwargs = {}
356     end
357     Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").exists)
358   end
359
360   def final?
361     [Complete, Cancelled].include?(self.state)
362   end
363
364   protected
365
366   def fill_field_defaults
367     self.state ||= Queued
368     self.environment ||= {}
369     self.runtime_constraints ||= {}
370     self.mounts ||= {}
371     self.cwd ||= "."
372     self.priority ||= 0
373     self.scheduling_parameters ||= {}
374   end
375
376   def permission_to_create
377     current_user.andand.is_admin
378   end
379
380   def permission_to_update
381     # Override base permission check to allow auth_uuid to set progress and
382     # output (only).  Whether it is legal to set progress and output in the current
383     # state has already been checked in validate_change.
384     current_user.andand.is_admin ||
385       (!current_api_client_authorization.nil? and
386        [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid)
387   end
388
389   def ensure_owner_uuid_is_permitted
390     # Override base permission check to allow auth_uuid to set progress and
391     # output (only).  Whether it is legal to set progress and output in the current
392     # state has already been checked in validate_change.
393     if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid
394       check_update_whitelist [:progress, :output]
395     else
396       super
397     end
398   end
399
400   def set_timestamps
401     if self.state_changed? and self.state == Running
402       self.started_at ||= db_current_time
403     end
404
405     if self.state_changed? and [Complete, Cancelled].include? self.state
406       self.finished_at ||= db_current_time
407     end
408   end
409
410   # Check that well-known runtime status keys have desired data types
411   def validate_runtime_status
412     [
413       'error', 'errorDetail', 'warning', 'warningDetail', 'activity'
414     ].each do |k|
415       if self.runtime_status.andand.include?(k) && !self.runtime_status[k].is_a?(String)
416         errors.add(:runtime_status, "'#{k}' value must be a string")
417       end
418     end
419   end
420
421   def validate_change
422     permitted = [:state]
423
424     if self.new_record?
425       permitted.push(:owner_uuid, :command, :container_image, :cwd,
426                      :environment, :mounts, :output_path, :priority,
427                      :runtime_constraints, :scheduling_parameters,
428                      :secret_mounts)
429     end
430
431     case self.state
432     when Locked
433       permitted.push :priority, :runtime_status
434
435     when Queued
436       permitted.push :priority
437
438     when Running
439       permitted.push :priority, :progress, :output, :runtime_status
440       if self.state_changed?
441         permitted.push :started_at
442       end
443
444     when Complete
445       if self.state_was == Running
446         permitted.push :finished_at, :output, :log, :exit_code
447       end
448
449     when Cancelled
450       case self.state_was
451       when Running
452         permitted.push :finished_at, :output, :log
453       when Queued, Locked
454         permitted.push :finished_at, :log
455       end
456
457     else
458       # The state_transitions check will add an error message for this
459       return false
460     end
461
462     check_update_whitelist permitted
463   end
464
465   def validate_lock
466     if [Locked, Running].include? self.state
467       # If the Container was already locked, locked_by_uuid must not
468       # changes. Otherwise, the current auth gets the lock.
469       need_lock = locked_by_uuid_was || current_api_client_authorization.andand.uuid
470     else
471       need_lock = nil
472     end
473
474     # The caller can provide a new value for locked_by_uuid, but only
475     # if it's exactly what we expect. This allows a caller to perform
476     # an update like {"state":"Unlocked","locked_by_uuid":null}.
477     if self.locked_by_uuid_changed?
478       if self.locked_by_uuid != need_lock
479         return errors.add :locked_by_uuid, "can only change to #{need_lock}"
480       end
481     end
482     self.locked_by_uuid = need_lock
483   end
484
485   def validate_output
486     # Output must exist and be readable by the current user.  This is so
487     # that a container cannot "claim" a collection that it doesn't otherwise
488     # have access to just by setting the output field to the collection PDH.
489     if output_changed?
490       c = Collection.
491             readable_by(current_user, {include_trash: true}).
492             where(portable_data_hash: self.output).
493             first
494       if !c
495         errors.add :output, "collection must exist and be readable by current user."
496       end
497     end
498   end
499
500   def assign_auth
501     if self.auth_uuid_changed?
502       return errors.add :auth_uuid, 'is readonly'
503     end
504     if not [Locked, Running].include? self.state
505       # don't need one
506       self.auth.andand.update_attributes(expires_at: db_current_time)
507       self.auth = nil
508       return
509     elsif self.auth
510       # already have one
511       return
512     end
513     cr = ContainerRequest.
514       where('container_uuid=? and priority>0', self.uuid).
515       order('priority desc').
516       first
517     if !cr
518       return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
519     end
520     self.auth = ApiClientAuthorization.
521       create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
522               api_client_id: 0)
523   end
524
525   def sort_serialized_attrs
526     if self.environment_changed?
527       self.environment = self.class.deep_sort_hash(self.environment)
528     end
529     if self.mounts_changed?
530       self.mounts = self.class.deep_sort_hash(self.mounts)
531     end
532     if self.runtime_constraints_changed?
533       self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
534     end
535     if self.scheduling_parameters_changed?
536       self.scheduling_parameters = self.class.deep_sort_hash(self.scheduling_parameters)
537     end
538   end
539
540   def update_secret_mounts_md5
541     if self.secret_mounts_changed?
542       self.secret_mounts_md5 = Digest::MD5.hexdigest(
543         SafeJSON.dump(self.class.deep_sort_hash(self.secret_mounts)))
544     end
545   end
546
547   def scrub_secret_mounts
548     # this runs after update_secret_mounts_md5, so the
549     # secret_mounts_md5 will still reflect the secrets that are being
550     # scrubbed here.
551     if self.state_changed? && self.final?
552       self.secret_mounts = {}
553     end
554   end
555
556   def clear_runtime_status_when_queued
557     # Avoid leaking status messages between different dispatch attempts
558     if self.state_was == Locked && self.state == Queued
559       self.runtime_status = {}
560     end
561   end
562
563   def handle_completed
564     # This container is finished so finalize any associated container requests
565     # that are associated with this container.
566     if self.state_changed? and self.final?
567       act_as_system_user do
568
569         if self.state == Cancelled
570           retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
571         else
572           retryable_requests = []
573         end
574
575         if retryable_requests.any?
576           c_attrs = {
577             command: self.command,
578             cwd: self.cwd,
579             environment: self.environment,
580             output_path: self.output_path,
581             container_image: self.container_image,
582             mounts: self.mounts,
583             runtime_constraints: self.runtime_constraints,
584             scheduling_parameters: self.scheduling_parameters
585           }
586           c = Container.create! c_attrs
587           retryable_requests.each do |cr|
588             cr.with_lock do
589               leave_modified_by_user_alone do
590                 # Use row locking because this increments container_count
591                 cr.container_uuid = c.uuid
592                 cr.save!
593               end
594             end
595           end
596         end
597
598         # Notify container requests associated with this container
599         ContainerRequest.where(container_uuid: uuid,
600                                state: ContainerRequest::Committed).each do |cr|
601           leave_modified_by_user_alone do
602             cr.finalize!
603           end
604         end
605
606         # Cancel outstanding container requests made by this container.
607         ContainerRequest.
608           includes(:container).
609           where(requesting_container_uuid: uuid,
610                 state: ContainerRequest::Committed).each do |cr|
611           leave_modified_by_user_alone do
612             cr.update_attributes!(priority: 0)
613             cr.container.reload
614             if cr.container.state == Container::Queued || cr.container.state == Container::Locked
615               # If the child container hasn't started yet, finalize the
616               # child CR now instead of leaving it "on hold", i.e.,
617               # Queued with priority 0.  (OTOH, if the child is already
618               # running, leave it alone so it can get cancelled the
619               # usual way, get a copy of the log collection, etc.)
620               cr.update_attributes!(state: ContainerRequest::Final)
621             end
622           end
623         end
624       end
625     end
626   end
627 end