10172: Allow auth_uuid to set container output. Work in progress.
[arvados.git] / services / api / app / models / container.rb
1 require 'whitelist_update'
2
class Container < ArvadosModel
  include HasUuid
  include KindAndEtag
  include CommonApiTemplate
  include WhitelistUpdate
  extend CurrentApiClient

  serialize :environment, Hash
  serialize :mounts, Hash
  serialize :runtime_constraints, Hash
  serialize :command, Array

  before_validation :fill_field_defaults, :if => :new_record?
  before_validation :set_timestamps
  validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
  validate :validate_state_change
  validate :validate_change
  validate :validate_lock
  validate :validate_output
  after_validation :assign_auth
  before_save :sort_serialized_attrs
  after_save :handle_completed

  has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
  # API client authorization created for this container while it is
  # Locked or Running (see assign_auth).
  belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid

  api_accessible :user, extend: :common do |t|
    t.add :command
    t.add :container_image
    t.add :cwd
    t.add :environment
    t.add :exit_code
    t.add :finished_at
    t.add :locked_by_uuid
    t.add :log
    t.add :mounts
    t.add :output
    t.add :output_path
    t.add :priority
    t.add :progress
    t.add :runtime_constraints
    t.add :started_at
    t.add :state
    t.add :auth_uuid
  end

  # Supported states for a container
  States =
    [
     (Queued = 'Queued'),
     (Locked = 'Locked'),
     (Running = 'Running'),
     (Complete = 'Complete'),
     (Cancelled = 'Cancelled')
    ]

  # Allowed state changes: previous state => states it may move to.
  # Complete and Cancelled have no entry, so they are terminal.
  # (Presumably consumed by validate_state_change, which is defined
  # outside this file -- confirm.)
  State_transitions = {
    nil => [Queued],
    Queued => [Locked, Cancelled],
    Locked => [Queued, Running, Cancelled],
    Running => [Complete, Cancelled]
  }

  def state_transitions
    State_transitions
  end

  # While the container is Queued, Locked or Running, set its priority to
  # the maximum priority of its Committed container requests and save!.
  # Containers in other states are left untouched.
  #
  # maximum() returns nil when no Committed request remains; priority is
  # validated for presence, so use 0 instead of letting save! raise.
  def update_priority!
    if [Queued, Locked, Running].include? self.state
      self.priority = ContainerRequest.
        where(container_uuid: uuid,
              state: ContainerRequest::Committed).
        maximum('priority') || 0
      self.save!
    end
  end

  # Find an existing container whose command, cwd, environment,
  # output_path, container_image, mounts and runtime_constraints match
  # +attrs+. Candidates are tried in this order:
  #   1. the oldest Complete container with exit_code 0, provided all such
  #      candidates share a single output, that output is readable by
  #      current_user, and the container's log is readable by current_user;
  #   2. the Running candidate with the highest progress (earliest
  #      started_at on ties);
  #   3. a Locked or Queued candidate, ordered by state name, then highest
  #      priority, then oldest created_at.
  # Returns nil when nothing suitable is found.
  def self.find_reusable(attrs)
    candidates = Container.
      where('command = ?', attrs[:command].to_yaml).
      where('cwd = ?', attrs[:cwd]).
      where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml).
      where('output_path = ?', attrs[:output_path]).
      where('container_image = ?', attrs[:container_image]).
      where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml).
      where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml)

    # Check for Completed candidates that had consistent outputs.
    completed = candidates.where(state: Complete).where(exit_code: 0)
    outputs = completed.select('output').group('output').limit(2)
    if outputs.count.count != 1
      Rails.logger.debug("Found #{outputs.count.length} different outputs")
    elsif Collection.
        readable_by(current_user).
        where(portable_data_hash: outputs.first.output).
        count < 1
      # Log the output PDH itself, not the inspected record.
      Rails.logger.info("Found reusable container(s) " +
                        "but output #{outputs.first.output} is not readable " +
                        "by user #{current_user.uuid}")
    else
      # Return the oldest eligible container whose log is still
      # present and readable by current_user.
      readable_pdh = Collection.
        readable_by(current_user).
        select('portable_data_hash')
      completed = completed.
        where("log in (#{readable_pdh.to_sql})").
        order('finished_at asc').
        limit(1)
      if completed.first
        return completed.first
      else
        Rails.logger.info("Found reusable container(s) but none with a log " +
                          "readable by user #{current_user.uuid}")
      end
    end

    # Check for Running candidates and return the most likely to finish sooner.
    running = candidates.where(state: Running).
      order('progress desc, started_at asc').limit(1).first
    return running if not running.nil?

    # Check for Locked or Queued ones and return the most likely to start first.
    locked_or_queued = candidates.where("state IN (?)", [Locked, Queued]).
      order('state asc, priority desc, created_at asc').limit(1).first
    return locked_or_queued if not locked_or_queued.nil?

    # No suitable candidate found.
    nil
  end

  # Move the container to Locked inside a row lock and save!.
  # Raises AlreadyLockedError (defined outside this file) if it is already
  # Locked; other illegal transitions are left to validation.
  def lock
    with_lock do
      if self.state == Locked
        raise AlreadyLockedError
      end
      self.state = Locked
      self.save!
    end
  end

  # Move the container back to Queued inside a row lock and save!.
  # Raises InvalidStateTransitionError (defined outside this file) if it is
  # already Queued.
  def unlock
    with_lock do
      if self.state == Queued
        raise InvalidStateTransitionError
      end
      self.state = Queued
      self.save!
    end
  end

  # Containers visible to any of +users_list+. If any listed user is an
  # admin, the whole table is returned. Otherwise a container is visible
  # through one of its container requests that is either owned by one of
  # the users / groups_i_can(:read) uuids, or is the head of a permission
  # link whose tail is one of those uuids.
  def self.readable_by(*users_list)
    if users_list.select { |u| u.is_admin }.any?
      return self
    end
    user_uuids = users_list.map { |u| u.uuid }
    uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) }
    uuid_list.uniq!
    permitted = "(SELECT head_uuid FROM links WHERE link_class='permission' AND tail_uuid IN (:uuids))"
    joins(:container_requests).
      where("container_requests.uuid IN #{permitted} OR "+
            "container_requests.owner_uuid IN (:uuids)",
            uuids: uuid_list)
  end

  # True once the container is Complete or Cancelled.
  def final?
    [Complete, Cancelled].include?(self.state)
  end

  protected

  # Defaults applied to new records before validation.
  def fill_field_defaults
    self.state ||= Queued
    self.environment ||= {}
    self.runtime_constraints ||= {}
    self.mounts ||= {}
    self.cwd ||= "."
    self.priority ||= 1
  end

  # Only admins may create containers directly.
  def permission_to_create
    current_user.andand.is_admin
  end

  # Only admins may update containers directly.
  def permission_to_update
    current_user.andand.is_admin
  end

  # Record started_at on entering Running, and finished_at on entering
  # Complete or Cancelled, unless they are already set.
  def set_timestamps
    if self.state_changed? and self.state == Running
      self.started_at ||= db_current_time
    end

    if self.state_changed? and [Complete, Cancelled].include? self.state
      self.finished_at ||= db_current_time
    end
  end

  # Limit which attributes may change, depending on whether the record is
  # new and on the (target) state and previous state.
  def validate_change
    permitted = [:state]

    if self.new_record?
      permitted.push(:owner_uuid, :command, :container_image, :cwd,
                     :environment, :mounts, :output_path, :priority,
                     :runtime_constraints)
    end

    case self.state
    when Queued, Locked
      permitted.push :priority

    when Running
      permitted.push :priority, :progress, :output
      if self.state_changed?
        permitted.push :started_at
      end

    when Complete
      if self.state_was == Running
        permitted.push :finished_at, :output, :log, :exit_code
      end

    when Cancelled
      case self.state_was
      when Running
        permitted.push :finished_at, :output, :log
      when Queued, Locked
        permitted.push :finished_at
      end

    else
      # The state_transitions check will add an error message for this
      return false
    end

    check_update_whitelist permitted
  end

  # Enforce the lock recorded in locked_by_uuid.
  #
  # When the container is locked by a token other than the caller's, only
  # priority may change. The exception is the container's own token
  # (auth_uuid), which may also update output and progress. While Locked
  # or Running, locked_by_uuid is pinned to the existing holder or, for a
  # new lock, the caller's token. In any other state it is cleared.
  def validate_lock
    # Be nil-safe: code running without an API client authorization
    # (e.g. outside a request) must not crash with NoMethodError here.
    current_auth_uuid = Thread.current[:api_client_authorization].andand.uuid

    # If the Container is already locked by someone other than the
    # current api_client_auth, disallow all changes -- except
    # priority, which needs to change to reflect max(priority) of
    # relevant ContainerRequests.
    if !locked_by_uuid_was.nil? and locked_by_uuid_was != current_auth_uuid
      # Require a real token: a token-less caller must not "match" a
      # container whose auth_uuid is nil and gain the wider whitelist.
      if !current_auth_uuid.nil? and auth_uuid_was == current_auth_uuid
        check_update_whitelist [:priority, :output, :progress]
      else
        check_update_whitelist [:priority]
      end
    end

    if [Locked, Running].include? self.state
      # If the Container was already locked, locked_by_uuid must not
      # change. Otherwise, the current auth gets the lock.
      need_lock = locked_by_uuid_was || current_auth_uuid
      if need_lock.nil?
        return errors.add :locked_by_uuid, "cannot be assigned without an API client authorization"
      end
    else
      need_lock = nil
    end

    # The caller can provide a new value for locked_by_uuid, but only
    # if it's exactly what we expect. This allows a caller to perform
    # an update like {"state":"Queued","locked_by_uuid":null}.
    if self.locked_by_uuid_changed?
      if self.locked_by_uuid != need_lock
        return errors.add :locked_by_uuid, "can only change to #{need_lock}"
      end
    end
    self.locked_by_uuid = need_lock
  end

  # A newly set output must name a collection that the container's user
  # can read. This stops a container from "claiming" a collection it has
  # no access to just by writing that collection's portable data hash.
  def validate_output
    return unless output_changed?
    # Clearing the output claims nothing, so there is nothing to check.
    return if self.output.nil?

    # Check as the user behind the container's own token (auth_uuid).
    # Fall back to the caller when the container has no token.
    apiauth = auth_uuid && ApiClientAuthorization.find_by_uuid(auth_uuid)
    user = apiauth ? User.find_by_id(apiauth.user_id) : current_user
    c = user && Collection.
        readable_by(user).
        where(portable_data_hash: self.output).
        first
    if !c
      errors.add :output, "collection must exist and be readable by the container's user"
    end
  end

  # Manage the container's own API client authorization (auth_uuid).
  # - Clients may not set auth_uuid directly.
  # - Outside Locked/Running, any existing auth is expired and detached.
  # - In Locked/Running, if none exists yet, one is created for the user
  #   who last modified the highest-priority request with priority > 0.
  #   If there is no such request, an error is added instead.
  # NOTE(review): this runs as an after_validation callback. Confirm that
  # errors added here actually stop the save on the Rails version in use.
  def assign_auth
    if self.auth_uuid_changed?
      return errors.add :auth_uuid, 'is readonly'
    end
    if not [Locked, Running].include? self.state
      # don't need one
      self.auth.andand.update_attributes(expires_at: db_current_time)
      self.auth = nil
      return
    elsif self.auth
      # already have one
      return
    end
    cr = ContainerRequest.
      where('container_uuid=? and priority>0', self.uuid).
      order('priority desc').
      first
    if !cr
      return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
    end
    self.auth = ApiClientAuthorization.
      create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
              api_client_id: 0)
  end

  # Store serialized hashes with deeply sorted keys. find_reusable compares
  # their YAML text directly, so key order must be canonical.
  def sort_serialized_attrs
    if self.environment_changed?
      self.environment = self.class.deep_sort_hash(self.environment)
    end
    if self.mounts_changed?
      self.mounts = self.class.deep_sort_hash(self.mounts)
    end
    if self.runtime_constraints_changed?
      self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
    end
  end

  # After a save that moves the container into a final state, act as the
  # system user to:
  #   * (Cancelled only) create a fresh copy of this container and point
  #     this container's retryable requests at it;
  #   * finalize the Committed requests still attached to this container;
  #   * set priority 0 on Committed requests made *by* this container.
  def handle_completed
    # This container is finished so finalize any associated container requests
    # that are associated with this container.
    if self.state_changed? and self.final?
      act_as_system_user do

        if self.state == Cancelled
          # Only requests attached to *this* container may be retried.
          # Without the container_uuid condition, every Committed request
          # in the system would be moved to the new container.
          retryable_requests = ContainerRequest.
            where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
        else
          retryable_requests = []
        end

        if retryable_requests.any?
          c_attrs = {
            command: self.command,
            cwd: self.cwd,
            environment: self.environment,
            output_path: self.output_path,
            container_image: self.container_image,
            mounts: self.mounts,
            runtime_constraints: self.runtime_constraints
          }
          c = Container.create! c_attrs
          retryable_requests.each do |cr|
            cr.with_lock do
              # Use row locking because this increments container_count
              cr.container_uuid = c.uuid
              cr.save
            end
          end
        end

        # Notify container requests associated with this container
        ContainerRequest.where(container_uuid: uuid,
                               state: ContainerRequest::Committed).each do |cr|
          cr.finalize!
        end

        # Try to cancel any outstanding container requests made by this container.
        ContainerRequest.where(requesting_container_uuid: uuid,
                               state: ContainerRequest::Committed).each do |cr|
          cr.priority = 0
          cr.save
        end
      end
    end
  end

end