2879: API server can find_or_create Jobs based on Docker image.
[arvados.git] / services / api / app / controllers / arvados / v1 / jobs_controller.rb
1 class Arvados::V1::JobsController < ApplicationController
2   accept_attribute_as_json :script_parameters, Hash
3   accept_attribute_as_json :runtime_constraints, Hash
4   accept_attribute_as_json :tasks_summary, Hash
5   skip_before_filter :find_object_by_uuid, :only => :queue
6   skip_before_filter :render_404_if_no_object, :only => :queue
7
8   def create
9     [:repository, :script, :script_version, :script_parameters].each do |r|
10       if !resource_attrs[r]
11         return render json: {
12           :errors => ["#{r} attribute must be specified"]
13         }, status: :unprocessable_entity
14       end
15     end
16     load_filters_param
17
18     # We used to ask for the minimum_, exclude_, and no_reuse params
19     # in the job resource. Now we advertise them as flags that alter
20     # the behavior of the create action.
21     [:minimum_script_version, :exclude_script_versions].each do |attr|
22       if resource_attrs.has_key? attr
23         params[attr] = resource_attrs.delete attr
24       end
25     end
26     if resource_attrs.has_key? :no_reuse
27       params[:find_or_create] = !resource_attrs.delete(:no_reuse)
28     end
29
30     if params[:find_or_create]
31       # Translate older creation parameters and special range operators
32       # into standard filters.
33       minimum_script_version = params[:minimum_script_version]
34       exclude_script_versions = params.fetch(:exclude_script_versions, [])
35       @filters.select do |filter|
36         case filter[0..1]
37         when ["script_version", "in range"]
38           minimum_script_version = filter.last
39           false
40         when ["script_version", "not in"], ["script_version", "not in range"]
41           begin
42             exclude_script_versions += filter.last
43           rescue TypeError
44             exclude_script_versions << filter.last
45           end
46           false
47         when ["docker_image_locator", "in range"], ["docker_image_locator", "not in range"]
48           filter[1].sub!(/ range$/, '')
49           filter[2] = Collection.uuids_for_docker_image(filter[2])
50           true
51         else
52           true
53         end
54       end
55       @filters.append(["script_version", "in",
56                        Commit.find_commit_range(current_user,
57                                                 resource_attrs[:repository],
58                                                 minimum_script_version,
59                                                 resource_attrs[:script_version],
60                                                 exclude_script_versions)])
61
62       # Set up default filters for specific parameters.
63       if @filters.select { |f| f.first == "script" }.empty?
64         @filters.append(["script", "=", resource_attrs[:script]])
65       end
66       if @filters.select { |f| f.first == "docker_image_locator" }.empty?
67         if image_search = resource_attrs[:runtime_constraints].andand["docker_image"]
68           image_tag = resource_attrs[:runtime_constraints]["docker_image_tag"]
69           image_locator =
70             Collection.uuids_for_docker_image(image_search, image_tag).last
71           return super if image_locator.nil?  # We won't find anything to reuse.
72           @filters.append(["docker_image_locator", "=", image_locator])
73         else
74           @filters.append(["docker_image_locator", "=", nil])
75         end
76       end
77
78       # Search for a reusable Job, and return it if found.
79       @objects = Job.readable_by(current_user)
80       apply_filters
81       @object = nil
82       incomplete_job = nil
83       @objects.each do |j|
84         if j.nondeterministic != true and
85             ((j.success == true and j.output != nil) or j.running == true) and
86             j.script_parameters == resource_attrs[:script_parameters]
87           if j.running
88             # We'll use this if we don't find a job that has completed
89             incomplete_job ||= j
90           else
91             # Record the first job in the list
92             if !@object
93               @object = j
94             end
95             # Ensure that all candidate jobs actually did produce the same output
96             if @object.output != j.output
97               @object = nil
98               break
99             end
100           end
101         end
102         @object ||= incomplete_job
103         if @object
104           return show
105         end
106       end
107     end
108
109     super
110   end
111
112   def cancel
113     reload_object_before_update
114     @object.update_attributes! cancelled_at: Time.now
115     show
116   end
117
118   class LogStreamer
119     Q_UPDATE_INTERVAL = 12
120     def initialize(job, opts={})
121       @job = job
122       @opts = opts
123     end
124     def each
125       if @job.finished_at
126         yield "#{@job.uuid} finished at #{@job.finished_at}\n"
127         return
128       end
129       while not @job.started_at
130         # send a summary (job queue + available nodes) to the client
131         # every few seconds while waiting for the job to start
132         last_ack_at ||= Time.now - Q_UPDATE_INTERVAL - 1
133         if Time.now - last_ack_at >= Q_UPDATE_INTERVAL
134           nodes_in_state = {idle: 0, alloc: 0}
135           ActiveRecord::Base.uncached do
136             Node.where('hostname is not ?', nil).collect do |n|
137               if n.info[:slurm_state]
138                 nodes_in_state[n.info[:slurm_state]] ||= 0
139                 nodes_in_state[n.info[:slurm_state]] += 1
140               end
141             end
142           end
143           job_queue = Job.queue
144           n_queued_before_me = 0
145           job_queue.each do |j|
146             break if j.uuid == @job.uuid
147             n_queued_before_me += 1
148           end
149           yield "#{Time.now}" \
150             " job #{@job.uuid}" \
151             " queue_position #{n_queued_before_me}" \
152             " queue_size #{job_queue.size}" \
153             " nodes_idle #{nodes_in_state[:idle]}" \
154             " nodes_alloc #{nodes_in_state[:alloc]}\n"
155           last_ack_at = Time.now
156         end
157         sleep 3
158         ActiveRecord::Base.uncached do
159           @job.reload
160         end
161       end
162     end
163   end
164
165   def queue
166     params[:order] ||= ['priority desc', 'created_at']
167     load_limit_offset_order_params
168     load_where_param
169     @where.merge!({
170                     started_at: nil,
171                     is_locked_by_uuid: nil,
172                     cancelled_at: nil,
173                     success: nil
174                   })
175     load_filters_param
176     find_objects_for_index
177     index
178   end
179
180   def self._queue_requires_parameters
181     self._index_requires_parameters
182   end
183 end