X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/57ab92d3d7da06e1de3fcb429150883489f9a85c..c402c5c3ef362f2848f0dc9081195ced9d7b84aa:/services/api/app/controllers/arvados/v1/jobs_controller.rb diff --git a/services/api/app/controllers/arvados/v1/jobs_controller.rb b/services/api/app/controllers/arvados/v1/jobs_controller.rb index c330da5122..41cbfd4b0e 100644 --- a/services/api/app/controllers/arvados/v1/jobs_controller.rb +++ b/services/api/app/controllers/arvados/v1/jobs_controller.rb @@ -1,6 +1,6 @@ class Arvados::V1::JobsController < ApplicationController accept_attribute_as_json :script_parameters, Hash - accept_attribute_as_json :resource_limits, Hash + accept_attribute_as_json :runtime_constraints, Hash accept_attribute_as_json :tasks_summary, Hash skip_before_filter :find_object_by_uuid, :only => :queue @@ -41,16 +41,60 @@ class Arvados::V1::JobsController < ApplicationController end class LogStreamer - def initialize(job) + Q_UPDATE_INTERVAL = 12 + def initialize(job, opts={}) @job = job + @opts = opts end def each if @job.finished_at yield "#{@job.uuid} finished at #{@job.finished_at}\n" return end + while not @job.started_at + # send a summary (job queue + available nodes) to the client + # every few seconds while waiting for the job to start + last_ack_at ||= Time.now - Q_UPDATE_INTERVAL - 1 + if Time.now - last_ack_at >= Q_UPDATE_INTERVAL + nodes_in_state = {idle: 0, alloc: 0} + Node.where('hostname is not ?', nil).collect do |n| + if n.info[:slurm_state] + nodes_in_state[n.info[:slurm_state]] ||= 0 + nodes_in_state[n.info[:slurm_state]] += 1 + end + end + job_queue = Job.queue + n_queued_before_me = 0 + job_queue.each do |j| + break if j.uuid == @job.uuid + n_queued_before_me += 1 + end + yield "#{Time.now}" \ + " job #{@job.uuid}" \ + " queue_position #{n_queued_before_me}" \ + " queue_size #{job_queue.size}" \ + " nodes_idle #{nodes_in_state[:idle]}" \ + " nodes_alloc #{nodes_in_state[:alloc]}\n" + last_ack_at = Time.now + end + sleep 3 + @job.reload + end @redis = Redis.new(:timeout => 0) @redis.subscribe(@job.uuid) do |event| + if @redis.exists @job.uuid + # A log buffer exists. Start by showing the last few KB. + @redis. + getrange(@job.uuid, 0 - [@opts[:buffer_size], 1].max, -1). + sub(/^[^\n]*\n?/, ''). + split("\n"). + each do |line| + yield "#{line}\n" + end + end + # TODO: avoid duplicating the last few lines of the log + # file. Use the fact that timestamps are lexicographically + # ordered. event.message do |channel, msg| if msg == "end" @redis.unsubscribe @job.uuid @@ -62,12 +106,19 @@ class Arvados::V1::JobsController < ApplicationController end end + def self._log_tail_follow_requires_parameters + { + buffer_size: {type: 'integer', required: false} + } + end def log_tail_follow if !@object.andand.uuid return render_not_found end self.response.headers['Last-Modified'] = Time.now.ctime.to_s - self.response_body = LogStreamer.new @object + self.response_body = LogStreamer.new @object, { + buffer_size: (params[:buffer_size] || 2**13) + } end def queue