1 class Arvados::V1::JobsController < ApplicationController
# Register script_parameters, runtime_constraints and tasks_summary with
# accept_attribute_as_json, each with Hash as the expected type.
# NOTE(review): the helper is defined elsewhere; presumably it lets clients
# send these attributes as JSON-encoded strings -- confirm.
2 accept_attribute_as_json :script_parameters, Hash
3 accept_attribute_as_json :runtime_constraints, Hash
4 accept_attribute_as_json :tasks_summary, Hash
# For the queue action only, skip the per-object UUID lookup filter and the
# filter that renders a 404 when no object was found.
5 skip_before_filter :find_object_by_uuid, :only => :queue
6 skip_before_filter :render_404_if_no_object, :only => :queue
# These four attributes are required; a missing one is reported by name in
# the error message below, with an HTTP 422 (:unprocessable_entity) status.
9 [:repository, :script, :script_version, :script_parameters].each do |r|
12 :errors => ["#{r} attribute must be specified"]
13 }, status: :unprocessable_entity
18 # We used to ask for the minimum_, exclude_, and no_reuse params
19 # in the job resource. Now we advertise them as flags that alter
20 # the behavior of the create action.
# Move each legacy attribute out of the job resource and into the request
# params, so later code only has to look in params.
21 [:minimum_script_version, :exclude_script_versions].each do |attr|
22 if resource_attrs.has_key? attr
23 params[attr] = resource_attrs.delete attr
# no_reuse is the legacy inverse of find_or_create: remove it from the
# resource and store its negation as the find_or_create flag.
26 if resource_attrs.has_key? :no_reuse
27 params[:find_or_create] = !resource_attrs.delete(:no_reuse)
# Job reuse path: build the filter list that identifies an equivalent
# existing job.
30 if params[:find_or_create]
31 # Translate older creation parameters and special range operators
32 # into standard filters.
33 minimum_script_version = params[:minimum_script_version]
# Defaults to an empty list when the caller supplied no exclusions.
34 exclude_script_versions = params.fetch(:exclude_script_versions, [])
35 @filters.select do |filter|
# An "in range" filter on script_version supplies the lower bound.
37 when ["script_version", "in range"]
38 minimum_script_version = filter.last
40 when ["script_version", "not in"], ["script_version", "not in range"]
# NOTE(review): += vs << presumably distinguishes a list operand from a
# single value; the condition choosing between them should be confirmed.
42 exclude_script_versions += filter.last
44 exclude_script_versions << filter.last
47 when ["docker_image_locator", "in range"], ["docker_image_locator", "not in range"]
# Rewrite the filter in place: strip the trailing " range" from the
# operator ("in range" -> "in", "not in range" -> "not in") and replace
# the operand with whatever Collection.uuids_for_docker_image returns
# for it.
48 filter[1].sub!(/ range$/, '')
49 filter[2] = Collection.uuids_for_docker_image(filter[2])
# Restrict script_version to the commits returned by
# Commit.find_commit_range for this repository, given the minimum
# version, the requested version and the exclusions collected above.
55 @filters.append(["script_version", "in",
56 Commit.find_commit_range(current_user,
57 resource_attrs[:repository],
58 minimum_script_version,
59 resource_attrs[:script_version],
60 exclude_script_versions)])
62 # Set up default filters for specific parameters.
# Only add the default when the caller gave no filter on "script".
63 if @filters.select { |f| f.first == "script" }.empty?
64 @filters.append(["script", "=", resource_attrs[:script]])
# Likewise, only add a default image filter when the caller gave none.
66 if @filters.select { |f| f.first == "docker_image_locator" }.empty?
# andand guards against runtime_constraints being nil.
67 if image_search = resource_attrs[:runtime_constraints].andand["docker_image"]
68 image_tag = resource_attrs[:runtime_constraints]["docker_image_tag"]
# Take the last entry of the lookup result as the image locator.
70 Collection.uuids_for_docker_image(image_search, image_tag).last
71 return super if image_locator.nil? # We won't find anything to reuse.
72 @filters.append(["docker_image_locator", "=", image_locator])
# NOTE(review): presumably the branch for jobs that request no Docker
# image -- match only jobs whose docker_image_locator is nil. Confirm.
74 @filters.append(["docker_image_locator", "=", nil])
78 # Search for a reusable Job, and return it if found.
# Start from the jobs the current user is allowed to read.
79 @objects = Job.readable_by(current_user)
# A candidate is eligible when it is not flagged nondeterministic, has
# either succeeded with a non-nil output or is still running, and was
# given exactly the requested script_parameters.
84 if j.nondeterministic != true and
85 ((j.success == true and j.output != nil) or j.running == true) and
86 j.script_parameters == resource_attrs[:script_parameters]
88 # We'll use this if we don't find a job that has completed
91 # Record the first job in the list
95 # Ensure that all candidate jobs actually did produce the same output
96 if @object.output != j.output
# Fall back to the incomplete job only when no other job was selected.
102 @object ||= incomplete_job
# NOTE(review): presumably reloads @object so the update below applies to a
# fresh copy of the record -- the helper is defined elsewhere; confirm.
113 reload_object_before_update
# Stamp the job as cancelled at the current time. The bang variant raises
# instead of returning false when the record cannot be saved.
114 @object.update_attributes! cancelled_at: Time.now
# Minimum number of seconds between queue-status messages; compared below
# against a difference of two Time values, which Ruby gives in seconds.
119 Q_UPDATE_INTERVAL = 12
120 def initialize(job, opts={})
126 yield "#{@job.uuid} finished at #{@job.finished_at}\n"
# Keep reporting for as long as the job has no started_at timestamp.
129 while not @job.started_at
130 # send a summary (job queue + available nodes) to the client
131 # every few seconds while waiting for the job to start
# On the first pass last_ack_at is unset, so back-date it by more than one
# interval; the check on the next line then passes immediately.
132 last_ack_at ||= Time.now - Q_UPDATE_INTERVAL - 1
133 if Time.now - last_ack_at >= Q_UPDATE_INTERVAL
# idle and alloc are seeded with 0 so the message below prints 0, not an
# empty string, when no node is in that state.
134 nodes_in_state = {idle: 0, alloc: 0}
# Run the node query with the ActiveRecord query cache disabled.
135 ActiveRecord::Base.uncached do
# Tally nodes that have a hostname by their reported slurm_state.
136 Node.where('hostname is not ?', nil).collect do |n|
137 if n.info[:slurm_state]
138 nodes_in_state[n.info[:slurm_state]] ||= 0
139 nodes_in_state[n.info[:slurm_state]] += 1
143 job_queue = Job.queue
# Count the queue entries that come before this job: stop at the first
# entry whose uuid matches ours.
144 n_queued_before_me = 0
145 job_queue.each do |j|
146 break if j.uuid == @job.uuid
147 n_queued_before_me += 1
# Emit one newline-terminated status line of space-separated
# "name value" pairs.
149 yield "#{Time.now}" \
150 " job #{@job.uuid}" \
151 " queue_position #{n_queued_before_me}" \
152 " queue_size #{job_queue.size}" \
153 " nodes_idle #{nodes_in_state[:idle]}" \
154 " nodes_alloc #{nodes_in_state[:alloc]}\n"
# Restart the interval timer now that a message has been sent.
155 last_ack_at = Time.now
# Query cache disabled for the enclosed block.
# NOTE(review): presumably so the job's state is re-read from the database
# on every pass of the loop -- confirm.
158 ActiveRecord::Base.uncached do
# Default ordering when the client gave none: highest priority first, then
# by created_at.
166 params[:order] ||= ['priority desc', 'created_at']
167 load_limit_offset_order_params
# Restrict to jobs that no one has locked (is_locked_by_uuid is nil).
171 is_locked_by_uuid: nil,
# Helper defined elsewhere.
# NOTE(review): presumably the same lookup the index action uses -- confirm.
176 find_objects_for_index
# The queue action declares exactly the same parameters as the index
# action, so this delegates to the index declaration.
180 def self._queue_requires_parameters
181 self._index_requires_parameters