Merge branch '10684-crunch-run-ca-certs' closes #10684
[arvados.git] / services / api / app / models / container.rb
1 require 'whitelist_update'
2
3 class Container < ArvadosModel
4   include HasUuid
5   include KindAndEtag
6   include CommonApiTemplate
7   include WhitelistUpdate
8   extend CurrentApiClient
9
10   serialize :environment, Hash
11   serialize :mounts, Hash
12   serialize :runtime_constraints, Hash
13   serialize :command, Array
14   serialize :scheduling_parameters, Hash
15
16   before_validation :fill_field_defaults, :if => :new_record?
17   before_validation :set_timestamps
18   validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
19   validate :validate_state_change
20   validate :validate_change
21   validate :validate_lock
22   validate :validate_output
23   after_validation :assign_auth
24   before_save :sort_serialized_attrs
25   after_save :handle_completed
26
27   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
28   belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
29
30   api_accessible :user, extend: :common do |t|
31     t.add :command
32     t.add :container_image
33     t.add :cwd
34     t.add :environment
35     t.add :exit_code
36     t.add :finished_at
37     t.add :locked_by_uuid
38     t.add :log
39     t.add :mounts
40     t.add :output
41     t.add :output_path
42     t.add :priority
43     t.add :progress
44     t.add :runtime_constraints
45     t.add :started_at
46     t.add :state
47     t.add :auth_uuid
48     t.add :scheduling_parameters
49   end
50
51   # Supported states for a container
52   States =
53     [
54      (Queued = 'Queued'),
55      (Locked = 'Locked'),
56      (Running = 'Running'),
57      (Complete = 'Complete'),
58      (Cancelled = 'Cancelled')
59     ]
60
61   State_transitions = {
62     nil => [Queued],
63     Queued => [Locked, Cancelled],
64     Locked => [Queued, Running, Cancelled],
65     Running => [Complete, Cancelled]
66   }
67
68   def state_transitions
69     State_transitions
70   end
71
72   def update_priority!
73     if [Queued, Locked, Running].include? self.state
74       # Update the priority of this container to the maximum priority of any of
75       # its committed container requests and save the record.
76       self.priority = ContainerRequest.
77         where(container_uuid: uuid,
78               state: ContainerRequest::Committed).
79         maximum('priority')
80       self.save!
81     end
82   end
83
84   def self.find_reusable(attrs)
85     candidates = Container.
86       where('command = ?', attrs[:command].to_yaml).
87       where('cwd = ?', attrs[:cwd]).
88       where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml).
89       where('output_path = ?', attrs[:output_path]).
90       where('container_image = ?', attrs[:container_image]).
91       where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml).
92       where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml)
93
94     # Check for Completed candidates that had consistent outputs.
95     completed = candidates.where(state: Complete).where(exit_code: 0)
96     outputs = completed.select('output').group('output').limit(2)
97     if outputs.count.count != 1
98       Rails.logger.debug("Found #{outputs.count.length} different outputs")
99     elsif Collection.
100         readable_by(current_user).
101         where(portable_data_hash: outputs.first.output).
102         count < 1
103       Rails.logger.info("Found reusable container(s) " +
104                         "but output #{outputs.first} is not readable " +
105                         "by user #{current_user.uuid}")
106     else
107       # Return the oldest eligible container whose log is still
108       # present and readable by current_user.
109       readable_pdh = Collection.
110         readable_by(current_user).
111         select('portable_data_hash')
112       completed = completed.
113         where("log in (#{readable_pdh.to_sql})").
114         order('finished_at asc').
115         limit(1)
116       if completed.first
117         return completed.first
118       else
119         Rails.logger.info("Found reusable container(s) but none with a log " +
120                           "readable by user #{current_user.uuid}")
121       end
122     end
123
124     # Check for Running candidates and return the most likely to finish sooner.
125     running = candidates.where(state: Running).
126       order('progress desc, started_at asc').limit(1).first
127     return running if not running.nil?
128
129     # Check for Locked or Queued ones and return the most likely to start first.
130     locked_or_queued = candidates.where("state IN (?)", [Locked, Queued]).
131       order('state asc, priority desc, created_at asc').limit(1).first
132     return locked_or_queued if not locked_or_queued.nil?
133
134     # No suitable candidate found.
135     nil
136   end
137
138   def lock
139     with_lock do
140       if self.state == Locked
141         raise AlreadyLockedError
142       end
143       self.state = Locked
144       self.save!
145     end
146   end
147
148   def unlock
149     with_lock do
150       if self.state == Queued
151         raise InvalidStateTransitionError
152       end
153       self.state = Queued
154       self.save!
155     end
156   end
157
158   def self.readable_by(*users_list)
159     if users_list.select { |u| u.is_admin }.any?
160       return self
161     end
162     user_uuids = users_list.map { |u| u.uuid }
163     uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) }
164     uuid_list.uniq!
165     permitted = "(SELECT head_uuid FROM links WHERE link_class='permission' AND tail_uuid IN (:uuids))"
166     joins(:container_requests).
167       where("container_requests.uuid IN #{permitted} OR "+
168             "container_requests.owner_uuid IN (:uuids)",
169             uuids: uuid_list)
170   end
171
172   def final?
173     [Complete, Cancelled].include?(self.state)
174   end
175
176   protected
177
178   def fill_field_defaults
179     self.state ||= Queued
180     self.environment ||= {}
181     self.runtime_constraints ||= {}
182     self.mounts ||= {}
183     self.cwd ||= "."
184     self.priority ||= 1
185     self.scheduling_parameters ||= {}
186   end
187
188   def permission_to_create
189     current_user.andand.is_admin
190   end
191
192   def permission_to_update
193     # Override base permission check to allow auth_uuid to set progress and
194     # output (only).  Whether it is legal to set progress and output in the current
195     # state has already been checked in validate_change.
196     current_user.andand.is_admin ||
197       (!current_api_client_authorization.nil? and
198        [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid)
199   end
200
201   def ensure_owner_uuid_is_permitted
202     # Override base permission check to allow auth_uuid to set progress and
203     # output (only).  Whether it is legal to set progress and output in the current
204     # state has already been checked in validate_change.
205     if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid
206       check_update_whitelist [:progress, :output]
207     else
208       super
209     end
210   end
211
212   def set_timestamps
213     if self.state_changed? and self.state == Running
214       self.started_at ||= db_current_time
215     end
216
217     if self.state_changed? and [Complete, Cancelled].include? self.state
218       self.finished_at ||= db_current_time
219     end
220   end
221
222   def validate_change
223     permitted = [:state]
224
225     if self.new_record?
226       permitted.push(:owner_uuid, :command, :container_image, :cwd,
227                      :environment, :mounts, :output_path, :priority,
228                      :runtime_constraints, :scheduling_parameters)
229     end
230
231     case self.state
232     when Queued, Locked
233       permitted.push :priority
234
235     when Running
236       permitted.push :priority, :progress, :output
237       if self.state_changed?
238         permitted.push :started_at
239       end
240
241     when Complete
242       if self.state_was == Running
243         permitted.push :finished_at, :output, :log, :exit_code
244       end
245
246     when Cancelled
247       case self.state_was
248       when Running
249         permitted.push :finished_at, :output, :log
250       when Queued, Locked
251         permitted.push :finished_at
252       end
253
254     else
255       # The state_transitions check will add an error message for this
256       return false
257     end
258
259     check_update_whitelist permitted
260   end
261
262   def validate_lock
263     if [Locked, Running].include? self.state
264       # If the Container was already locked, locked_by_uuid must not
265       # changes. Otherwise, the current auth gets the lock.
266       need_lock = locked_by_uuid_was || current_api_client_authorization.andand.uuid
267     else
268       need_lock = nil
269     end
270
271     # The caller can provide a new value for locked_by_uuid, but only
272     # if it's exactly what we expect. This allows a caller to perform
273     # an update like {"state":"Unlocked","locked_by_uuid":null}.
274     if self.locked_by_uuid_changed?
275       if self.locked_by_uuid != need_lock
276         return errors.add :locked_by_uuid, "can only change to #{need_lock}"
277       end
278     end
279     self.locked_by_uuid = need_lock
280   end
281
282   def validate_output
283     # Output must exist and be readable by the current user.  This is so
284     # that a container cannot "claim" a collection that it doesn't otherwise
285     # have access to just by setting the output field to the collection PDH.
286     if output_changed?
287       c = Collection.
288           readable_by(current_user).
289           where(portable_data_hash: self.output).
290           first
291       if !c
292         errors.add :output, "collection must exist and be readable by current user."
293       end
294     end
295   end
296
297   def assign_auth
298     if self.auth_uuid_changed?
299       return errors.add :auth_uuid, 'is readonly'
300     end
301     if not [Locked, Running].include? self.state
302       # don't need one
303       self.auth.andand.update_attributes(expires_at: db_current_time)
304       self.auth = nil
305       return
306     elsif self.auth
307       # already have one
308       return
309     end
310     cr = ContainerRequest.
311       where('container_uuid=? and priority>0', self.uuid).
312       order('priority desc').
313       first
314     if !cr
315       return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
316     end
317     self.auth = ApiClientAuthorization.
318       create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
319               api_client_id: 0)
320   end
321
322   def sort_serialized_attrs
323     if self.environment_changed?
324       self.environment = self.class.deep_sort_hash(self.environment)
325     end
326     if self.mounts_changed?
327       self.mounts = self.class.deep_sort_hash(self.mounts)
328     end
329     if self.runtime_constraints_changed?
330       self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
331     end
332     if self.scheduling_parameters_changed?
333       self.scheduling_parameters = self.class.deep_sort_hash(self.scheduling_parameters)
334     end
335   end
336
337   def handle_completed
338     # This container is finished so finalize any associated container requests
339     # that are associated with this container.
340     if self.state_changed? and self.final?
341       act_as_system_user do
342
343         if self.state == Cancelled
344           retryable_requests = ContainerRequest.where("priority > 0 and state = 'Committed' and container_count < container_count_max")
345         else
346           retryable_requests = []
347         end
348
349         if retryable_requests.any?
350           c_attrs = {
351             command: self.command,
352             cwd: self.cwd,
353             environment: self.environment,
354             output_path: self.output_path,
355             container_image: self.container_image,
356             mounts: self.mounts,
357             runtime_constraints: self.runtime_constraints,
358             scheduling_parameters: self.scheduling_parameters
359           }
360           c = Container.create! c_attrs
361           retryable_requests.each do |cr|
362             cr.with_lock do
363               # Use row locking because this increments container_count
364               cr.container_uuid = c.uuid
365               cr.save
366             end
367           end
368         end
369
370         # Notify container requests associated with this container
371         ContainerRequest.where(container_uuid: uuid,
372                                state: ContainerRequest::Committed).each do |cr|
373           cr.finalize!
374         end
375
376         # Try to cancel any outstanding container requests made by this container.
377         ContainerRequest.where(requesting_container_uuid: uuid,
378                                state: ContainerRequest::Committed).each do |cr|
379           cr.priority = 0
380           cr.save
381         end
382       end
383     end
384   end
385
386 end