1 class Job < ArvadosModel
4 include CommonApiTemplate
5 attr_protected :docker_image_locator
6 serialize :script_parameters, Hash
7 serialize :runtime_constraints, Hash
8 serialize :tasks_summary, Hash
9 before_create :ensure_unique_submit_id
10 after_commit :trigger_crunch_dispatch_if_cancelled, :on => :update
11 before_validation :set_priority
12 before_validation :update_timestamps_when_state_changes
13 before_validation :update_state_from_old_state_attrs
14 validate :ensure_script_version_is_commit
15 validate :find_docker_image_locator
16 validate :validate_status
17 validate :validate_state_change
19 has_many :commit_ancestors, :foreign_key => :descendant, :primary_key => :script_version
20 has_many(:nodes, foreign_key: :job_uuid, primary_key: :uuid)
22 class SubmitIdReused < StandardError
25 api_accessible :user, extend: :common do |t|
29 t.add :script_parameters
32 t.add :cancelled_by_client_uuid
33 t.add :cancelled_by_user_uuid
40 t.add :is_locked_by_uuid
42 t.add :runtime_constraints
45 t.add :nondeterministic
47 t.add :supplied_script_version
48 t.add :docker_image_locator
54 # Supported states for a job
57 (Running = 'Running'),
58 (Cancelled = 'Cancelled'),
60 (Complete = 'Complete'),
64 update_attributes(finished_at: finished_at || Time.now,
65 success: success.nil? ? false : success,
74 self.where('started_at is ? and is_locked_by_uuid is ? and cancelled_at is ? and success is ?',
76 order('priority desc, created_at')
81 Job::queue.each do |j|
82 if j[:uuid] == self.uuid
90 self.where('running = ?', true).
91 order('priority desc, created_at')
94 def lock locked_by_uuid
97 unless self.state == Queued and self.is_locked_by_uuid.nil?
98 raise ConflictError.new
101 self.is_locked_by_uuid = locked_by_uuid
108 def foreign_key_attributes
109 super + %w(output log)
112 def skip_uuid_read_permission_check
113 super + %w(cancelled_by_client_uuid)
116 def skip_uuid_existence_check
117 super + %w(output log)
121 if self.priority.nil?
127 def ensure_script_version_is_commit
128 if self.state == Running
129 # Apparently client has already decided to go for it. This is
130 # needed to run a local job using a local working directory
131 # instead of a commit-ish.
134 if new_record? or script_version_changed?
135 sha1 = Commit.find_commit_range(current_user, self.repository, nil, self.script_version, nil)[0] rescue nil
137 self.supplied_script_version = self.script_version if self.supplied_script_version.nil? or self.supplied_script_version.empty?
138 self.script_version = sha1
140 self.errors.add :script_version, "#{self.script_version} does not resolve to a commit"
146 def ensure_unique_submit_id
148 if Job.where('submit_id=?',self.submit_id).first
149 raise SubmitIdReused.new
155 def find_docker_image_locator
156 # Find the Collection that holds the Docker image specified in the
157 # runtime constraints, and store its locator in docker_image_locator.
158 unless runtime_constraints.is_a? Hash
159 # We're still in validation stage, so we can't assume
160 # runtime_constraints isn't something horrible like an array or
161 # a string. Treat those cases as "no docker image supplied";
162 # other validations will fail anyway.
163 self.docker_image_locator = nil
166 image_search = runtime_constraints['docker_image']
167 image_tag = runtime_constraints['docker_image_tag']
169 self.docker_image_locator = nil
171 elsif coll = Collection.for_latest_docker_image(image_search, image_tag)
172 self.docker_image_locator = coll.portable_data_hash
175 errors.add(:docker_image_locator, "not found for #{image_search}")
182 queue = self.script_parameters.values
183 while not queue.empty?
184 queue = queue.flatten.compact.collect do |v|
188 v.match(/^(([0-9a-f]{32})\b(\+[^,]+)?,?)*$/) do |locator|
189 deps[locator.to_s] = true
198 def permission_to_update
199 if is_locked_by_uuid_was and !(current_user and
200 (current_user.uuid == is_locked_by_uuid_was or
201 current_user.uuid == system_user.uuid))
202 if script_changed? or
203 script_parameters_changed? or
204 script_version_changed? or
205 (!cancelled_at_was.nil? and
206 (cancelled_by_client_uuid_changed? or
207 cancelled_by_user_uuid_changed? or
208 cancelled_at_changed?)) or
209 started_at_changed? or
210 finished_at_changed? or
215 tasks_summary_changed? or
217 logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}"
221 if !is_locked_by_uuid_changed?
225 logger.warn "Anonymous user tried to change lock on #{self.class.to_s} #{uuid_was}"
227 elsif is_locked_by_uuid_was and is_locked_by_uuid_was != current_user.uuid
228 logger.warn "User #{current_user.uuid} tried to steal lock on #{self.class.to_s} #{uuid_was} from #{is_locked_by_uuid_was}"
230 elsif !is_locked_by_uuid.nil? and is_locked_by_uuid != current_user.uuid
231 logger.warn "User #{current_user.uuid} tried to lock #{self.class.to_s} #{uuid_was} with uuid #{is_locked_by_uuid}"
239 def update_modified_by_fields
240 if self.cancelled_at_changed?
241 # Ensure cancelled_at cannot be set to arbitrary non-now times,
242 # or changed once it is set.
243 if self.cancelled_at and not self.cancelled_at_was
244 self.cancelled_at = Time.now
245 self.cancelled_by_user_uuid = current_user.uuid
246 self.cancelled_by_client_uuid = current_api_client.andand.uuid
247 @need_crunch_dispatch_trigger = true
249 self.cancelled_at = self.cancelled_at_was
250 self.cancelled_by_user_uuid = self.cancelled_by_user_uuid_was
251 self.cancelled_by_client_uuid = self.cancelled_by_client_uuid_was
257 def trigger_crunch_dispatch_if_cancelled
258 if @need_crunch_dispatch_trigger
259 File.open(Rails.configuration.crunch_refresh_trigger, 'wb') do
260 # That's all, just create/touch a file for crunch-job to see.
265 def update_timestamps_when_state_changes
266 return if not (state_changed? or new_record?)
269 self.started_at ||= Time.now
270 when Failed, Complete
271 self.finished_at ||= Time.now
273 self.cancelled_at ||= Time.now
276 # TODO: Remove the following case block when old "success" and
277 # "running" attrs go away. Until then, this ensures we still
278 # expose correct success/running flags to older clients, even if
279 # some new clients are writing only the new state attribute.
287 when Cancelled, Failed
294 self.running ||= false # Default to false instead of nil.
299 def update_state_from_old_state_attrs
300 # If a client has touched the legacy state attrs, update the
301 # "state" attr to agree with the updated values of the legacy
304 # TODO: Remove this method when old "success" and "running" attrs
306 if cancelled_at_changed? or
311 self.state = Cancelled
312 elsif success == false
314 elsif success == true
315 self.state = Complete
316 elsif running == true
326 if self.state.in?(States)
329 errors.add :state, "#{state.inspect} must be one of: #{States.inspect}"
334 def validate_state_change
335 if self.state_changed?
336 if self.state_was.in? [Complete, Failed, Cancelled]
337 # Once in a finished state, don't permit any changes
338 errors.add :state, "invalid change from #{self.state_was} to #{self.state}"
340 elsif self.state_was == Running and not self.state.in? [Complete, Failed, Cancelled]
341 # From running, can only transition to a finished state
342 errors.add :state, "invalid change from #{self.state_was} to #{self.state}"