10172: Tests and related fixes for auth_uuid setting output on container.
[arvados.git] / services / api / app / models / container.rb
1 require 'whitelist_update'
2
class Container < ArvadosModel
  include HasUuid
  include KindAndEtag
  include CommonApiTemplate
  include WhitelistUpdate
  extend CurrentApiClient

  serialize :environment, Hash
  serialize :mounts, Hash
  serialize :runtime_constraints, Hash
  serialize :command, Array

  before_validation :fill_field_defaults, :if => :new_record?
  before_validation :set_timestamps
  validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
  validate :validate_state_change
  validate :validate_change
  validate :validate_lock
  validate :validate_output
  after_validation :assign_auth
  before_save :sort_serialized_attrs
  after_save :handle_completed

  has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
  belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid

  api_accessible :user, extend: :common do |t|
    t.add :command
    t.add :container_image
    t.add :cwd
    t.add :environment
    t.add :exit_code
    t.add :finished_at
    t.add :locked_by_uuid
    t.add :log
    t.add :mounts
    t.add :output
    t.add :output_path
    t.add :priority
    t.add :progress
    t.add :runtime_constraints
    t.add :started_at
    t.add :state
    t.add :auth_uuid
  end

  # Supported states for a container
  States =
    [
     (Queued = 'Queued'),
     (Locked = 'Locked'),
     (Running = 'Running'),
     (Complete = 'Complete'),
     (Cancelled = 'Cancelled')
    ]

  # Legal state transitions: maps a current state (nil for a new record)
  # to the list of states it may move to.  Complete and Cancelled have no
  # entry, i.e. they are terminal.
  State_transitions = {
    nil => [Queued],
    Queued => [Locked, Cancelled],
    Locked => [Queued, Running, Cancelled],
    Running => [Complete, Cancelled]
  }

  # Returns the transition table for this model.  Presumably consumed by
  # validate_state_change, which is defined outside this file.
  def state_transitions
    State_transitions
  end

  # Recompute this container's priority as the maximum priority of any of
  # its committed container requests, and save the record.
  #
  # Has no effect once the container is Complete or Cancelled.
  #
  # When no committed request remains, the priority becomes 0.  Without
  # the fallback, +maximum+ returns nil for an empty set, which fails the
  # presence validation on +priority+ and makes save! raise.
  def update_priority!
    if [Queued, Locked, Running].include? self.state
      self.priority = ContainerRequest.
        where(container_uuid: uuid,
              state: ContainerRequest::Committed).
        maximum('priority') || 0
      self.save!
    end
  end

  # Find an existing container that can be reused for a request with the
  # given attributes instead of running a new one.
  #
  # attrs - Hash with :command, :cwd, :environment, :output_path,
  #         :container_image, :mounts and :runtime_constraints.  The hash
  #         attributes are deep-sorted and YAML-encoded before comparison,
  #         matching how sort_serialized_attrs normalizes them on save.
  #
  # Preference order:
  # 1. The oldest Complete candidate with exit_code 0.  Used only if all
  #    such candidates produced a single distinct output, that output is
  #    readable by current_user, and the container's log is readable by
  #    current_user.
  # 2. The Running candidate with the highest progress (earliest
  #    started_at breaks ties).
  # 3. A Locked or Queued candidate, ordered by state name ascending, then
  #    highest priority, then oldest.
  #
  # Returns a Container, or nil if no candidate is suitable.
  def self.find_reusable(attrs)
    candidates = Container.
      where('command = ?', attrs[:command].to_yaml).
      where('cwd = ?', attrs[:cwd]).
      where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml).
      where('output_path = ?', attrs[:output_path]).
      where('container_image = ?', attrs[:container_image]).
      where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml).
      where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml)

    # Check for Complete candidates that had consistent outputs.  Fetching
    # at most two distinct outputs is enough to tell "exactly one" apart
    # from "none" or "several".
    completed = candidates.where(state: Complete).where(exit_code: 0)
    outputs = completed.select('output').group('output').limit(2)
    if outputs.count.count != 1
      Rails.logger.debug("Found #{outputs.count.length} different outputs")
    elsif Collection.
        readable_by(current_user).
        where(portable_data_hash: outputs.first.output).
        count < 1
      Rails.logger.info("Found reusable container(s) " +
                        "but output #{outputs.first.output} is not readable " +
                        "by user #{current_user.uuid}")
    else
      # Return the oldest eligible container whose log is still
      # present and readable by current_user.
      readable_pdh = Collection.
        readable_by(current_user).
        select('portable_data_hash')
      completed = completed.
        where("log in (#{readable_pdh.to_sql})").
        order('finished_at asc').
        limit(1)
      if completed.first
        return completed.first
      else
        Rails.logger.info("Found reusable container(s) but none with a log " +
                          "readable by user #{current_user.uuid}")
      end
    end

    # Check for Running candidates and return the most likely to finish sooner.
    running = candidates.where(state: Running).
      order('progress desc, started_at asc').limit(1).first
    return running if running

    # Check for Locked or Queued ones and return the most likely to start first.
    locked_or_queued = candidates.where("state IN (?)", [Locked, Queued]).
      order('state asc, priority desc, created_at asc').limit(1).first
    return locked_or_queued if locked_or_queued

    # No suitable candidate found.
    nil
  end

  # Move this container to the Locked state while holding a row lock.
  #
  # Raises AlreadyLockedError if the container is already Locked.  Other
  # invalid transitions are rejected by validation when save! runs.
  def lock
    with_lock do
      if self.state == Locked
        raise AlreadyLockedError
      end
      self.state = Locked
      self.save!
    end
  end

  # Return this container to the Queued state while holding a row lock.
  #
  # Raises InvalidStateTransitionError if the container is already Queued.
  # Other invalid transitions are rejected by validation when save! runs.
  def unlock
    with_lock do
      if self.state == Queued
        raise InvalidStateTransitionError
      end
      self.state = Queued
      self.save!
    end
  end

  # Scope of containers visible to any of the given users.
  #
  # An admin among users_list sees every container.  Otherwise a container
  # is visible if at least one of its container requests is owned by, or
  # is the head of a permission link from, one of the users or a group
  # they can read.
  #
  # NOTE(review): the join on container_requests may yield the same
  # container more than once when several of its requests match.
  def self.readable_by(*users_list)
    if users_list.select { |u| u.is_admin }.any?
      return self
    end
    user_uuids = users_list.map { |u| u.uuid }
    uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) }
    uuid_list.uniq!
    permitted = "(SELECT head_uuid FROM links WHERE link_class='permission' AND tail_uuid IN (:uuids))"
    joins(:container_requests).
      where("container_requests.uuid IN #{permitted} OR "+
            "container_requests.owner_uuid IN (:uuids)",
            uuids: uuid_list)
  end

  # True if the container has reached a terminal state (Complete or Cancelled).
  def final?
    [Complete, Cancelled].include?(self.state)
  end

  protected

  # Defaults applied to new records before validation.
  def fill_field_defaults
    self.state ||= Queued
    self.environment ||= {}
    self.runtime_constraints ||= {}
    self.mounts ||= {}
    self.cwd ||= "."
    self.priority ||= 1
  end

  # Only admins may create containers.
  def permission_to_create
    current_user.andand.is_admin
  end

  # Override base permission check to allow auth_uuid to set progress and
  # output (only).  Whether it is legal to set progress and output in the current
  # state has already been checked in validate_change.
  #
  # Admins may always update.  Otherwise the request's API token must be
  # either this container's auth token or the token holding its lock.
  def permission_to_update
    current_user.andand.is_admin ||
      (!Thread.current[:api_client_authorization].nil? &&
       [self.auth_uuid, self.locked_by_uuid].include?(Thread.current[:api_client_authorization].uuid))
  end

  # Override base permission check to allow auth_uuid to set progress and
  # output (only).  Whether it is legal to set progress and output in the current
  # state has already been checked in validate_change.
  #
  # When the caller is using this container's own auth token, the
  # ownership check is replaced by a whitelist restricting changes to
  # progress and output.  Otherwise the inherited check applies.
  def ensure_owner_uuid_is_permitted
    if !Thread.current[:api_client_authorization].nil? && self.auth_uuid == Thread.current[:api_client_authorization].uuid
      check_update_whitelist [:progress, :output]
    else
      super
    end
  end

  # Record started_at when entering Running and finished_at when entering
  # Complete or Cancelled, unless the caller already supplied a value.
  def set_timestamps
    if self.state_changed? and self.state == Running
      self.started_at ||= db_current_time
    end

    if self.state_changed? and [Complete, Cancelled].include? self.state
      self.finished_at ||= db_current_time
    end
  end

  # Restrict which attributes may change, depending on the new state
  # (and, for terminal states, the previous state).  state itself is
  # always permitted; the creation-time attributes are permitted only on
  # new records.
  def validate_change
    permitted = [:state]

    if self.new_record?
      permitted.push(:owner_uuid, :command, :container_image, :cwd,
                     :environment, :mounts, :output_path, :priority,
                     :runtime_constraints)
    end

    case self.state
    when Queued, Locked
      permitted.push :priority

    when Running
      permitted.push :priority, :progress, :output
      if self.state_changed?
        permitted.push :started_at
      end

    when Complete
      if self.state_was == Running
        permitted.push :finished_at, :output, :log, :exit_code
      end

    when Cancelled
      case self.state_was
      when Running
        permitted.push :finished_at, :output, :log
      when Queued, Locked
        permitted.push :finished_at
      end

    else
      # The state_transitions check will add an error message for this
      return false
    end

    check_update_whitelist permitted
  end

  # Keep locked_by_uuid consistent with the state: set while Locked or
  # Running, nil otherwise.
  def validate_lock
    if [Locked, Running].include? self.state
      # If the Container was already locked, locked_by_uuid must not
      # change. Otherwise, the current auth gets the lock.
      need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].andand.uuid
    else
      need_lock = nil
    end

    # The caller can provide a new value for locked_by_uuid, but only
    # if it's exactly what we expect. This allows a caller to perform
    # an update like {"state":"Queued","locked_by_uuid":null}.
    if self.locked_by_uuid_changed?
      if self.locked_by_uuid != need_lock
        return errors.add :locked_by_uuid, "can only change to #{need_lock}"
      end
    end
    self.locked_by_uuid = need_lock
  end

  # Output must exist and be readable by the current user.  This is so
  # that a container cannot "claim" a collection that it doesn't otherwise
  # have access to just by setting the output field to the collection PDH.
  def validate_output
    if output_changed?
      c = Collection.
          readable_by(current_user).
          where(portable_data_hash: self.output).
          first
      if !c
        errors.add :output, "collection must exist and be readable by current user."
      end
    end
  end

  # Manage the container's own API token (auth).
  #
  # - auth_uuid may not be changed directly by callers.
  # - Outside Locked/Running, any existing token is expired and detached.
  # - In Locked/Running without a token, a new ApiClientAuthorization is
  #   created for the user who last modified the highest-priority
  #   container request with priority > 0.  If no such request exists,
  #   an error is added instead.
  #
  # NOTE(review): this runs as after_validation, so errors added here may
  # not stop the save; confirm against the Rails version in use.
  def assign_auth
    if self.auth_uuid_changed?
      return errors.add :auth_uuid, 'is readonly'
    end
    if not [Locked, Running].include? self.state
      # don't need one
      self.auth.andand.update_attributes(expires_at: db_current_time)
      self.auth = nil
      return
    elsif self.auth
      # already have one
      return
    end
    cr = ContainerRequest.
      where('container_uuid=? and priority>0', self.uuid).
      order('priority desc').
      first
    if !cr
      return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
    end
    self.auth = ApiClientAuthorization.
      create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
              api_client_id: 0)
  end

  # Store hash attributes in a canonical (deep-sorted) key order so that
  # find_reusable can match them by exact serialized value.
  def sort_serialized_attrs
    if self.environment_changed?
      self.environment = self.class.deep_sort_hash(self.environment)
    end
    if self.mounts_changed?
      self.mounts = self.class.deep_sort_hash(self.mounts)
    end
    if self.runtime_constraints_changed?
      self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
    end
  end

  # After this container enters a final state, run the following steps as
  # the system user:
  # - If it was Cancelled, retry this container's committed requests that
  #   still have priority and remaining retries, on a fresh container with
  #   the same parameters.
  # - Finalize the committed requests still pointing at this container.
  # - Set priority 0 on committed requests that this container itself made.
  def handle_completed
    if self.state_changed? and self.final?
      act_as_system_user do

        if self.state == Cancelled
          # Only this container's own requests are candidates for a retry.
          retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
        else
          retryable_requests = []
        end

        if retryable_requests.any?
          c_attrs = {
            command: self.command,
            cwd: self.cwd,
            environment: self.environment,
            output_path: self.output_path,
            container_image: self.container_image,
            mounts: self.mounts,
            runtime_constraints: self.runtime_constraints
          }
          c = Container.create! c_attrs
          retryable_requests.each do |cr|
            cr.with_lock do
              # Use row locking because this increments container_count
              # (presumably in a ContainerRequest callback, not shown here).
              cr.container_uuid = c.uuid
              cr.save
            end
          end
        end

        # Notify container requests associated with this container.  Retried
        # requests now point at the new container and are not finalized.
        ContainerRequest.where(container_uuid: uuid,
                               state: ContainerRequest::Committed).each do |cr|
          cr.finalize!
        end

        # Try to cancel any outstanding container requests made by this container.
        ContainerRequest.where(requesting_container_uuid: uuid,
                               state: ContainerRequest::Committed).each do |cr|
          cr.priority = 0
          cr.save
        end
      end
    end
  end

end