Merge branch '2596-refactor-pipeline-create'
[arvados.git] / services / api / app / controllers / arvados / v1 / jobs_controller.rb
1 class Arvados::V1::JobsController < ApplicationController
2   accept_attribute_as_json :script_parameters, Hash
3   accept_attribute_as_json :runtime_constraints, Hash
4   accept_attribute_as_json :tasks_summary, Hash
5   skip_before_filter :find_object_by_uuid, :only => :queue
6   skip_before_filter :render_404_if_no_object, :only => :queue
7
8   def create
9     [:repository, :script, :script_version, :script_parameters].each do |r|
10       if !resource_attrs[r]
11         return render json: {
12           :error => "#{r} attribute must be specified"
13         }, status: :unprocessable_entity
14       end
15     end
16
17     r = Commit.find_commit_range(current_user,
18                                  resource_attrs[:repository],
19                                  resource_attrs[:minimum_script_version],
20                                  resource_attrs[:script_version],
21                                  resource_attrs[:exclude_script_versions])
22     if !resource_attrs[:nondeterministic] and !resource_attrs[:no_reuse]
23       # Search for jobs whose script_version is in the list of commits
24       # returned by find_commit_range
25       @object = nil
26       incomplete_job = nil
27       Job.readable_by(current_user).where(script: resource_attrs[:script],
28                                           script_version: r).
29         each do |j|
30         if j.nondeterministic != true and
31             ((j.success == true and j.output != nil) or j.running == true) and
32             j.script_parameters == resource_attrs[:script_parameters]
33           if j.running
34             # We'll use this if we don't find a job that has completed
35             incomplete_job ||= j
36           else
37             # Record the first job in the list
38             if !@object
39               @object = j
40             end
41             # Ensure that all candidate jobs actually did produce the same output
42             if @object.output != j.output
43               @object = nil
44               break
45             end
46           end
47         end
48         @object ||= incomplete_job
49         if @object
50           return show
51         end
52       end
53     end
54     if r
55       resource_attrs[:script_version] = r[0]
56     end
57
58     # Don't pass these on to activerecord
59     resource_attrs.delete(:minimum_script_version)
60     resource_attrs.delete(:exclude_script_versions)
61     resource_attrs.delete(:no_reuse)
62     super
63   end
64
65   def cancel
66     reload_object_before_update
67     @object.update_attributes! cancelled_at: Time.now
68     show
69   end
70
71   class LogStreamer
72     Q_UPDATE_INTERVAL = 12
73     def initialize(job, opts={})
74       @job = job
75       @opts = opts
76     end
77     def each
78       if @job.finished_at
79         yield "#{@job.uuid} finished at #{@job.finished_at}\n"
80         return
81       end
82       while not @job.started_at
83         # send a summary (job queue + available nodes) to the client
84         # every few seconds while waiting for the job to start
85         last_ack_at ||= Time.now - Q_UPDATE_INTERVAL - 1
86         if Time.now - last_ack_at >= Q_UPDATE_INTERVAL
87           nodes_in_state = {idle: 0, alloc: 0}
88           ActiveRecord::Base.uncached do
89             Node.where('hostname is not ?', nil).collect do |n|
90               if n.info[:slurm_state]
91                 nodes_in_state[n.info[:slurm_state]] ||= 0
92                 nodes_in_state[n.info[:slurm_state]] += 1
93               end
94             end
95           end
96           job_queue = Job.queue
97           n_queued_before_me = 0
98           job_queue.each do |j|
99             break if j.uuid == @job.uuid
100             n_queued_before_me += 1
101           end
102           yield "#{Time.now}" \
103             " job #{@job.uuid}" \
104             " queue_position #{n_queued_before_me}" \
105             " queue_size #{job_queue.size}" \
106             " nodes_idle #{nodes_in_state[:idle]}" \
107             " nodes_alloc #{nodes_in_state[:alloc]}\n"
108           last_ack_at = Time.now
109         end
110         sleep 3
111         ActiveRecord::Base.uncached do
112           @job.reload
113         end
114       end
115       @redis = Redis.new(:timeout => 0)
116       if @redis.exists @job.uuid
117         # A log buffer exists. Start by showing the last few KB.
118         @redis.
119           getrange(@job.uuid, 0 - [@opts[:buffer_size], 1].max, -1).
120           sub(/^[^\n]*\n?/, '').
121           split("\n").
122           each do |line|
123           yield "#{line}\n"
124         end
125       end
126       # TODO: avoid missing log entries between getrange() above and
127       # subscribe() below.
128       @redis.subscribe(@job.uuid) do |event|
129         event.message do |channel, msg|
130           if msg == "end"
131             @redis.unsubscribe @job.uuid
132           else
133             yield "#{msg}\n"
134           end
135         end
136       end
137     end
138   end
139
140   def self._log_tail_follow_requires_parameters
141     {
142       buffer_size: {type: 'integer', required: false, default: 2**13}
143     }
144   end
145   def log_tail_follow
146     if !@object.andand.uuid
147       return render_not_found
148     end
149     if client_accepts_plain_text_stream
150       self.response.headers['Last-Modified'] = Time.now.ctime.to_s
151       self.response_body = LogStreamer.new @object, {
152         buffer_size: (params[:buffer_size].to_i rescue 2**13)
153       }
154     else
155       render json: {
156         href: url_for(uuid: @object.uuid),
157         comment: ('To retrieve the log stream as plain text, ' +
158                   'use a request header like "Accept: text/plain"')
159       }
160     end
161   end
162
163   def queue
164     load_where_param
165     @where.merge!({
166                     started_at: nil,
167                     is_locked_by_uuid: nil,
168                     cancelled_at: nil,
169                     success: nil
170                   })
171     params[:order] ||= 'priority desc, created_at'
172     find_objects_for_index
173     index
174   end
175
176   def self._queue_requires_parameters
177     self._index_requires_parameters
178   end
179 end