9623: Check for reusable containers in Completed state that has existing output and...
[arvados.git] / services / api / app / models / container.rb
1 require 'whitelist_update'
2
3 class Container < ArvadosModel
4   include HasUuid
5   include KindAndEtag
6   include CommonApiTemplate
7   include WhitelistUpdate
8
9   serialize :environment, Hash
10   serialize :mounts, Hash
11   serialize :runtime_constraints, Hash
12   serialize :command, Array
13
14   before_validation :fill_field_defaults, :if => :new_record?
15   before_validation :set_timestamps
16   validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
17   validate :validate_state_change
18   validate :validate_change
19   validate :validate_lock
20   after_validation :assign_auth
21   before_save :sort_serialized_attrs
22   after_save :handle_completed
23
24   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
25   belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
26
27   api_accessible :user, extend: :common do |t|
28     t.add :command
29     t.add :container_image
30     t.add :cwd
31     t.add :environment
32     t.add :exit_code
33     t.add :finished_at
34     t.add :locked_by_uuid
35     t.add :log
36     t.add :mounts
37     t.add :output
38     t.add :output_path
39     t.add :priority
40     t.add :progress
41     t.add :runtime_constraints
42     t.add :started_at
43     t.add :state
44     t.add :auth_uuid
45   end
46
47   # Supported states for a container
48   States =
49     [
50      (Queued = 'Queued'),
51      (Locked = 'Locked'),
52      (Running = 'Running'),
53      (Complete = 'Complete'),
54      (Cancelled = 'Cancelled')
55     ]
56
57   State_transitions = {
58     nil => [Queued],
59     Queued => [Locked, Cancelled],
60     Locked => [Queued, Running, Cancelled],
61     Running => [Complete, Cancelled]
62   }
63
64   def state_transitions
65     State_transitions
66   end
67
68   def update_priority!
69     if [Queued, Locked, Running].include? self.state
70       # Update the priority of this container to the maximum priority of any of
71       # its committed container requests and save the record.
72       self.priority = ContainerRequest.
73         where(container_uuid: uuid,
74               state: ContainerRequest::Committed).
75         maximum('priority')
76       self.save!
77     end
78   end
79
80   def self.find_reusable(attrs)
81     candidates = Container.
82       where('command = ?', attrs[:command].to_yaml).
83       where('cwd = ?', attrs[:cwd]).
84       where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml).
85       where('output_path = ?', attrs[:output_path]).
86       where('container_image = ?', attrs[:container_image]).
87       where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml).
88       where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml).
89       where('state in (?)', [Container::Queued, Container::Locked,
90                              Container::Running, Container::Complete]).
91       reject {|c| c.state == Container::Complete and (c.exit_code != 0 or
92                                                       c.output.nil? or
93                                                       c.log.nil?)}
94     if candidates.empty?
95       nil
96     elsif candidates.count == 1
97       candidates.first
98     else
99       # Multiple candidates found, search for the best one:
100       # The most recent completed container
101       winner = candidates.select {|c| c.state == Container::Complete}.
102         sort_by {|c| c.finished_at}.last
103       return winner if not winner.nil?
104       # The running container that's most likely to finish sooner.
105       winner = candidates.select {|c| c.state == Container::Running}.
106         sort {|a, b| [b.progress, a.started_at] <=> [a.progress, b.started_at]}.first
107       return winner if not winner.nil?
108       # The locked container that's most likely to start sooner.
109       winner = candidates.select {|c| c.state == Container::Locked}.
110         sort {|a, b| [b.priority, a.created_at] <=> [a.priority, b.created_at]}.first
111       return winner if not winner.nil?
112       # The queued container that's most likely to start sooner.
113       winner = candidates.select {|c| c.state == Container::Queued}.
114         sort {|a, b| [b.priority, a.created_at] <=> [a.priority, b.created_at]}.first
115       return winner if not winner.nil?
116     end
117   end
118
119   protected
120
121   def self.deep_sort_hash(x)
122     if x.is_a? Hash
123       x.sort.collect do |k, v|
124         [k, deep_sort_hash(v)]
125       end.to_h
126     elsif x.is_a? Array
127       x.collect { |v| deep_sort_hash(v) }
128     else
129       x
130     end
131   end
132
133   def fill_field_defaults
134     self.state ||= Queued
135     self.environment ||= {}
136     self.runtime_constraints ||= {}
137     self.mounts ||= {}
138     self.cwd ||= "."
139     self.priority ||= 1
140   end
141
142   def permission_to_create
143     current_user.andand.is_admin
144   end
145
146   def permission_to_update
147     current_user.andand.is_admin
148   end
149
150   def set_timestamps
151     if self.state_changed? and self.state == Running
152       self.started_at ||= db_current_time
153     end
154
155     if self.state_changed? and [Complete, Cancelled].include? self.state
156       self.finished_at ||= db_current_time
157     end
158   end
159
160   def validate_change
161     permitted = [:state]
162
163     if self.new_record?
164       permitted.push(:owner_uuid, :command, :container_image, :cwd,
165                      :environment, :mounts, :output_path, :priority,
166                      :runtime_constraints)
167     end
168
169     case self.state
170     when Queued, Locked
171       permitted.push :priority
172
173     when Running
174       permitted.push :priority, :progress
175       if self.state_changed?
176         permitted.push :started_at
177       end
178
179     when Complete
180       if self.state_was == Running
181         permitted.push :finished_at, :output, :log, :exit_code
182       end
183
184     when Cancelled
185       case self.state_was
186       when Running
187         permitted.push :finished_at, :output, :log
188       when Queued, Locked
189         permitted.push :finished_at
190       end
191
192     else
193       # The state_transitions check will add an error message for this
194       return false
195     end
196
197     check_update_whitelist permitted
198   end
199
200   def validate_lock
201     # If the Container is already locked by someone other than the
202     # current api_client_auth, disallow all changes -- except
203     # priority, which needs to change to reflect max(priority) of
204     # relevant ContainerRequests.
205     if locked_by_uuid_was
206       if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
207         check_update_whitelist [:priority]
208       end
209     end
210
211     if [Locked, Running].include? self.state
212       # If the Container was already locked, locked_by_uuid must not
213       # changes. Otherwise, the current auth gets the lock.
214       need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].uuid
215     else
216       need_lock = nil
217     end
218
219     # The caller can provide a new value for locked_by_uuid, but only
220     # if it's exactly what we expect. This allows a caller to perform
221     # an update like {"state":"Unlocked","locked_by_uuid":null}.
222     if self.locked_by_uuid_changed?
223       if self.locked_by_uuid != need_lock
224         return errors.add :locked_by_uuid, "can only change to #{need_lock}"
225       end
226     end
227     self.locked_by_uuid = need_lock
228   end
229
230   def assign_auth
231     if self.auth_uuid_changed?
232       return errors.add :auth_uuid, 'is readonly'
233     end
234     if not [Locked, Running].include? self.state
235       # don't need one
236       self.auth.andand.update_attributes(expires_at: db_current_time)
237       self.auth = nil
238       return
239     elsif self.auth
240       # already have one
241       return
242     end
243     cr = ContainerRequest.
244       where('container_uuid=? and priority>0', self.uuid).
245       order('priority desc').
246       first
247     if !cr
248       return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
249     end
250     self.auth = ApiClientAuthorization.
251       create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
252               api_client_id: 0)
253   end
254
255   def sort_serialized_attrs
256     self.environment = self.class.deep_sort_hash(self.environment)
257     self.mounts = self.class.deep_sort_hash(self.mounts)
258     self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
259   end
260
261   def handle_completed
262     # This container is finished so finalize any associated container requests
263     # that are associated with this container.
264     if self.state_changed? and [Complete, Cancelled].include? self.state
265       act_as_system_user do
266         # Notify container requests associated with this container
267         ContainerRequest.where(container_uuid: uuid,
268                                :state => ContainerRequest::Committed).each do |cr|
269           cr.container_completed!
270         end
271
272         # Try to cancel any outstanding container requests made by this container.
273         ContainerRequest.where(requesting_container_uuid: uuid,
274                                :state => ContainerRequest::Committed).each do |cr|
275           cr.priority = 0
276           cr.save
277         end
278       end
279     end
280   end
281
282 end