Merge branch '5539-better-docker' closes #5539
[arvados.git] / services / api / app / controllers / arvados / v1 / jobs_controller.rb
class Arvados::V1::JobsController < ApplicationController
  accept_attribute_as_json :script_parameters, Hash
  accept_attribute_as_json :runtime_constraints, Hash
  accept_attribute_as_json :tasks_summary, Hash
  skip_before_filter :find_object_by_uuid, :only => [:queue, :queue_size]
  skip_before_filter :render_404_if_no_object, :only => [:queue, :queue_size]

  include DbCurrentTime

  # Create a job or, when find_or_create is requested, render an existing
  # job that satisfies the request instead of creating a new one.
  #
  # repository, script, script_version and script_parameters must all be
  # present in the job attributes; otherwise an unprocessable_entity error
  # is sent. When find_or_create is set and the client supplied no filters,
  # filters are derived from the job attributes, the legacy
  # minimum_script_version / exclude_script_versions params, and the
  # docker_image / docker_image_tag / arvados_sdk_version runtime
  # constraints.
  def create
    [:repository, :script, :script_version, :script_parameters].each do |r|
      if !resource_attrs[r]
        return send_error("#{r} attribute must be specified",
                          status: :unprocessable_entity)
      end
    end

    # We used to ask for the minimum_, exclude_, and no_reuse params
    # in the job resource. Now we advertise them as flags that alter
    # the behavior of the create action.
    [:minimum_script_version, :exclude_script_versions].each do |attr|
      if resource_attrs.has_key? attr
        params[attr] = resource_attrs.delete attr
      end
    end
    if resource_attrs.has_key? :no_reuse
      params[:find_or_create] = !resource_attrs.delete(:no_reuse)
    end

    if params[:find_or_create]
      # load_filters_param has already sent an error response if it
      # returns false.
      return if false.equal?(load_filters_param)
      if @filters.empty?  # Translate older creation parameters into filters.
        @filters =
          [["repository", "=", resource_attrs[:repository]],
           ["script", "=", resource_attrs[:script]],
           ["script_version", "in git",
            params[:minimum_script_version] || resource_attrs[:script_version]],
           ["script_version", "not in git", params[:exclude_script_versions]],
          ].reject { |filter| filter.last.nil? || filter.last.empty? }
        if image_search = resource_attrs[:runtime_constraints].andand["docker_image"]
          if image_tag = resource_attrs[:runtime_constraints]["docker_image_tag"]
            image_search += ":#{image_tag}"
          end
          @filters.append(["docker_image_locator", "in docker", image_search])
        else
          @filters.append(["docker_image_locator", "=", nil])
        end
        if sdk_version = resource_attrs[:runtime_constraints].andand["arvados_sdk_version"]
          @filters.append(["arvados_sdk_version", "in git", sdk_version])
        end
        begin
          load_job_specific_filters
        rescue ArgumentError => error
          return send_error(error.message)
        end
      end

      # Check specified filters for some reasonableness.
      filter_names = @filters.map(&:first).uniq
      ["repository", "script"].each do |req_filter|
        unless filter_names.include?(req_filter)
          return send_error("#{req_filter} filter required")
        end
      end

      # Search for a reusable Job, and return it if found.
      @objects = Job.readable_by(current_user)
      apply_filters
      @object = reusable_job_from(@objects)
      return show if @object
    end

    super
  end

  # Reload the job, set its state to Job::Cancelled, and render it.
  def cancel
    reload_object_before_update
    @object.update_attributes! state: Job::Cancelled
    show
  end

  # Call the job's lock method with the current user's UUID, then render
  # the job.
  def lock
    @object.lock current_user.uuid
    show
  end

  # Produces status lines for a job through #each (presumably consumed as
  # a streamed response body -- TODO confirm with callers).
  #
  # If the job has finished, a single "finished at" line is yielded.
  # Otherwise, until the job has started, a queue/node summary line is
  # yielded at most once per Q_UPDATE_INTERVAL, polling the job every
  # 3 seconds.
  class LogStreamer
    # DbCurrentTime is included by the enclosing controller, but a nested
    # class does not inherit its enclosing class's mixins. Without this,
    # the db_current_time calls in #each raise NoMethodError.
    include DbCurrentTime

    # Minimum gap between summaries, compared against differences of
    # db_current_time values (presumably seconds).
    Q_UPDATE_INTERVAL = 12

    def initialize(job, opts={})
      @job = job
      @opts = opts
    end

    def each
      if @job.finished_at
        yield "#{@job.uuid} finished at #{@job.finished_at}\n"
        return
      end
      last_ack_at = nil
      until @job.started_at
        # send a summary (job queue + available nodes) to the client
        # every few seconds while waiting for the job to start
        current_time = db_current_time
        last_ack_at ||= current_time - Q_UPDATE_INTERVAL - 1
        if current_time - last_ack_at >= Q_UPDATE_INTERVAL
          nodes_in_state = {idle: 0, alloc: 0}
          ActiveRecord::Base.uncached do
            Node.where('hostname is not ?', nil).each do |n|
              # NOTE(review): counts are keyed by the raw info[:slurm_state]
              # value, but only the :idle and :alloc symbol keys are
              # reported below. If info uses string keys or string state
              # values, the reported counts stay 0 -- confirm.
              if n.info[:slurm_state]
                nodes_in_state[n.info[:slurm_state]] ||= 0
                nodes_in_state[n.info[:slurm_state]] += 1
              end
            end
          end
          job_queue = Job.queue
          n_queued_before_me = 0
          job_queue.each do |j|
            break if j.uuid == @job.uuid
            n_queued_before_me += 1
          end
          yield "#{db_current_time}" \
            " job #{@job.uuid}" \
            " queue_position #{n_queued_before_me}" \
            " queue_size #{job_queue.size}" \
            " nodes_idle #{nodes_in_state[:idle]}" \
            " nodes_alloc #{nodes_in_state[:alloc]}\n"
          last_ack_at = db_current_time
        end
        sleep 3
        ActiveRecord::Base.uncached do
          @job.reload
        end
      end
    end
  end

  # List jobs in the Queued state. Unless the client specifies an order,
  # results are ordered by priority (descending), then created_at.
  def queue
    params[:order] ||= ['priority desc', 'created_at']
    load_limit_offset_order_params
    load_where_param
    @where.merge!({state: Job::Queued})
    return if false.equal?(load_filters_param)
    find_objects_for_index
    index
  end

  def queue_size
    # Users may not be allowed to see all the jobs in the queue, so provide a
    # method to get just the queue size in order to get a gist of how busy the
    # cluster is.
    render :json => {:queue_size => Job.queue.size}
  end

  # Parameter spec for create: the parent's spec (or {} if the parent
  # raises) plus the find_or_create, filters, minimum_script_version and
  # exclude_script_versions flags.
  def self._create_requires_parameters
    (super rescue {}).
      merge({
              find_or_create: {
                type: 'boolean', required: false, default: false
              },
              filters: {
                type: 'array', required: false
              },
              minimum_script_version: {
                type: 'string', required: false
              },
              exclude_script_versions: {
                type: 'array', required: false
              },
            })
  end

  # queue accepts the same parameters as index.
  def self._queue_requires_parameters
    self._index_requires_parameters
  end

  protected

  # Convert Job-specific @filters entries into general SQL filters.
  #
  # - "in git" / "not in git" filters are removed and replaced by a single
  #   [attr, "in", commit_list] filter per attribute, computed with
  #   Commit.find_commit_range.
  # - "in docker" / "not in docker" filters become "in" / "not in" filters
  #   over the portable data hashes of matching Docker image collections.
  # - Repeated "=" filters on repository or script must agree.
  #
  # Raises ArgumentError for incompatible or incomplete filters, or when
  # no commit range can be found.
  def load_job_specific_filters
    script_info = {"repository" => nil, "script" => nil}
    git_filters = Hash.new do |hash, key|
      hash[key] = {"max_version" => "HEAD", "exclude_versions" => []}
    end
    # Rewritten docker filters are gathered here and appended after
    # select! finishes, rather than growing @filters while select! is
    # still iterating over it.
    docker_filters = []
    @filters.select! do |(attr, operator, operand)|
      if script_info.has_key?(attr) && operator == "="
        if script_info[attr].nil?
          script_info[attr] = operand
        elsif script_info[attr] != operand
          raise ArgumentError, "incompatible #{attr} filters"
        end
      end
      case operator
      when "in git"
        git_filters[attr]["min_version"] = operand
        false
      when "not in git"
        git_filters[attr]["exclude_versions"] += Array.wrap(operand)
        false
      when "in docker", "not in docker"
        image_hashes = Array.wrap(operand).flat_map do |search_term|
          image_search, image_tag = search_term.split(':', 2)
          Collection.
            find_all_for_docker_image(image_search, image_tag, @read_users).
            map(&:portable_data_hash)
        end
        docker_filters << [attr, operator.sub(/ docker$/, ""), image_hashes]
        false
      else
        true
      end
    end
    @filters.concat(docker_filters)

    # Build a real script_version filter from any "not? in git" filters.
    git_filters.each_pair do |attr, filter|
      case attr
      when "script_version"
        script_info.each_pair do |key, value|
          if value.nil?
            raise ArgumentError, "script_version filter needs #{key} filter"
          end
        end
        filter["repository"] = script_info["repository"]
        begin
          # Keep the HEAD default when the job attributes do not name a
          # script_version.
          filter["max_version"] = resource_attrs[:script_version] || filter["max_version"]
        rescue
          # No job attributes available: using HEAD, set earlier by the
          # hash default, is fine.
        end
      when "arvados_sdk_version"
        filter["repository"] = "arvados"
      else
        raise ArgumentError, "unknown attribute for git filter: #{attr}"
      end
      version_range = Commit.find_commit_range(current_user,
                                               filter["repository"],
                                               filter["min_version"],
                                               filter["max_version"],
                                               filter["exclude_versions"])
      if version_range.nil?
        raise ArgumentError,
              "error searching #{filter['repository']} from " +
              "'#{filter['min_version']}' to '#{filter['max_version']}', " +
              "excluding #{filter['exclude_versions']}"
      end
      @filters.append([attr, "in", version_range])
    end
  end

  # Load filters via the parent implementation, then translate job-specific
  # ones. On ArgumentError, send an error response and return false; callers
  # check for this with false.equal?(...).
  def load_filters_param
    begin
      super
      load_job_specific_filters
    rescue ArgumentError => error
      send_error(error.message)
      false
    end
  end

  # Choose a job from +candidates+ that can stand in for the job being
  # created, or return nil so that a new job is created.
  #
  # Only candidates that are not flagged nondeterministic, are Queued,
  # Running or Complete, and have identical script_parameters are
  # considered.
  #
  # A candidate whose output collection is readable by the current user is
  # preferred, but only if every such candidate has the same output. If
  # their outputs disagree, none of them is reused. Otherwise, fall back to
  # the first Queued/Running candidate owned by the current user.
  def reusable_job_from(candidates)
    completed = nil
    outputs_disagree = false
    incomplete = nil
    candidates.each do |j|
      next if j.nondeterministic == true
      next unless ["Queued", "Running", "Complete"].include?(j.state)
      next unless j.script_parameters == resource_attrs[:script_parameters]
      if j.state != "Complete" && j.owner_uuid == current_user.uuid
        # We'll use this if we don't find a job that has completed.
        incomplete ||= j
      elsif !outputs_disagree &&
            Collection.readable_by(current_user).find_by_portable_data_hash(j.output)
        # Record the first such job, then make sure every later one
        # produced the same output.
        completed ||= j
        outputs_disagree = (completed.output != j.output)
      end
    end
    outputs_disagree ? incomplete : (completed || incomplete)
  end
end