Merge branch '8840-lock-job-record' closes #8840
[arvados.git] / services / api / app / controllers / arvados / v1 / jobs_controller.rb
# API controller for Job records: creation (optionally reusing an existing
# matching job), cancel/lock actions, and job queue inspection.
class Arvados::V1::JobsController < ApplicationController
  accept_attribute_as_json :components, Hash
  accept_attribute_as_json :script_parameters, Hash
  accept_attribute_as_json :runtime_constraints, Hash
  accept_attribute_as_json :tasks_summary, Hash
  skip_before_filter :find_object_by_uuid, :only => [:queue, :queue_size]
  skip_before_filter :render_404_if_no_object, :only => [:queue, :queue_size]

  include DbCurrentTime

  # Create a job or, when the find_or_create flag is set, render an
  # existing job that satisfies the request's filters instead.
  #
  # Sends an unprocessable_entity error if repository, script,
  # script_version or script_parameters is absent from the resource
  # attributes.  When no reusable job is found, falls through to the
  # inherited create action.
  def create
    [:repository, :script, :script_version, :script_parameters].each do |r|
      if !resource_attrs[r]
        return send_error("#{r} attribute must be specified",
                          status: :unprocessable_entity)
      end
    end

    # We used to ask for the minimum_, exclude_, and no_reuse params
    # in the job resource. Now we advertise them as flags that alter
    # the behavior of the create action.
    [:minimum_script_version, :exclude_script_versions].each do |attr|
      if resource_attrs.has_key? attr
        params[attr] = resource_attrs.delete attr
      end
    end
    if resource_attrs.has_key? :no_reuse
      params[:find_or_create] = !resource_attrs.delete(:no_reuse)
    end

    if params[:find_or_create]
      # load_filters_param returns false after it has already sent an error.
      return if false.equal?(load_filters_param)
      if @filters.empty?  # Translate older creation parameters into filters.
        @filters =
          [["repository", "=", resource_attrs[:repository]],
           ["script", "=", resource_attrs[:script]],
           ["script_version", "not in git", params[:exclude_script_versions]],
          ].reject { |filter| filter.last.nil? or filter.last.empty? }
        if !params[:minimum_script_version].blank?
          @filters << ["script_version", "in git",
                       params[:minimum_script_version]]
        else
          add_default_git_filter("script_version", resource_attrs[:repository],
                                 resource_attrs[:script_version])
        end
        if image_search = resource_attrs[:runtime_constraints].andand["docker_image"]
          if image_tag = resource_attrs[:runtime_constraints]["docker_image_tag"]
            image_search += ":#{image_tag}"
          end
          image_locator = Collection.
            for_latest_docker_image(image_search).andand.portable_data_hash
        else
          image_locator = nil
        end
        @filters << ["docker_image_locator", "=", image_locator]
        if sdk_version = resource_attrs[:runtime_constraints].andand["arvados_sdk_version"]
          add_default_git_filter("arvados_sdk_version", "arvados", sdk_version)
        end
        # The filters built above still contain "in git"/"not in git"
        # operators; convert them into plain SQL-compatible filters.
        begin
          load_job_specific_filters
        rescue ArgumentError => error
          return send_error(error.message)
        end
      end

      # Check specified filters for some reasonableness.
      filter_names = @filters.map { |f| f.first }.uniq
      ["repository", "script"].each do |req_filter|
        if not filter_names.include?(req_filter)
          return send_error("#{req_filter} filter required")
        end
      end

      # Search for a reusable Job, and return it if found.
      @objects = Job.readable_by(current_user)
      apply_filters
      @object = nil
      incomplete_job = nil
      @objects.each do |j|
        if j.nondeterministic != true and
            ["Queued", "Running", "Complete"].include?(j.state) and
            j.script_parameters == resource_attrs[:script_parameters]
          if j.state != "Complete" && j.owner_uuid == current_user.uuid
            # We'll use this if we don't find a job that has completed
            incomplete_job ||= j
          else
            if Collection.readable_by(current_user).find_by_portable_data_hash(j.output)
              # Record the first job in the list
              if !@object
                @object = j
              end
              # Ensure that all candidate jobs actually did produce the same output
              if @object.output != j.output
                @object = nil
                break
              end
            end
          end
        end
      end
      # Decide only after every candidate has been examined: returning from
      # inside the loop would skip the output-consistency check above and
      # could pick an incomplete job even though a completed one follows.
      @object ||= incomplete_job
      if @object
        return show
      end
    end

    super
  end

  # Reload the job, set its state to Job::Cancelled, and render it.
  def cancel
    reload_object_before_update
    @object.update_attributes! state: Job::Cancelled
    show
  end

  # Call the job's lock method with the current user's uuid, then render
  # the job.
  def lock
    @object.lock current_user.uuid
    show
  end

  # Enumerable-style streamer that yields status lines for a job: a single
  # "finished" line if the job has already finished, otherwise periodic
  # queue summaries until the job has a started_at value.
  class LogStreamer
    # db_current_time is defined by this mixin; the enclosing controller's
    # own include does not extend to this nested class.
    include DbCurrentTime

    # Minimum number of seconds between queue summary lines.
    Q_UPDATE_INTERVAL = 12
    def initialize(job, opts={})
      @job = job
      @opts = opts
    end
    def each
      if @job.finished_at
        yield "#{@job.uuid} finished at #{@job.finished_at}\n"
        return
      end
      while not @job.started_at
        # send a summary (job queue + available nodes) to the client
        # every few seconds while waiting for the job to start
        current_time = db_current_time
        # On the first pass, pretend the last summary is already overdue so
        # that one is sent immediately.
        last_ack_at ||= current_time - Q_UPDATE_INTERVAL - 1
        if current_time - last_ack_at >= Q_UPDATE_INTERVAL
          # NOTE(review): the counters are seeded with symbol keys; confirm
          # that n.info[:slurm_state] also yields symbols, otherwise the
          # idle/alloc figures reported below stay at 0.
          nodes_in_state = {idle: 0, alloc: 0}
          ActiveRecord::Base.uncached do
            Node.where('hostname is not ?', nil).each do |n|
              if n.info[:slurm_state]
                nodes_in_state[n.info[:slurm_state]] ||= 0
                nodes_in_state[n.info[:slurm_state]] += 1
              end
            end
          end
          job_queue = Job.queue.select(:uuid)
          # Count the queue entries that come before this job.
          n_queued_before_me = 0
          job_queue.each do |j|
            break if j.uuid == @job.uuid
            n_queued_before_me += 1
          end
          yield "#{db_current_time}" \
            " job #{@job.uuid}" \
            " queue_position #{n_queued_before_me}" \
            " queue_size #{job_queue.count}" \
            " nodes_idle #{nodes_in_state[:idle]}" \
            " nodes_alloc #{nodes_in_state[:alloc]}\n"
          last_ack_at = db_current_time
        end
        sleep 3
        # Bypass the query cache so started_at reflects the current row.
        ActiveRecord::Base.uncached do
          @job.reload
        end
      end
    end
  end

  # Index action restricted to jobs in the Job::Queued state, ordered by
  # 'priority desc', 'created_at' unless the request supplies an order.
  def queue
    params[:order] ||= ['priority desc', 'created_at']
    load_limit_offset_order_params
    load_where_param
    @where.merge!({state: Job::Queued})
    return if false.equal?(load_filters_param)
    find_objects_for_index
    index
  end

  def queue_size
    # Users may not be allowed to see all the jobs in the queue, so provide a
    # method to get just the queue size in order to get a gist of how busy the
    # cluster is.
    render :json => {:queue_size => Job.queue.size}
  end

  # Parameters accepted by the create action: the inherited set (or an
  # empty hash if the superclass call raises) plus the job reuse flags.
  def self._create_requires_parameters
    (super rescue {}).
      merge({
              find_or_create: {
                type: 'boolean', required: false, default: false
              },
              filters: {
                type: 'array', required: false
              },
              minimum_script_version: {
                type: 'string', required: false
              },
              exclude_script_versions: {
                type: 'array', required: false
              },
            })
  end

  # The queue action takes the same parameters as index.
  def self._queue_requires_parameters
    self._index_requires_parameters
  end

  protected

  def add_default_git_filter(attr_name, repo_name, refspec)
    # Add a filter to @filters for `attr_name` = the latest commit available
    # in `repo_name` at `refspec`.  No filter is added if refspec can't be
    # resolved.
    commits = Commit.find_commit_range(repo_name, nil, refspec, nil)
    if commit_hash = commits.first
      @filters << [attr_name, "=", commit_hash]
    end
  end

  # Rewrite @filters in place, replacing the Job-specific operators
  # ("in git", "not in git", "in docker", "not in docker") with ordinary
  # "in"/"not in" filters.
  #
  # Raises ArgumentError when repository/script "=" filters conflict, when
  # a script_version git filter lacks a repository or script filter, when a
  # git filter names an unknown attribute, or when a git range resolves to
  # no revisions.
  def load_job_specific_filters
    # Convert Job-specific @filters entries into general SQL filters.
    script_info = {"repository" => nil, "script" => nil}
    git_filters = Hash.new do |hash, key|
      hash[key] = {"max_version" => "HEAD", "exclude_versions" => []}
    end
    # Translated docker filters are gathered here and appended once select!
    # has finished, so @filters is never grown while it is being filtered.
    docker_filters = []
    @filters.select! do |(attr, operator, operand)|
      if (script_info.has_key? attr) and (operator == "=")
        if script_info[attr].nil?
          script_info[attr] = operand
        elsif script_info[attr] != operand
          raise ArgumentError.new("incompatible #{attr} filters")
        end
      end
      case operator
      when "in git"
        git_filters[attr]["min_version"] = operand
        false
      when "not in git"
        git_filters[attr]["exclude_versions"] += Array.wrap(operand)
        false
      when "in docker", "not in docker"
        image_hashes = Array.wrap(operand).flat_map do |search_term|
          image_search, image_tag = search_term.split(':', 2)
          Collection.
            find_all_for_docker_image(image_search, image_tag, @read_users).
            map(&:portable_data_hash)
        end
        docker_filters << [attr, operator.sub(/ docker$/, ""), image_hashes]
        false
      else
        true
      end
    end
    @filters.concat(docker_filters)

    # Build a real script_version filter from any "not? in git" filters.
    git_filters.each_pair do |attr, filter|
      case attr
      when "script_version"
        script_info.each_pair do |key, value|
          if value.nil?
            raise ArgumentError.new("script_version filter needs #{key} filter")
          end
        end
        filter["repository"] = script_info["repository"]
        begin
          filter["max_version"] = resource_attrs[:script_version]
        rescue
          # Using HEAD, set earlier by the hash default, is fine.
        end
      when "arvados_sdk_version"
        filter["repository"] = "arvados"
      else
        raise ArgumentError.new("unknown attribute for git filter: #{attr}")
      end
      revisions = Commit.find_commit_range(filter["repository"],
                                           filter["min_version"],
                                           filter["max_version"],
                                           filter["exclude_versions"])
      if revisions.empty?
        raise ArgumentError.
          new("error searching #{filter['repository']} from " +
              "'#{filter['min_version']}' to '#{filter['max_version']}', " +
              "excluding #{filter['exclude_versions']}")
      end
      @filters.append([attr, "in", revisions])
    end
  end

  # Load filters via the inherited implementation, then translate the
  # Job-specific ones.  On ArgumentError, sends the error message to the
  # client and returns false so callers can stop processing.
  def load_filters_param
    begin
      super
      load_job_specific_filters
    rescue ArgumentError => error
      send_error(error.message)
      false
    end
  end
end