Merge branch '11138-docker-load-fail'
[arvados.git] / services / api / app / models / container.rb
1 require 'whitelist_update'
2 require 'safe_json'
3
4 class Container < ArvadosModel
5   include HasUuid
6   include KindAndEtag
7   include CommonApiTemplate
8   include WhitelistUpdate
9   extend CurrentApiClient
10
11   serialize :environment, Hash
12   serialize :mounts, Hash
13   serialize :runtime_constraints, Hash
14   serialize :command, Array
15   serialize :scheduling_parameters, Hash
16
17   before_validation :fill_field_defaults, :if => :new_record?
18   before_validation :set_timestamps
19   validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
20   validate :validate_state_change
21   validate :validate_change
22   validate :validate_lock
23   validate :validate_output
24   after_validation :assign_auth
25   before_save :sort_serialized_attrs
26   after_save :handle_completed
27
28   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
29   belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
30
31   api_accessible :user, extend: :common do |t|
32     t.add :command
33     t.add :container_image
34     t.add :cwd
35     t.add :environment
36     t.add :exit_code
37     t.add :finished_at
38     t.add :locked_by_uuid
39     t.add :log
40     t.add :mounts
41     t.add :output
42     t.add :output_path
43     t.add :priority
44     t.add :progress
45     t.add :runtime_constraints
46     t.add :started_at
47     t.add :state
48     t.add :auth_uuid
49     t.add :scheduling_parameters
50   end
51
52   # Supported states for a container
53   States =
54     [
55      (Queued = 'Queued'),
56      (Locked = 'Locked'),
57      (Running = 'Running'),
58      (Complete = 'Complete'),
59      (Cancelled = 'Cancelled')
60     ]
61
62   State_transitions = {
63     nil => [Queued],
64     Queued => [Locked, Cancelled],
65     Locked => [Queued, Running, Cancelled],
66     Running => [Complete, Cancelled]
67   }
68
69   def state_transitions
70     State_transitions
71   end
72
73   def update_priority!
74     if [Queued, Locked, Running].include? self.state
75       # Update the priority of this container to the maximum priority of any of
76       # its committed container requests and save the record.
77       self.priority = ContainerRequest.
78         where(container_uuid: uuid,
79               state: ContainerRequest::Committed).
80         maximum('priority')
81       self.save!
82     end
83   end
84
85   def self.find_reusable(attrs)
86     candidates = Container.
87       where_serialized(:command, attrs[:command]).
88       where('cwd = ?', attrs[:cwd]).
89       where_serialized(:environment, attrs[:environment]).
90       where('output_path = ?', attrs[:output_path]).
91       where('container_image = ?', attrs[:container_image]).
92       where_serialized(:mounts, attrs[:mounts]).
93       where_serialized(:runtime_constraints, attrs[:runtime_constraints])
94
95     # Check for Completed candidates whose output and log are both readable.
96     select_readable_pdh = Collection.
97       readable_by(current_user).
98       select(:portable_data_hash).
99       to_sql
100     usable = candidates.
101       where(state: Complete).
102       where(exit_code: 0).
103       where("log IN (#{select_readable_pdh})").
104       where("output IN (#{select_readable_pdh})").
105       order('finished_at ASC').
106       limit(1).
107       first
108     return usable if usable
109
110     # Check for Running candidates and return the most likely to finish sooner.
111     running = candidates.where(state: Running).
112       order('progress desc, started_at asc').limit(1).first
113     return running if not running.nil?
114
115     # Check for Locked or Queued ones and return the most likely to start first.
116     locked_or_queued = candidates.where("state IN (?)", [Locked, Queued]).
117       order('state asc, priority desc, created_at asc').limit(1).first
118     return locked_or_queued if not locked_or_queued.nil?
119
120     # No suitable candidate found.
121     nil
122   end
123
124   def lock
125     with_lock do
126       if self.state == Locked
127         raise AlreadyLockedError
128       end
129       self.state = Locked
130       self.save!
131     end
132   end
133
134   def unlock
135     with_lock do
136       if self.state == Queued
137         raise InvalidStateTransitionError
138       end
139       self.state = Queued
140       self.save!
141     end
142   end
143
144   def self.readable_by(*users_list)
145     if users_list.select { |u| u.is_admin }.any?
146       return self
147     end
148     user_uuids = users_list.map { |u| u.uuid }
149     uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) }
150     uuid_list.uniq!
151     permitted = "(SELECT head_uuid FROM links WHERE link_class='permission' AND tail_uuid IN (:uuids))"
152     joins(:container_requests).
153       where("container_requests.uuid IN #{permitted} OR "+
154             "container_requests.owner_uuid IN (:uuids)",
155             uuids: uuid_list)
156   end
157
158   def final?
159     [Complete, Cancelled].include?(self.state)
160   end
161
162   protected
163
164   def fill_field_defaults
165     self.state ||= Queued
166     self.environment ||= {}
167     self.runtime_constraints ||= {}
168     self.mounts ||= {}
169     self.cwd ||= "."
170     self.priority ||= 1
171     self.scheduling_parameters ||= {}
172   end
173
174   def permission_to_create
175     current_user.andand.is_admin
176   end
177
178   def permission_to_update
179     # Override base permission check to allow auth_uuid to set progress and
180     # output (only).  Whether it is legal to set progress and output in the current
181     # state has already been checked in validate_change.
182     current_user.andand.is_admin ||
183       (!current_api_client_authorization.nil? and
184        [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid)
185   end
186
187   def ensure_owner_uuid_is_permitted
188     # Override base permission check to allow auth_uuid to set progress and
189     # output (only).  Whether it is legal to set progress and output in the current
190     # state has already been checked in validate_change.
191     if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid
192       check_update_whitelist [:progress, :output]
193     else
194       super
195     end
196   end
197
198   def set_timestamps
199     if self.state_changed? and self.state == Running
200       self.started_at ||= db_current_time
201     end
202
203     if self.state_changed? and [Complete, Cancelled].include? self.state
204       self.finished_at ||= db_current_time
205     end
206   end
207
208   def validate_change
209     permitted = [:state]
210
211     if self.new_record?
212       permitted.push(:owner_uuid, :command, :container_image, :cwd,
213                      :environment, :mounts, :output_path, :priority,
214                      :runtime_constraints, :scheduling_parameters)
215     end
216
217     case self.state
218     when Queued, Locked
219       permitted.push :priority
220
221     when Running
222       permitted.push :priority, :progress, :output
223       if self.state_changed?
224         permitted.push :started_at
225       end
226
227     when Complete
228       if self.state_was == Running
229         permitted.push :finished_at, :output, :log, :exit_code
230       end
231
232     when Cancelled
233       case self.state_was
234       when Running
235         permitted.push :finished_at, :output, :log
236       when Queued, Locked
237         permitted.push :finished_at
238       end
239
240     else
241       # The state_transitions check will add an error message for this
242       return false
243     end
244
245     check_update_whitelist permitted
246   end
247
248   def validate_lock
249     if [Locked, Running].include? self.state
250       # If the Container was already locked, locked_by_uuid must not
251       # changes. Otherwise, the current auth gets the lock.
252       need_lock = locked_by_uuid_was || current_api_client_authorization.andand.uuid
253     else
254       need_lock = nil
255     end
256
257     # The caller can provide a new value for locked_by_uuid, but only
258     # if it's exactly what we expect. This allows a caller to perform
259     # an update like {"state":"Unlocked","locked_by_uuid":null}.
260     if self.locked_by_uuid_changed?
261       if self.locked_by_uuid != need_lock
262         return errors.add :locked_by_uuid, "can only change to #{need_lock}"
263       end
264     end
265     self.locked_by_uuid = need_lock
266   end
267
268   def validate_output
269     # Output must exist and be readable by the current user.  This is so
270     # that a container cannot "claim" a collection that it doesn't otherwise
271     # have access to just by setting the output field to the collection PDH.
272     if output_changed?
273       c = Collection.unscoped do
274         Collection.
275             readable_by(current_user).
276             where(portable_data_hash: self.output).
277             first
278       end
279       if !c
280         errors.add :output, "collection must exist and be readable by current user."
281       end
282     end
283   end
284
285   def assign_auth
286     if self.auth_uuid_changed?
287       return errors.add :auth_uuid, 'is readonly'
288     end
289     if not [Locked, Running].include? self.state
290       # don't need one
291       self.auth.andand.update_attributes(expires_at: db_current_time)
292       self.auth = nil
293       return
294     elsif self.auth
295       # already have one
296       return
297     end
298     cr = ContainerRequest.
299       where('container_uuid=? and priority>0', self.uuid).
300       order('priority desc').
301       first
302     if !cr
303       return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
304     end
305     self.auth = ApiClientAuthorization.
306       create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
307               api_client_id: 0)
308   end
309
310   def sort_serialized_attrs
311     if self.environment_changed?
312       self.environment = self.class.deep_sort_hash(self.environment)
313     end
314     if self.mounts_changed?
315       self.mounts = self.class.deep_sort_hash(self.mounts)
316     end
317     if self.runtime_constraints_changed?
318       self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
319     end
320     if self.scheduling_parameters_changed?
321       self.scheduling_parameters = self.class.deep_sort_hash(self.scheduling_parameters)
322     end
323   end
324
325   def handle_completed
326     # This container is finished so finalize any associated container requests
327     # that are associated with this container.
328     if self.state_changed? and self.final?
329       act_as_system_user do
330
331         if self.state == Cancelled
332           retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
333         else
334           retryable_requests = []
335         end
336
337         if retryable_requests.any?
338           c_attrs = {
339             command: self.command,
340             cwd: self.cwd,
341             environment: self.environment,
342             output_path: self.output_path,
343             container_image: self.container_image,
344             mounts: self.mounts,
345             runtime_constraints: self.runtime_constraints,
346             scheduling_parameters: self.scheduling_parameters
347           }
348           c = Container.create! c_attrs
349           retryable_requests.each do |cr|
350             cr.with_lock do
351               # Use row locking because this increments container_count
352               cr.container_uuid = c.uuid
353               cr.save
354             end
355           end
356         end
357
358         # Notify container requests associated with this container
359         ContainerRequest.where(container_uuid: uuid,
360                                state: ContainerRequest::Committed).each do |cr|
361           cr.finalize!
362         end
363
364         # Try to cancel any outstanding container requests made by this container.
365         ContainerRequest.where(requesting_container_uuid: uuid,
366                                state: ContainerRequest::Committed).each do |cr|
367           cr.priority = 0
368           cr.save
369         end
370       end
371     end
372   end
373
374 end