9623: Several fixes addressing review comments:
[arvados.git] / services / api / app / models / container.rb
1 require 'whitelist_update'
2
3 class Container < ArvadosModel
4   include HasUuid
5   include KindAndEtag
6   include CommonApiTemplate
7   include WhitelistUpdate
8
9   serialize :environment, Hash
10   serialize :mounts, Hash
11   serialize :runtime_constraints, Hash
12   serialize :command, Array
13
14   before_validation :fill_field_defaults, :if => :new_record?
15   before_validation :set_timestamps
16   validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
17   validate :validate_state_change
18   validate :validate_change
19   validate :validate_lock
20   after_validation :assign_auth
21   before_save :sort_serialized_attrs
22   after_save :handle_completed
23
24   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
25   belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
26
27   api_accessible :user, extend: :common do |t|
28     t.add :command
29     t.add :container_image
30     t.add :cwd
31     t.add :environment
32     t.add :exit_code
33     t.add :finished_at
34     t.add :locked_by_uuid
35     t.add :log
36     t.add :mounts
37     t.add :output
38     t.add :output_path
39     t.add :priority
40     t.add :progress
41     t.add :runtime_constraints
42     t.add :started_at
43     t.add :state
44     t.add :auth_uuid
45   end
46
47   # Supported states for a container
48   States =
49     [
50      (Queued = 'Queued'),
51      (Locked = 'Locked'),
52      (Running = 'Running'),
53      (Complete = 'Complete'),
54      (Cancelled = 'Cancelled')
55     ]
56
57   State_transitions = {
58     nil => [Queued],
59     Queued => [Locked, Cancelled],
60     Locked => [Queued, Running, Cancelled],
61     Running => [Complete, Cancelled]
62   }
63
64   def state_transitions
65     State_transitions
66   end
67
68   def update_priority!
69     if [Queued, Locked, Running].include? self.state
70       # Update the priority of this container to the maximum priority of any of
71       # its committed container requests and save the record.
72       self.priority = ContainerRequest.
73         where(container_uuid: uuid,
74               state: ContainerRequest::Committed).
75         maximum('priority')
76       self.save!
77     end
78   end
79
80   def self.find_reusable(attrs)
81     candidates = Container.
82       where('command = ?', attrs[:command].to_yaml).
83       where('cwd = ?', attrs[:cwd]).
84       where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml).
85       where('output_path = ?', attrs[:output_path]).
86       where('container_image = ?', attrs[:container_image]).
87       where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml).
88       where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml)
89
90     # Check for Completed candidates that only had consistent outputs.
91     completed = candidates.where(state: Complete).where(exit_code: 0)
92     if completed.select("output").group('output').limit(2).length == 1
93       return completed.order('finished_at asc').limit(1).first
94     end
95
96     # Check for Running candidates and return the most likely to finish sooner.
97     running = candidates.where(state: Running).
98       order('progress desc, started_at asc').limit(1).first
99     return running if not running.nil?
100
101     # Check for Locked or Queued ones and return the most likely to start first.
102     locked_or_queued = candidates.where("state IN (?)", [Locked, Queued]).
103       order('state asc, priority desc, created_at asc').limit(1).first
104     return locked_or_queued if not locked_or_queued.nil?
105
106     # No suitable candidate found.
107     nil
108   end
109
110   def lock
111     with_lock do
112       if self.state == Locked
113         raise AlreadyLockedError
114       end
115       self.state = Locked
116       self.save!
117     end
118   end
119
120   def unlock
121     with_lock do
122       if self.state == Queued
123         raise InvalidStateTransitionError
124       end
125       self.state = Queued
126       self.save!
127     end
128   end
129
130   def self.readable_by(*users_list)
131     if users_list.select { |u| u.is_admin }.any?
132       return self
133     end
134     user_uuids = users_list.map { |u| u.uuid }
135     uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) }
136     uuid_list.uniq!
137     permitted = "(SELECT head_uuid FROM links WHERE link_class='permission' AND tail_uuid IN (:uuids))"
138     joins(:container_requests).
139       where("container_requests.uuid IN #{permitted} OR "+
140             "container_requests.owner_uuid IN (:uuids)",
141             uuids: uuid_list)
142   end
143
144   protected
145
146   def fill_field_defaults
147     self.state ||= Queued
148     self.environment ||= {}
149     self.runtime_constraints ||= {}
150     self.mounts ||= {}
151     self.cwd ||= "."
152     self.priority ||= 1
153   end
154
155   def permission_to_create
156     current_user.andand.is_admin
157   end
158
159   def permission_to_update
160     current_user.andand.is_admin
161   end
162
163   def set_timestamps
164     if self.state_changed? and self.state == Running
165       self.started_at ||= db_current_time
166     end
167
168     if self.state_changed? and [Complete, Cancelled].include? self.state
169       self.finished_at ||= db_current_time
170     end
171   end
172
173   def validate_change
174     permitted = [:state]
175
176     if self.new_record?
177       permitted.push(:owner_uuid, :command, :container_image, :cwd,
178                      :environment, :mounts, :output_path, :priority,
179                      :runtime_constraints)
180     end
181
182     case self.state
183     when Queued, Locked
184       permitted.push :priority
185
186     when Running
187       permitted.push :priority, :progress
188       if self.state_changed?
189         permitted.push :started_at
190       end
191
192     when Complete
193       if self.state_was == Running
194         permitted.push :finished_at, :output, :log, :exit_code
195       end
196
197     when Cancelled
198       case self.state_was
199       when Running
200         permitted.push :finished_at, :output, :log
201       when Queued, Locked
202         permitted.push :finished_at
203       end
204
205     else
206       # The state_transitions check will add an error message for this
207       return false
208     end
209
210     check_update_whitelist permitted
211   end
212
213   def validate_lock
214     # If the Container is already locked by someone other than the
215     # current api_client_auth, disallow all changes -- except
216     # priority, which needs to change to reflect max(priority) of
217     # relevant ContainerRequests.
218     if locked_by_uuid_was
219       if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
220         check_update_whitelist [:priority]
221       end
222     end
223
224     if [Locked, Running].include? self.state
225       # If the Container was already locked, locked_by_uuid must not
226       # changes. Otherwise, the current auth gets the lock.
227       need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].uuid
228     else
229       need_lock = nil
230     end
231
232     # The caller can provide a new value for locked_by_uuid, but only
233     # if it's exactly what we expect. This allows a caller to perform
234     # an update like {"state":"Unlocked","locked_by_uuid":null}.
235     if self.locked_by_uuid_changed?
236       if self.locked_by_uuid != need_lock
237         return errors.add :locked_by_uuid, "can only change to #{need_lock}"
238       end
239     end
240     self.locked_by_uuid = need_lock
241   end
242
243   def assign_auth
244     if self.auth_uuid_changed?
245       return errors.add :auth_uuid, 'is readonly'
246     end
247     if not [Locked, Running].include? self.state
248       # don't need one
249       self.auth.andand.update_attributes(expires_at: db_current_time)
250       self.auth = nil
251       return
252     elsif self.auth
253       # already have one
254       return
255     end
256     cr = ContainerRequest.
257       where('container_uuid=? and priority>0', self.uuid).
258       order('priority desc').
259       first
260     if !cr
261       return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
262     end
263     self.auth = ApiClientAuthorization.
264       create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
265               api_client_id: 0)
266   end
267
268   def sort_serialized_attrs
269     if self.environment_changed?
270       self.environment = self.class.deep_sort_hash(self.environment)
271     end
272     if self.mounts_changed?
273       self.mounts = self.class.deep_sort_hash(self.mounts)
274     end
275     if self.runtime_constraints_changed?
276       self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
277     end
278   end
279
280   def handle_completed
281     # This container is finished so finalize any associated container requests
282     # that are associated with this container.
283     if self.state_changed? and [Complete, Cancelled].include? self.state
284       act_as_system_user do
285         # Notify container requests associated with this container
286         ContainerRequest.where(container_uuid: uuid,
287                                :state => ContainerRequest::Committed).each do |cr|
288           cr.container_completed!
289         end
290
291         # Try to cancel any outstanding container requests made by this container.
292         ContainerRequest.where(requesting_container_uuid: uuid,
293                                :state => ContainerRequest::Committed).each do |cr|
294           cr.priority = 0
295           cr.save
296         end
297       end
298     end
299   end
300
301 end