Merge branch 'master' into 9623-reuse-containers
[arvados.git] / services / api / app / models / container.rb
1 require 'whitelist_update'
2
3 class Container < ArvadosModel
4   include HasUuid
5   include KindAndEtag
6   include CommonApiTemplate
7   include WhitelistUpdate
8
9   serialize :environment, Hash
10   serialize :mounts, Hash
11   serialize :runtime_constraints, Hash
12   serialize :command, Array
13
14   before_validation :fill_field_defaults, :if => :new_record?
15   before_validation :set_timestamps
16   validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
17   validate :validate_state_change
18   validate :validate_change
19   validate :validate_lock
20   after_validation :assign_auth
21   before_save :sort_serialized_attrs
22   after_save :handle_completed
23
24   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
25   belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
26
27   api_accessible :user, extend: :common do |t|
28     t.add :command
29     t.add :container_image
30     t.add :cwd
31     t.add :environment
32     t.add :exit_code
33     t.add :finished_at
34     t.add :locked_by_uuid
35     t.add :log
36     t.add :mounts
37     t.add :output
38     t.add :output_path
39     t.add :priority
40     t.add :progress
41     t.add :runtime_constraints
42     t.add :started_at
43     t.add :state
44     t.add :auth_uuid
45   end
46
47   # Supported states for a container
48   States =
49     [
50      (Queued = 'Queued'),
51      (Locked = 'Locked'),
52      (Running = 'Running'),
53      (Complete = 'Complete'),
54      (Cancelled = 'Cancelled')
55     ]
56
57   State_transitions = {
58     nil => [Queued],
59     Queued => [Locked, Cancelled],
60     Locked => [Queued, Running, Cancelled],
61     Running => [Complete, Cancelled]
62   }
63
64   def state_transitions
65     State_transitions
66   end
67
68   def update_priority!
69     if [Queued, Locked, Running].include? self.state
70       # Update the priority of this container to the maximum priority of any of
71       # its committed container requests and save the record.
72       self.priority = ContainerRequest.
73         where(container_uuid: uuid,
74               state: ContainerRequest::Committed).
75         maximum('priority')
76       self.save!
77     end
78   end
79
80   def self.find_reusable(attrs)
81     candidates = Container.
82       where('command = ?', attrs[:command].to_yaml).
83       where('cwd = ?', attrs[:cwd]).
84       where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml).
85       where('output_path = ?', attrs[:output_path]).
86       where('container_image = ?', attrs[:container_image]).
87       where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml).
88       where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml).
89       where('state in (?)', [Queued, Locked, Running, Complete]).
90       reject {|c| c.state == Complete and
91               (c.exit_code != 0 or c.output.nil? or c.log.nil?)}
92     if candidates.empty?
93       nil
94     elsif candidates.count == 1
95       candidates.first
96     else
97       # Multiple candidates found, search for the best one:
98       # The most recent completed container
99       winner = candidates.select {|c| c.state == Complete}.
100         sort_by {|c| c.finished_at}.last
101       return winner if not winner.nil?
102       # The running container that's most likely to finish sooner.
103       winner = candidates.select {|c| c.state == Running}.
104         sort {|a, b| [b.progress, a.started_at] <=> [a.progress, b.started_at]}.first
105       return winner if not winner.nil?
106       # The locked container that's most likely to start sooner.
107       winner = candidates.select {|c| c.state == Locked}.
108         sort {|a, b| [b.priority, a.created_at] <=> [a.priority, b.created_at]}.first
109       return winner if not winner.nil?
110       # The queued container that's most likely to start sooner.
111       winner = candidates.select {|c| c.state == Queued}.
112         sort {|a, b| [b.priority, a.created_at] <=> [a.priority, b.created_at]}.first
113       return winner if not winner.nil?
114     end
115   end
116
117   def lock
118     with_lock do
119       if self.state == Locked
120         raise AlreadyLockedError
121       end
122       self.state = Locked
123       self.save!
124     end
125   end
126
127   def unlock
128     with_lock do
129       if self.state == Queued
130         raise InvalidStateTransitionError
131       end
132       self.state = Queued
133       self.save!
134     end
135   end
136
137   def self.readable_by(*users_list)
138     if users_list.select { |u| u.is_admin }.any?
139       return self
140     end
141     user_uuids = users_list.map { |u| u.uuid }
142     uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) }
143     uuid_list.uniq!
144     permitted = "(SELECT head_uuid FROM links WHERE link_class='permission' AND tail_uuid IN (:uuids))"
145     joins(:container_requests).
146       where("container_requests.uuid IN #{permitted} OR "+
147             "container_requests.owner_uuid IN (:uuids)",
148             uuids: uuid_list)
149   end
150
151   protected
152
153   def self.deep_sort_hash(x)
154     if x.is_a? Hash
155       x.sort.collect do |k, v|
156         [k, deep_sort_hash(v)]
157       end.to_h
158     elsif x.is_a? Array
159       x.collect { |v| deep_sort_hash(v) }
160     else
161       x
162     end
163   end
164
165   def fill_field_defaults
166     self.state ||= Queued
167     self.environment ||= {}
168     self.runtime_constraints ||= {}
169     self.mounts ||= {}
170     self.cwd ||= "."
171     self.priority ||= 1
172   end
173
174   def permission_to_create
175     current_user.andand.is_admin
176   end
177
178   def permission_to_update
179     current_user.andand.is_admin
180   end
181
182   def set_timestamps
183     if self.state_changed? and self.state == Running
184       self.started_at ||= db_current_time
185     end
186
187     if self.state_changed? and [Complete, Cancelled].include? self.state
188       self.finished_at ||= db_current_time
189     end
190   end
191
192   def validate_change
193     permitted = [:state]
194
195     if self.new_record?
196       permitted.push(:owner_uuid, :command, :container_image, :cwd,
197                      :environment, :mounts, :output_path, :priority,
198                      :runtime_constraints)
199     end
200
201     case self.state
202     when Queued, Locked
203       permitted.push :priority
204
205     when Running
206       permitted.push :priority, :progress
207       if self.state_changed?
208         permitted.push :started_at
209       end
210
211     when Complete
212       if self.state_was == Running
213         permitted.push :finished_at, :output, :log, :exit_code
214       end
215
216     when Cancelled
217       case self.state_was
218       when Running
219         permitted.push :finished_at, :output, :log
220       when Queued, Locked
221         permitted.push :finished_at
222       end
223
224     else
225       # The state_transitions check will add an error message for this
226       return false
227     end
228
229     check_update_whitelist permitted
230   end
231
232   def validate_lock
233     # If the Container is already locked by someone other than the
234     # current api_client_auth, disallow all changes -- except
235     # priority, which needs to change to reflect max(priority) of
236     # relevant ContainerRequests.
237     if locked_by_uuid_was
238       if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
239         check_update_whitelist [:priority]
240       end
241     end
242
243     if [Locked, Running].include? self.state
244       # If the Container was already locked, locked_by_uuid must not
245       # changes. Otherwise, the current auth gets the lock.
246       need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].uuid
247     else
248       need_lock = nil
249     end
250
251     # The caller can provide a new value for locked_by_uuid, but only
252     # if it's exactly what we expect. This allows a caller to perform
253     # an update like {"state":"Unlocked","locked_by_uuid":null}.
254     if self.locked_by_uuid_changed?
255       if self.locked_by_uuid != need_lock
256         return errors.add :locked_by_uuid, "can only change to #{need_lock}"
257       end
258     end
259     self.locked_by_uuid = need_lock
260   end
261
262   def assign_auth
263     if self.auth_uuid_changed?
264       return errors.add :auth_uuid, 'is readonly'
265     end
266     if not [Locked, Running].include? self.state
267       # don't need one
268       self.auth.andand.update_attributes(expires_at: db_current_time)
269       self.auth = nil
270       return
271     elsif self.auth
272       # already have one
273       return
274     end
275     cr = ContainerRequest.
276       where('container_uuid=? and priority>0', self.uuid).
277       order('priority desc').
278       first
279     if !cr
280       return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
281     end
282     self.auth = ApiClientAuthorization.
283       create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
284               api_client_id: 0)
285   end
286
287   def sort_serialized_attrs
288     if self.environment_changed?
289       self.environment = self.class.deep_sort_hash(self.environment)
290     end
291     if self.mounts_changed?
292       self.mounts = self.class.deep_sort_hash(self.mounts)
293     end
294     if self.runtime_constraints_changed?
295       self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
296     end
297   end
298
299   def handle_completed
300     # This container is finished so finalize any associated container requests
301     # that are associated with this container.
302     if self.state_changed? and [Complete, Cancelled].include? self.state
303       act_as_system_user do
304         # Notify container requests associated with this container
305         ContainerRequest.where(container_uuid: uuid,
306                                :state => ContainerRequest::Committed).each do |cr|
307           cr.container_completed!
308         end
309
310         # Try to cancel any outstanding container requests made by this container.
311         ContainerRequest.where(requesting_container_uuid: uuid,
312                                :state => ContainerRequest::Committed).each do |cr|
313           cr.priority = 0
314           cr.save
315         end
316       end
317     end
318   end
319
320 end