9623: Added method to find a reusable container, used by ContainerRequest#resolve...
[arvados.git] / services / api / app / models / container.rb
1 require 'whitelist_update'
2
3 class Container < ArvadosModel
4   include HasUuid
5   include KindAndEtag
6   include CommonApiTemplate
7   include WhitelistUpdate
8
9   serialize :environment, Hash
10   serialize :mounts, Hash
11   serialize :runtime_constraints, Hash
12   serialize :command, Array
13
14   before_validation :fill_field_defaults, :if => :new_record?
15   before_validation :set_timestamps
16   validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
17   validate :validate_state_change
18   validate :validate_change
19   validate :validate_lock
20   after_validation :assign_auth
21   before_save :sort_serialized_attrs
22   after_save :handle_completed
23
24   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
25   belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
26
27   api_accessible :user, extend: :common do |t|
28     t.add :command
29     t.add :container_image
30     t.add :cwd
31     t.add :environment
32     t.add :exit_code
33     t.add :finished_at
34     t.add :locked_by_uuid
35     t.add :log
36     t.add :mounts
37     t.add :output
38     t.add :output_path
39     t.add :priority
40     t.add :progress
41     t.add :runtime_constraints
42     t.add :started_at
43     t.add :state
44     t.add :auth_uuid
45   end
46
47   # Supported states for a container
48   States =
49     [
50      (Queued = 'Queued'),
51      (Locked = 'Locked'),
52      (Running = 'Running'),
53      (Complete = 'Complete'),
54      (Cancelled = 'Cancelled')
55     ]
56
57   State_transitions = {
58     nil => [Queued],
59     Queued => [Locked, Cancelled],
60     Locked => [Queued, Running, Cancelled],
61     Running => [Complete, Cancelled]
62   }
63
64   def state_transitions
65     State_transitions
66   end
67
68   def update_priority!
69     if [Queued, Locked, Running].include? self.state
70       # Update the priority of this container to the maximum priority of any of
71       # its committed container requests and save the record.
72       self.priority = ContainerRequest.
73         where(container_uuid: uuid,
74               state: ContainerRequest::Committed).
75         maximum('priority')
76       self.save!
77     end
78   end
79
80   def self.find_reusable(attrs)
81     candidates = Container.
82       where('command = ?', attrs[:command].to_yaml).
83       where('cwd = ?', attrs[:cwd]).
84       where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml).
85       where('output_path = ?', attrs[:output_path]).
86       where('container_image = ?', attrs[:container_image]).
87       where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml).
88       where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml).
89       where('state in (?)', ['Queued', 'Locked', 'Running', 'Complete']).
90       reject {|c| c.state == 'Complete' and c.exit_code != 0}
91
92     if candidates.empty?
93       nil
94     elsif candidates.count == 1
95       candidates.first
96     else
97       # Multiple candidates found, search for the best one:
98       # The most recent completed container
99       winner = candidates.select {|c| c.state == 'Complete'}.sort_by {|c| c.finished_at}.last
100       winner if not winner.nil?
101       # The running container that's most likely to finish sooner.
102       winner = candidates.select {|c| c.state == 'Running'}.
103         sort {|a, b| [b.progress, a.started_at] <=> [a.progress, b.started_at]}.first
104       winner if not winner.nil?
105       # The locked container that's most likely to start sooner.
106       winner = candidates.select {|c| c.state == 'Locked'}.
107         sort {|a, b| [b.priority, a.created_at] <=> [a.priority, b.created_at]}.first
108       winner if not winner.nil?
109       # The queued container that's most likely to start sooner.
110       winner = candidates.select {|c| c.state == 'Queued'}.
111         sort {|a, b| [b.priority, a.created_at] <=> [a.priority, b.created_at]}.first
112       winner if not winner.nil?
113     end
114   end
115
116   protected
117
118   def self.deep_sort_hash(x)
119     if x.is_a? Hash
120       x.sort.collect do |k, v|
121         [k, deep_sort_hash(v)]
122       end.to_h
123     elsif x.is_a? Array
124       x.collect { |v| deep_sort_hash(v) }
125     else
126       x
127     end
128   end
129
130   def fill_field_defaults
131     self.state ||= Queued
132     self.environment ||= {}
133     self.runtime_constraints ||= {}
134     self.mounts ||= {}
135     self.cwd ||= "."
136     self.priority ||= 1
137   end
138
139   def permission_to_create
140     current_user.andand.is_admin
141   end
142
143   def permission_to_update
144     current_user.andand.is_admin
145   end
146
147   def set_timestamps
148     if self.state_changed? and self.state == Running
149       self.started_at ||= db_current_time
150     end
151
152     if self.state_changed? and [Complete, Cancelled].include? self.state
153       self.finished_at ||= db_current_time
154     end
155   end
156
157   def validate_change
158     permitted = [:state]
159
160     if self.new_record?
161       permitted.push(:owner_uuid, :command, :container_image, :cwd,
162                      :environment, :mounts, :output_path, :priority,
163                      :runtime_constraints)
164     end
165
166     case self.state
167     when Queued, Locked
168       permitted.push :priority
169
170     when Running
171       permitted.push :priority, :progress
172       if self.state_changed?
173         permitted.push :started_at
174       end
175
176     when Complete
177       if self.state_was == Running
178         permitted.push :finished_at, :output, :log, :exit_code
179       end
180
181     when Cancelled
182       case self.state_was
183       when Running
184         permitted.push :finished_at, :output, :log
185       when Queued, Locked
186         permitted.push :finished_at
187       end
188
189     else
190       # The state_transitions check will add an error message for this
191       return false
192     end
193
194     check_update_whitelist permitted
195   end
196
197   def validate_lock
198     # If the Container is already locked by someone other than the
199     # current api_client_auth, disallow all changes -- except
200     # priority, which needs to change to reflect max(priority) of
201     # relevant ContainerRequests.
202     if locked_by_uuid_was
203       if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
204         check_update_whitelist [:priority]
205       end
206     end
207
208     if [Locked, Running].include? self.state
209       # If the Container was already locked, locked_by_uuid must not
210       # changes. Otherwise, the current auth gets the lock.
211       need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].uuid
212     else
213       need_lock = nil
214     end
215
216     # The caller can provide a new value for locked_by_uuid, but only
217     # if it's exactly what we expect. This allows a caller to perform
218     # an update like {"state":"Unlocked","locked_by_uuid":null}.
219     if self.locked_by_uuid_changed?
220       if self.locked_by_uuid != need_lock
221         return errors.add :locked_by_uuid, "can only change to #{need_lock}"
222       end
223     end
224     self.locked_by_uuid = need_lock
225   end
226
227   def assign_auth
228     if self.auth_uuid_changed?
229       return errors.add :auth_uuid, 'is readonly'
230     end
231     if not [Locked, Running].include? self.state
232       # don't need one
233       self.auth.andand.update_attributes(expires_at: db_current_time)
234       self.auth = nil
235       return
236     elsif self.auth
237       # already have one
238       return
239     end
240     cr = ContainerRequest.
241       where('container_uuid=? and priority>0', self.uuid).
242       order('priority desc').
243       first
244     if !cr
245       return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
246     end
247     self.auth = ApiClientAuthorization.
248       create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
249               api_client_id: 0)
250   end
251
252   def sort_serialized_attrs
253     self.environment = self.class.deep_sort_hash(self.environment)
254     self.mounts = self.class.deep_sort_hash(self.mounts)
255     self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
256   end
257
258   def handle_completed
259     # This container is finished so finalize any associated container requests
260     # that are associated with this container.
261     if self.state_changed? and [Complete, Cancelled].include? self.state
262       act_as_system_user do
263         # Notify container requests associated with this container
264         ContainerRequest.where(container_uuid: uuid,
265                                :state => ContainerRequest::Committed).each do |cr|
266           cr.container_completed!
267         end
268
269         # Try to cancel any outstanding container requests made by this container.
270         ContainerRequest.where(requesting_container_uuid: uuid,
271                                :state => ContainerRequest::Committed).each do |cr|
272           cr.priority = 0
273           cr.save
274         end
275       end
276     end
277   end
278
279 end