Merge branch '10194-jobs-image-versioning' refs #10194
[arvados.git] / services / api / app / models / container.rb
1 require 'whitelist_update'
2
# Model for a container record.
#
# A container moves through the states listed in States (Queued, Locked,
# Running, Complete, Cancelled). It is referenced by ContainerRequest
# records through their container_uuid column, and while it is Locked or
# Running it owns an ApiClientAuthorization (auth).
class Container < ArvadosModel
  include HasUuid
  include KindAndEtag
  include CommonApiTemplate
  include WhitelistUpdate
  extend CurrentApiClient

  serialize :environment, Hash
  serialize :mounts, Hash
  serialize :runtime_constraints, Hash
  serialize :command, Array

  before_validation :fill_field_defaults, :if => :new_record?
  before_validation :set_timestamps
  validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
  validate :validate_state_change
  validate :validate_change
  validate :validate_lock
  after_validation :assign_auth
  before_save :sort_serialized_attrs
  after_save :handle_completed

  has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
  belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid

  # Attributes exposed through the :user API template.
  api_accessible :user, extend: :common do |t|
    t.add :command
    t.add :container_image
    t.add :cwd
    t.add :environment
    t.add :exit_code
    t.add :finished_at
    t.add :locked_by_uuid
    t.add :log
    t.add :mounts
    t.add :output
    t.add :output_path
    t.add :priority
    t.add :progress
    t.add :runtime_constraints
    t.add :started_at
    t.add :state
    t.add :auth_uuid
  end

  # Supported states for a container
  States =
    [
     (Queued = 'Queued'),
     (Locked = 'Locked'),
     (Running = 'Running'),
     (Complete = 'Complete'),
     (Cancelled = 'Cancelled')
    ]

  # Allowed next states, keyed by current state (nil = not yet saved).
  # Complete and Cancelled have no entry: nothing follows them.
  State_transitions = {
    nil => [Queued],
    Queued => [Locked, Cancelled],
    Locked => [Queued, Running, Cancelled],
    Running => [Complete, Cancelled]
  }

  # Returns the transition table consulted by the state-change validation.
  def state_transitions
    State_transitions
  end

  # Recompute this container's priority from its committed requests and
  # save the record.
  #
  # Does nothing unless the container is Queued, Locked or Running. The
  # new priority is the maximum priority of the Committed
  # ContainerRequests pointing at this container, or 0 when there are
  # none: maximum returns nil for an empty set, and a nil priority would
  # fail the presence validation and make save! raise.
  def update_priority!
    return unless [Queued, Locked, Running].include?(state)

    highest = ContainerRequest.
      where(container_uuid: uuid,
            state: ContainerRequest::Committed).
      maximum('priority')
    self.priority = highest || 0
    save!
  end

  # Find an existing container that can stand in for a new one with the
  # given attributes.
  #
  # attrs is a hash with :command, :cwd, :environment, :output_path,
  # :container_image, :mounts and :runtime_constraints. Candidates must
  # match all of them (serialized attributes are compared as YAML, with
  # hashes deep-sorted first).
  #
  # Preference order:
  # 1. the oldest Complete container with exit code 0, provided all such
  #    candidates agree on one output, that output is readable by
  #    current_user, and the container's log is readable too;
  # 2. the Running candidate with the most progress;
  # 3. a Locked or Queued candidate.
  #
  # Returns a Container, or nil when nothing suitable exists.
  def self.find_reusable(attrs)
    candidates = Container.
      where('command = ?', attrs[:command].to_yaml).
      where('cwd = ?', attrs[:cwd]).
      where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml).
      where('output_path = ?', attrs[:output_path]).
      where('container_image = ?', attrs[:container_image]).
      where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml).
      where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml)

    # Check for Completed candidates that had consistent outputs.
    completed = candidates.where(state: Complete).where(exit_code: 0)
    # Grouping by output with limit(2) is enough to tell "exactly one
    # distinct output" apart from "none" and "more than one".
    outputs = completed.select('output').group('output').limit(2)
    # count on a grouped relation yields one hash entry per group.
    output_count = outputs.count.length
    if output_count != 1
      Rails.logger.debug("Found #{output_count} different outputs")
    elsif Collection.
        readable_by(current_user).
        where(portable_data_hash: outputs.first.output).
        count < 1
      Rails.logger.info("Found reusable container(s) " +
                        "but output #{outputs.first.output} is not readable " +
                        "by user #{current_user.uuid}")
    else
      # Return the oldest eligible container whose log is still
      # present and readable by current_user.
      readable_pdh = Collection.
        readable_by(current_user).
        select('portable_data_hash')
      oldest = completed.
        where("log in (#{readable_pdh.to_sql})").
        order('finished_at asc').
        limit(1).
        first
      return oldest if oldest

      Rails.logger.info("Found reusable container(s) but none with a log " +
                        "readable by user #{current_user.uuid}")
    end

    # Check for Running candidates and return the most likely to finish sooner.
    running = candidates.where(state: Running).
      order('progress desc, started_at asc').limit(1).first
    return running unless running.nil?

    # Check for Locked or Queued ones and return the most likely to start
    # first ('Locked' sorts before 'Queued'). This is nil when no suitable
    # candidate exists at all.
    candidates.where("state IN (?)", [Locked, Queued]).
      order('state asc, priority desc, created_at asc').limit(1).first
  end

  # Move the container to the Locked state inside with_lock.
  #
  # Raises AlreadyLockedError if it is already Locked; save! raises if
  # the resulting record is invalid.
  def lock
    with_lock do
      raise AlreadyLockedError if state == Locked

      self.state = Locked
      save!
    end
  end

  # Move the container back to the Queued state inside with_lock.
  #
  # Raises InvalidStateTransitionError if it is already Queued; save!
  # raises if the resulting record is invalid.
  def unlock
    with_lock do
      raise InvalidStateTransitionError if state == Queued

      self.state = Queued
      save!
    end
  end

  # Scope containers to those readable by any of the given users.
  #
  # If any user is an admin the class itself is returned (no
  # restriction). Otherwise a container is included when one of its
  # container requests is owned by, or is the head of a permission link
  # whose tail is, one of the users or one of the groups they can read.
  def self.readable_by(*users_list)
    return self if users_list.any? { |u| u.is_admin }

    uuid_list = (users_list.map { |u| u.uuid } +
                 users_list.flat_map { |u| u.groups_i_can(:read) }).uniq
    permitted = "(SELECT head_uuid FROM links WHERE link_class='permission' AND tail_uuid IN (:uuids))"
    joins(:container_requests).
      where("container_requests.uuid IN #{permitted} OR "+
            "container_requests.owner_uuid IN (:uuids)",
            uuids: uuid_list)
  end

  # True when the container is Complete or Cancelled.
  def final?
    [Complete, Cancelled].include?(state)
  end

  protected

  # Defaults applied to a new record before validation.
  def fill_field_defaults
    self.state ||= Queued
    self.environment ||= {}
    self.runtime_constraints ||= {}
    self.mounts ||= {}
    self.cwd ||= "."
    self.priority ||= 1
  end

  # Only admins may create containers.
  def permission_to_create
    current_user.andand.is_admin
  end

  # Only admins may update containers.
  def permission_to_update
    current_user.andand.is_admin
  end

  # Stamp started_at on entering Running and finished_at on entering a
  # final state; values that are already set are left alone.
  def set_timestamps
    return unless state_changed?

    self.started_at ||= db_current_time if state == Running
    self.finished_at ||= db_current_time if final?
  end

  # Restrict which attributes may change, depending on the (new) state
  # and, for final states, on the state being left.
  def validate_change
    allowed = [:state]

    # Everything describing the work can only be set at creation time.
    if new_record?
      allowed.push(:owner_uuid, :command, :container_image, :cwd,
                   :environment, :mounts, :output_path, :priority,
                   :runtime_constraints)
    end

    case state
    when Queued, Locked
      allowed.push :priority

    when Running
      allowed.push :priority, :progress
      allowed.push :started_at if state_changed?

    when Complete
      if state_was == Running
        allowed.push :finished_at, :output, :log, :exit_code
      end

    when Cancelled
      case state_was
      when Running
        allowed.push :finished_at, :output, :log
      when Queued, Locked
        allowed.push :finished_at
      end

    else
      # The state_transitions check will add an error message for this
      return false
    end

    check_update_whitelist allowed
  end

  # Enforce lock ownership and keep locked_by_uuid in step with state.
  def validate_lock
    # If the Container is already locked by someone other than the
    # current api_client_auth, disallow all changes -- except
    # priority, which needs to change to reflect max(priority) of
    # relevant ContainerRequests.
    if locked_by_uuid_was &&
       locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
      check_update_whitelist [:priority]
    end

    need_lock =
      if [Locked, Running].include?(state)
        # If the Container was already locked, locked_by_uuid must not
        # change. Otherwise, the current auth gets the lock. (The
        # current auth is only looked up when there was no previous
        # holder.)
        locked_by_uuid_was || Thread.current[:api_client_authorization].uuid
      else
        nil
      end

    # The caller can provide a new value for locked_by_uuid, but only
    # if it's exactly what we expect. This allows a caller to perform
    # an update like {"state":"Queued","locked_by_uuid":null}.
    if locked_by_uuid_changed? && locked_by_uuid != need_lock
      return errors.add :locked_by_uuid, "can only change to #{need_lock}"
    end
    self.locked_by_uuid = need_lock
  end

  # Maintain the container's auth token after validation.
  #
  # auth_uuid may never be changed directly. Outside Locked/Running the
  # existing token (if any) is expired and detached. In Locked/Running a
  # token is created, if there is none yet, for the user who last
  # modified the highest-priority request (priority > 0) pointing at this
  # container; without such a request an error is added instead.
  def assign_auth
    return errors.add(:auth_uuid, 'is readonly') if auth_uuid_changed?

    unless [Locked, Running].include?(state)
      # don't need one
      auth.andand.update_attributes(expires_at: db_current_time)
      self.auth = nil
      return
    end

    # already have one
    return if auth

    cr = ContainerRequest.
      where('container_uuid=? and priority>0', uuid).
      order('priority desc').
      first
    unless cr
      return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
    end
    self.auth = ApiClientAuthorization.
      create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
              api_client_id: 0)
  end

  # Deep-sort changed hash attributes before saving, so that their
  # serialized form matches the deep-sorted YAML that find_reusable
  # compares against.
  def sort_serialized_attrs
    self.environment = self.class.deep_sort_hash(environment) if environment_changed?
    self.mounts = self.class.deep_sort_hash(mounts) if mounts_changed?
    if runtime_constraints_changed?
      self.runtime_constraints = self.class.deep_sort_hash(runtime_constraints)
    end
  end

  # After a save that moved the container into a final state, update the
  # container requests that depend on it. Runs as the system user.
  def handle_completed
    return unless state_changed? && final?

    act_as_system_user do
      # A cancelled container may be retried. Only the requests that
      # were actually using THIS container are eligible; without the
      # container_uuid condition every retryable request in the system
      # would be moved onto the replacement container.
      retryable_requests =
        if state == Cancelled
          ContainerRequest.
            where(container_uuid: uuid).
            where("priority > 0 and state = 'Committed' and container_count < container_count_max")
        else
          []
        end

      if retryable_requests.any?
        replacement = Container.create!(
          command: command,
          cwd: cwd,
          environment: environment,
          output_path: output_path,
          container_image: container_image,
          mounts: mounts,
          runtime_constraints: runtime_constraints
        )
        retryable_requests.each do |cr|
          cr.with_lock do
            # Use row locking because this increments container_count
            cr.container_uuid = replacement.uuid
            cr.save
          end
        end
      end

      # Notify container requests associated with this container.
      # Requests saved onto the replacement above no longer match.
      ContainerRequest.where(container_uuid: uuid,
                             state: ContainerRequest::Committed).each do |cr|
        cr.finalize!
      end

      # Try to cancel any outstanding container requests made by this container.
      ContainerRequest.where(requesting_container_uuid: uuid,
                             state: ContainerRequest::Committed).each do |cr|
        cr.priority = 0
        cr.save
      end
    end
  end

end