Merge branch '9848-copy-container-output' refs #9848
[arvados.git] / services / api / app / models / container.rb
1 require 'whitelist_update'
2
# A Container describes one unit of work (command, container image,
# mounts, environment, runtime constraints) and tracks its progress
# through the states listed in +States+.
#
# Containers may only be created and updated by admin users (see
# #permission_to_create and #permission_to_update). Their priority
# follows the committed ContainerRequests that reference them through
# +container_uuid+.
class Container < ArvadosModel
  include HasUuid
  include KindAndEtag
  include CommonApiTemplate
  include WhitelistUpdate
  extend CurrentApiClient

  serialize :environment, Hash
  serialize :mounts, Hash
  serialize :runtime_constraints, Hash
  serialize :command, Array

  before_validation :fill_field_defaults, :if => :new_record?
  before_validation :set_timestamps
  validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
  validate :validate_state_change
  validate :validate_change
  validate :validate_lock
  after_validation :assign_auth
  before_save :sort_serialized_attrs
  after_save :handle_completed

  has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
  belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid

  # Attributes exposed through the :user API template.
  api_accessible :user, extend: :common do |t|
    t.add :command
    t.add :container_image
    t.add :cwd
    t.add :environment
    t.add :exit_code
    t.add :finished_at
    t.add :locked_by_uuid
    t.add :log
    t.add :mounts
    t.add :output
    t.add :output_path
    t.add :priority
    t.add :progress
    t.add :runtime_constraints
    t.add :started_at
    t.add :state
    t.add :auth_uuid
  end

  # Supported states for a container
  States =
    [
     (Queued = 'Queued'),
     (Locked = 'Locked'),
     (Running = 'Running'),
     (Complete = 'Complete'),
     (Cancelled = 'Cancelled')
    ]

  # Allowed state changes, keyed by the previous state (nil for a new
  # record). Complete and Cancelled have no entry: they are final.
  State_transitions = {
    nil => [Queued],
    Queued => [Locked, Cancelled],
    Locked => [Queued, Running, Cancelled],
    Running => [Complete, Cancelled]
  }

  # Returns the transition table used to validate state changes.
  def state_transitions
    State_transitions
  end

  # Recomputes and saves this container's priority while it is still
  # Queued, Locked or Running. Does nothing in any other state.
  #
  # Raises whatever save! raises if the record cannot be saved.
  def update_priority!
    return unless [Queued, Locked, Running].include? self.state

    # Update the priority of this container to the maximum priority of any of
    # its committed container requests and save the record.
    #
    # maximum() returns nil when no committed request is left; use 0 in
    # that case so the presence validation on priority does not make
    # save! fail.
    self.priority = ContainerRequest.
      where(container_uuid: uuid,
            state: ContainerRequest::Committed).
      maximum('priority') || 0
    self.save!
  end

  # Looks for an existing container that can stand in for a new one with
  # the given attributes.
  #
  # attrs is a Hash read with the keys :command, :cwd, :environment,
  # :output_path, :container_image, :mounts and :runtime_constraints.
  #
  # Candidates are tried in this order:
  # 1. the oldest Complete container with exit code 0, provided all such
  #    containers agree on a single output that current_user can read,
  #    and the container's log is readable by current_user;
  # 2. the Running container with the most progress (earliest start
  #    breaks ties);
  # 3. a Locked or Queued container, ordered by state, then highest
  #    priority, then earliest creation.
  #
  # Returns the chosen Container, or nil if there is no candidate.
  def self.find_reusable(attrs)
    # Serialized columns are compared as YAML text; hashes are deep-sorted
    # first, matching what sort_serialized_attrs does before saving.
    candidates = Container.
      where('command = ?', attrs[:command].to_yaml).
      where('cwd = ?', attrs[:cwd]).
      where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml).
      where('output_path = ?', attrs[:output_path]).
      where('container_image = ?', attrs[:container_image]).
      where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml).
      where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml)

    # Check for Completed candidates that had consistent outputs.
    completed = candidates.where(state: Complete).where(exit_code: 0)
    # limit(2) is enough to tell "exactly one distinct output" apart
    # from "none" or "several".
    outputs = completed.select('output').group('output').limit(2)
    # A grouped count returns one entry per distinct output; run the
    # query once and reuse the result for both the test and the message.
    distinct_outputs = outputs.count.length
    if distinct_outputs != 1
      Rails.logger.debug("Found #{distinct_outputs} different outputs")
    elsif Collection.
        readable_by(current_user).
        where(portable_data_hash: outputs.first.output).
        count < 1
      # Log the output hash itself, not the inspected model object.
      Rails.logger.info("Found reusable container(s) " +
                        "but output #{outputs.first.output} is not readable " +
                        "by user #{current_user.uuid}")
    else
      # Return the oldest eligible container whose log is still
      # present and readable by current_user.
      readable_pdh = Collection.
        readable_by(current_user).
        select('portable_data_hash')
      completed = completed.
        where("log in (#{readable_pdh.to_sql})").
        order('finished_at asc').
        limit(1)
      if completed.first
        return completed.first
      else
        Rails.logger.info("Found reusable container(s) but none with a log " +
                          "readable by user #{current_user.uuid}")
      end
    end

    # Check for Running candidates and return the most likely to finish sooner.
    running = candidates.where(state: Running).
      order('progress desc, started_at asc').limit(1).first
    return running unless running.nil?

    # Check for Locked or Queued ones and return the most likely to start first.
    locked_or_queued = candidates.where("state IN (?)", [Locked, Queued]).
      order('state asc, priority desc, created_at asc').limit(1).first
    return locked_or_queued unless locked_or_queued.nil?

    # No suitable candidate found.
    nil
  end

  # Moves the container to the Locked state inside with_lock.
  #
  # Raises AlreadyLockedError if it is already Locked, and whatever
  # save! raises if the change is rejected by validation.
  def lock
    with_lock do
      if self.state == Locked
        raise AlreadyLockedError
      end
      self.state = Locked
      self.save!
    end
  end

  # Moves the container back to the Queued state inside with_lock.
  #
  # Raises InvalidStateTransitionError if it is already Queued, and
  # whatever save! raises if the change is rejected by validation.
  def unlock
    with_lock do
      if self.state == Queued
        raise InvalidStateTransitionError
      end
      self.state = Queued
      self.save!
    end
  end

  # Scope of containers readable by any of the given users.
  #
  # If any user is an admin, the class itself is returned (no
  # restriction). Otherwise the result is limited to containers with a
  # container request that either is owned by one of the users (or one
  # of the groups they can read), or is the head of a 'permission' link
  # whose tail is one of those uuids.
  def self.readable_by(*users_list)
    return self if users_list.any? { |u| u.is_admin }

    user_uuids = users_list.map { |u| u.uuid }
    uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) }
    uuid_list.uniq!
    permitted = "(SELECT head_uuid FROM links WHERE link_class='permission' AND tail_uuid IN (:uuids))"
    joins(:container_requests).
      where("container_requests.uuid IN #{permitted} OR "+
            "container_requests.owner_uuid IN (:uuids)",
            uuids: uuid_list)
  end

  protected

  # before_validation hook for new records: supplies defaults for any
  # attribute that is still nil.
  def fill_field_defaults
    self.state ||= Queued
    self.environment ||= {}
    self.runtime_constraints ||= {}
    self.mounts ||= {}
    self.cwd ||= "."
    self.priority ||= 1
  end

  # Only admin users may create containers.
  def permission_to_create
    current_user.andand.is_admin
  end

  # Only admin users may update containers.
  def permission_to_update
    current_user.andand.is_admin
  end

  # before_validation hook: stamps started_at when the state changes to
  # Running and finished_at when it changes to Complete or Cancelled.
  # Values that are already set are left alone.
  def set_timestamps
    return unless self.state_changed?

    if self.state == Running
      self.started_at ||= db_current_time
    end

    if [Complete, Cancelled].include? self.state
      self.finished_at ||= db_current_time
    end
  end

  # Validation: builds the list of attributes that may change given the
  # current (and, for final states, previous) state, and passes it to
  # check_update_whitelist.
  def validate_change
    permitted = [:state]

    if self.new_record?
      permitted.push(:owner_uuid, :command, :container_image, :cwd,
                     :environment, :mounts, :output_path, :priority,
                     :runtime_constraints)
    end

    case self.state
    when Queued, Locked
      permitted.push :priority

    when Running
      permitted.push :priority, :progress
      if self.state_changed?
        permitted.push :started_at
      end

    when Complete
      if self.state_was == Running
        permitted.push :finished_at, :output, :log, :exit_code
      end

    when Cancelled
      case self.state_was
      when Running
        permitted.push :finished_at, :output, :log
      when Queued, Locked
        permitted.push :finished_at
      end

    else
      # The state_transitions check will add an error message for this
      return false
    end

    check_update_whitelist permitted
  end

  # Validation: enforces the lock held through locked_by_uuid and keeps
  # that attribute in step with the state.
  def validate_lock
    # uuid of the API client authorization making this change, or nil
    # when none is set on the current thread. andand avoids a
    # NoMethodError in that case.
    # NOTE(review): with no authorization, a container entering Locked
    # gets a nil locked_by_uuid -- confirm this is acceptable.
    current_auth_uuid = Thread.current[:api_client_authorization].andand.uuid

    # If the Container is already locked by someone other than the
    # current api_client_auth, disallow all changes -- except
    # priority, which needs to change to reflect max(priority) of
    # relevant ContainerRequests.
    if locked_by_uuid_was && locked_by_uuid_was != current_auth_uuid
      check_update_whitelist [:priority]
    end

    if [Locked, Running].include? self.state
      # If the Container was already locked, locked_by_uuid must not
      # change. Otherwise, the current auth gets the lock.
      need_lock = locked_by_uuid_was || current_auth_uuid
    else
      need_lock = nil
    end

    # The caller can provide a new value for locked_by_uuid, but only
    # if it's exactly what we expect. This allows a caller to perform
    # an update like {"state":"Queued","locked_by_uuid":null}.
    if self.locked_by_uuid_changed? && self.locked_by_uuid != need_lock
      return errors.add :locked_by_uuid, "can only change to #{need_lock}"
    end
    self.locked_by_uuid = need_lock
  end

  # after_validation hook: manages the ApiClientAuthorization linked
  # through auth_uuid.
  #
  # * auth_uuid may not be changed directly by the caller.
  # * Outside the Locked and Running states, any existing auth is
  #   expired and detached.
  # * In Locked or Running without an auth, a new one is created for
  #   the user who last modified the highest-priority container request
  #   with priority > 0.
  def assign_auth
    if self.auth_uuid_changed?
      return errors.add :auth_uuid, 'is readonly'
    end
    unless [Locked, Running].include? self.state
      # don't need one
      self.auth.andand.update_attributes(expires_at: db_current_time)
      self.auth = nil
      return
    end
    # already have one
    return if self.auth

    cr = ContainerRequest.
      where('container_uuid=? and priority>0', self.uuid).
      order('priority desc').
      first
    if !cr
      return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
    end
    self.auth = ApiClientAuthorization.
      create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
              api_client_id: 0)
  end

  # before_save hook: deep-sorts every changed serialized hash so that
  # equal hashes are stored in the same form (find_reusable compares
  # these columns as YAML text).
  def sort_serialized_attrs
    if self.environment_changed?
      self.environment = self.class.deep_sort_hash(self.environment)
    end
    if self.mounts_changed?
      self.mounts = self.class.deep_sort_hash(self.mounts)
    end
    if self.runtime_constraints_changed?
      self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
    end
  end

  # after_save hook: when the state has just changed to Complete or
  # Cancelled, notify and clean up the related container requests.
  def handle_completed
    # This container is finished so finalize any associated container requests
    # that are associated with this container.
    return unless self.state_changed? && [Complete, Cancelled].include?(self.state)

    act_as_system_user do
      # Notify container requests associated with this container
      ContainerRequest.where(container_uuid: uuid,
                             state: ContainerRequest::Committed).each do |cr|
        cr.container_completed!
      end

      # Try to cancel any outstanding container requests made by this container.
      ContainerRequest.where(requesting_container_uuid: uuid,
                             state: ContainerRequest::Committed).each do |cr|
        cr.priority = 0
        # Best effort: a failed save here is deliberately not raised.
        cr.save
      end
    end
  end

end