Merge branch 'master' into 10979-cancelled-job-nodes
[arvados.git] / services / api / app / models / container.rb
1 require 'whitelist_update'
2
3 class Container < ArvadosModel
4   include HasUuid
5   include KindAndEtag
6   include CommonApiTemplate
7   include WhitelistUpdate
8   extend CurrentApiClient
9
10   serialize :environment, Hash
11   serialize :mounts, Hash
12   serialize :runtime_constraints, Hash
13   serialize :command, Array
14   serialize :scheduling_parameters, Hash
15
16   before_validation :fill_field_defaults, :if => :new_record?
17   before_validation :set_timestamps
18   validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
19   validate :validate_state_change
20   validate :validate_change
21   validate :validate_lock
22   validate :validate_output
23   after_validation :assign_auth
24   before_save :sort_serialized_attrs
25   after_save :handle_completed
26
27   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
28   belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
29
30   api_accessible :user, extend: :common do |t|
31     t.add :command
32     t.add :container_image
33     t.add :cwd
34     t.add :environment
35     t.add :exit_code
36     t.add :finished_at
37     t.add :locked_by_uuid
38     t.add :log
39     t.add :mounts
40     t.add :output
41     t.add :output_path
42     t.add :priority
43     t.add :progress
44     t.add :runtime_constraints
45     t.add :started_at
46     t.add :state
47     t.add :auth_uuid
48     t.add :scheduling_parameters
49   end
50
51   # Supported states for a container
52   States =
53     [
54      (Queued = 'Queued'),
55      (Locked = 'Locked'),
56      (Running = 'Running'),
57      (Complete = 'Complete'),
58      (Cancelled = 'Cancelled')
59     ]
60
61   State_transitions = {
62     nil => [Queued],
63     Queued => [Locked, Cancelled],
64     Locked => [Queued, Running, Cancelled],
65     Running => [Complete, Cancelled]
66   }
67
68   def state_transitions
69     State_transitions
70   end
71
72   def update_priority!
73     if [Queued, Locked, Running].include? self.state
74       # Update the priority of this container to the maximum priority of any of
75       # its committed container requests and save the record.
76       self.priority = ContainerRequest.
77         where(container_uuid: uuid,
78               state: ContainerRequest::Committed).
79         maximum('priority')
80       self.save!
81     end
82   end
83
84   def self.find_reusable(attrs)
85     candidates = Container.
86       where('command = ?', attrs[:command].to_yaml).
87       where('cwd = ?', attrs[:cwd]).
88       where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml).
89       where('output_path = ?', attrs[:output_path]).
90       where('container_image = ?', attrs[:container_image]).
91       where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml).
92       where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml)
93
94     # Check for Completed candidates whose output and log are both readable.
95     select_readable_pdh = Collection.
96       readable_by(current_user).
97       select(:portable_data_hash).
98       to_sql
99     usable = candidates.
100       where(state: Complete).
101       where(exit_code: 0).
102       where("log IN (#{select_readable_pdh})").
103       where("output IN (#{select_readable_pdh})").
104       order('finished_at ASC').
105       limit(1).
106       first
107     return usable if usable
108
109     # Check for Running candidates and return the most likely to finish sooner.
110     running = candidates.where(state: Running).
111       order('progress desc, started_at asc').limit(1).first
112     return running if not running.nil?
113
114     # Check for Locked or Queued ones and return the most likely to start first.
115     locked_or_queued = candidates.where("state IN (?)", [Locked, Queued]).
116       order('state asc, priority desc, created_at asc').limit(1).first
117     return locked_or_queued if not locked_or_queued.nil?
118
119     # No suitable candidate found.
120     nil
121   end
122
123   def lock
124     with_lock do
125       if self.state == Locked
126         raise AlreadyLockedError
127       end
128       self.state = Locked
129       self.save!
130     end
131   end
132
133   def unlock
134     with_lock do
135       if self.state == Queued
136         raise InvalidStateTransitionError
137       end
138       self.state = Queued
139       self.save!
140     end
141   end
142
143   def self.readable_by(*users_list)
144     if users_list.select { |u| u.is_admin }.any?
145       return self
146     end
147     user_uuids = users_list.map { |u| u.uuid }
148     uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) }
149     uuid_list.uniq!
150     permitted = "(SELECT head_uuid FROM links WHERE link_class='permission' AND tail_uuid IN (:uuids))"
151     joins(:container_requests).
152       where("container_requests.uuid IN #{permitted} OR "+
153             "container_requests.owner_uuid IN (:uuids)",
154             uuids: uuid_list)
155   end
156
157   def final?
158     [Complete, Cancelled].include?(self.state)
159   end
160
161   protected
162
163   def fill_field_defaults
164     self.state ||= Queued
165     self.environment ||= {}
166     self.runtime_constraints ||= {}
167     self.mounts ||= {}
168     self.cwd ||= "."
169     self.priority ||= 1
170     self.scheduling_parameters ||= {}
171   end
172
173   def permission_to_create
174     current_user.andand.is_admin
175   end
176
177   def permission_to_update
178     # Override base permission check to allow auth_uuid to set progress and
179     # output (only).  Whether it is legal to set progress and output in the current
180     # state has already been checked in validate_change.
181     current_user.andand.is_admin ||
182       (!current_api_client_authorization.nil? and
183        [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid)
184   end
185
186   def ensure_owner_uuid_is_permitted
187     # Override base permission check to allow auth_uuid to set progress and
188     # output (only).  Whether it is legal to set progress and output in the current
189     # state has already been checked in validate_change.
190     if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid
191       check_update_whitelist [:progress, :output]
192     else
193       super
194     end
195   end
196
197   def set_timestamps
198     if self.state_changed? and self.state == Running
199       self.started_at ||= db_current_time
200     end
201
202     if self.state_changed? and [Complete, Cancelled].include? self.state
203       self.finished_at ||= db_current_time
204     end
205   end
206
207   def validate_change
208     permitted = [:state]
209
210     if self.new_record?
211       permitted.push(:owner_uuid, :command, :container_image, :cwd,
212                      :environment, :mounts, :output_path, :priority,
213                      :runtime_constraints, :scheduling_parameters)
214     end
215
216     case self.state
217     when Queued, Locked
218       permitted.push :priority
219
220     when Running
221       permitted.push :priority, :progress, :output
222       if self.state_changed?
223         permitted.push :started_at
224       end
225
226     when Complete
227       if self.state_was == Running
228         permitted.push :finished_at, :output, :log, :exit_code
229       end
230
231     when Cancelled
232       case self.state_was
233       when Running
234         permitted.push :finished_at, :output, :log
235       when Queued, Locked
236         permitted.push :finished_at
237       end
238
239     else
240       # The state_transitions check will add an error message for this
241       return false
242     end
243
244     check_update_whitelist permitted
245   end
246
247   def validate_lock
248     if [Locked, Running].include? self.state
249       # If the Container was already locked, locked_by_uuid must not
250       # changes. Otherwise, the current auth gets the lock.
251       need_lock = locked_by_uuid_was || current_api_client_authorization.andand.uuid
252     else
253       need_lock = nil
254     end
255
256     # The caller can provide a new value for locked_by_uuid, but only
257     # if it's exactly what we expect. This allows a caller to perform
258     # an update like {"state":"Unlocked","locked_by_uuid":null}.
259     if self.locked_by_uuid_changed?
260       if self.locked_by_uuid != need_lock
261         return errors.add :locked_by_uuid, "can only change to #{need_lock}"
262       end
263     end
264     self.locked_by_uuid = need_lock
265   end
266
267   def validate_output
268     # Output must exist and be readable by the current user.  This is so
269     # that a container cannot "claim" a collection that it doesn't otherwise
270     # have access to just by setting the output field to the collection PDH.
271     if output_changed?
272       c = Collection.
273           readable_by(current_user).
274           where(portable_data_hash: self.output).
275           first
276       if !c
277         errors.add :output, "collection must exist and be readable by current user."
278       end
279     end
280   end
281
282   def assign_auth
283     if self.auth_uuid_changed?
284       return errors.add :auth_uuid, 'is readonly'
285     end
286     if not [Locked, Running].include? self.state
287       # don't need one
288       self.auth.andand.update_attributes(expires_at: db_current_time)
289       self.auth = nil
290       return
291     elsif self.auth
292       # already have one
293       return
294     end
295     cr = ContainerRequest.
296       where('container_uuid=? and priority>0', self.uuid).
297       order('priority desc').
298       first
299     if !cr
300       return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
301     end
302     self.auth = ApiClientAuthorization.
303       create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
304               api_client_id: 0)
305   end
306
307   def sort_serialized_attrs
308     if self.environment_changed?
309       self.environment = self.class.deep_sort_hash(self.environment)
310     end
311     if self.mounts_changed?
312       self.mounts = self.class.deep_sort_hash(self.mounts)
313     end
314     if self.runtime_constraints_changed?
315       self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
316     end
317     if self.scheduling_parameters_changed?
318       self.scheduling_parameters = self.class.deep_sort_hash(self.scheduling_parameters)
319     end
320   end
321
322   def handle_completed
323     # This container is finished so finalize any associated container requests
324     # that are associated with this container.
325     if self.state_changed? and self.final?
326       act_as_system_user do
327
328         if self.state == Cancelled
329           retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
330         else
331           retryable_requests = []
332         end
333
334         if retryable_requests.any?
335           c_attrs = {
336             command: self.command,
337             cwd: self.cwd,
338             environment: self.environment,
339             output_path: self.output_path,
340             container_image: self.container_image,
341             mounts: self.mounts,
342             runtime_constraints: self.runtime_constraints,
343             scheduling_parameters: self.scheduling_parameters
344           }
345           c = Container.create! c_attrs
346           retryable_requests.each do |cr|
347             cr.with_lock do
348               # Use row locking because this increments container_count
349               cr.container_uuid = c.uuid
350               cr.save
351             end
352           end
353         end
354
355         # Notify container requests associated with this container
356         ContainerRequest.where(container_uuid: uuid,
357                                state: ContainerRequest::Committed).each do |cr|
358           cr.finalize!
359         end
360
361         # Try to cancel any outstanding container requests made by this container.
362         ContainerRequest.where(requesting_container_uuid: uuid,
363                                state: ContainerRequest::Committed).each do |cr|
364           cr.priority = 0
365           cr.save
366         end
367       end
368     end
369   end
370
371 end