Merge branch 'master' into 11850-singlecontainer-max-requirements
[arvados.git] / services / api / app / controllers / arvados / v1 / jobs_controller.rb
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 class Arvados::V1::JobsController < ApplicationController
6   accept_attribute_as_json :components, Hash
7   accept_attribute_as_json :script_parameters, Hash
8   accept_attribute_as_json :runtime_constraints, Hash
9   accept_attribute_as_json :tasks_summary, Hash
10   skip_before_filter :find_object_by_uuid, :only => [:queue, :queue_size]
11   skip_before_filter :render_404_if_no_object, :only => [:queue, :queue_size]
12
13   include DbCurrentTime
14
15   def create
16     [:repository, :script, :script_version, :script_parameters].each do |r|
17       if !resource_attrs[r]
18         return send_error("#{r} attribute must be specified",
19                           status: :unprocessable_entity)
20       end
21     end
22
23     # We used to ask for the minimum_, exclude_, and no_reuse params
24     # in the job resource. Now we advertise them as flags that alter
25     # the behavior of the create action.
26     [:minimum_script_version, :exclude_script_versions].each do |attr|
27       if resource_attrs.has_key? attr
28         params[attr] = resource_attrs.delete attr
29       end
30     end
31     if resource_attrs.has_key? :no_reuse
32       params[:find_or_create] = !resource_attrs.delete(:no_reuse)
33     end
34
35     return super if !params[:find_or_create]
36     return if !load_filters_param
37
38     begin
39       @object = Job.find_reusable(resource_attrs, params, @filters, @read_users)
40     rescue ArgumentError => error
41       return send_error(error.message)
42     end
43
44     if @object
45       show
46     else
47       super
48     end
49   end
50
51   def cancel
52     reload_object_before_update
53     @object.cancel cascade: params[:cascade]
54     show
55   end
56
57   def lock
58     @object.lock current_user.uuid
59     show
60   end
61
62   class LogStreamer
63     Q_UPDATE_INTERVAL = 12
64     def initialize(job, opts={})
65       @job = job
66       @opts = opts
67     end
68     def each
69       if @job.finished_at
70         yield "#{@job.uuid} finished at #{@job.finished_at}\n"
71         return
72       end
73       while not @job.started_at
74         # send a summary (job queue + available nodes) to the client
75         # every few seconds while waiting for the job to start
76         current_time = db_current_time
77         last_ack_at ||= current_time - Q_UPDATE_INTERVAL - 1
78         if current_time - last_ack_at >= Q_UPDATE_INTERVAL
79           nodes_in_state = {idle: 0, alloc: 0}
80           ActiveRecord::Base.uncached do
81             Node.where('hostname is not ?', nil).collect do |n|
82               if n.info[:slurm_state]
83                 nodes_in_state[n.info[:slurm_state]] ||= 0
84                 nodes_in_state[n.info[:slurm_state]] += 1
85               end
86             end
87           end
88           job_queue = Job.queue.select(:uuid)
89           n_queued_before_me = 0
90           job_queue.each do |j|
91             break if j.uuid == @job.uuid
92             n_queued_before_me += 1
93           end
94           yield "#{db_current_time}" \
95             " job #{@job.uuid}" \
96             " queue_position #{n_queued_before_me}" \
97             " queue_size #{job_queue.count}" \
98             " nodes_idle #{nodes_in_state[:idle]}" \
99             " nodes_alloc #{nodes_in_state[:alloc]}\n"
100           last_ack_at = db_current_time
101         end
102         sleep 3
103         ActiveRecord::Base.uncached do
104           @job.reload
105         end
106       end
107     end
108   end
109
110   def queue
111     params[:order] ||= ['priority desc', 'created_at']
112     load_limit_offset_order_params
113     load_where_param
114     @where.merge!({state: Job::Queued})
115     return if !load_filters_param
116     find_objects_for_index
117     index
118   end
119
120   def queue_size
121     # Users may not be allowed to see all the jobs in the queue, so provide a
122     # method to get just the queue size in order to get a gist of how busy the
123     # cluster is.
124     render :json => {:queue_size => Job.queue.size}
125   end
126
127   def self._create_requires_parameters
128     (super rescue {}).
129       merge({
130               find_or_create: {
131                 type: 'boolean', required: false, default: false
132               },
133               filters: {
134                 type: 'array', required: false
135               },
136               minimum_script_version: {
137                 type: 'string', required: false
138               },
139               exclude_script_versions: {
140                 type: 'array', required: false
141               },
142             })
143   end
144
145   def self._queue_requires_parameters
146     self._index_requires_parameters
147   end
148
149   protected
150
151   def load_filters_param
152     begin
153       super
154       attrs = resource_attrs rescue {}
155       @filters = Job.load_job_specific_filters attrs, @filters, @read_users
156     rescue ArgumentError => error
157       send_error(error.message)
158       false
159     else
160       true
161     end
162   end
163 end