X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/482d0e7afcfbf3379512c4754ea896ac294c1b6a..e65863586e67c7def2eeb9f730f7835d94bd4341:/services/api/script/crunch-dispatch.rb diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb index 9e7097e98e..88840d53b6 100755 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@ -11,12 +11,23 @@ $signal = {} end end +if ENV["CRUNCH_DISPATCH_LOCKFILE"] + lockfilename = ENV.delete "CRUNCH_DISPATCH_LOCKFILE" + lockfile = File.open(lockfilename, File::RDWR|File::CREAT, 0644) + unless lockfile.flock File::LOCK_EX|File::LOCK_NB + abort "Lock unavailable on #{lockfilename} - exit" + end +end + ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development" require File.dirname(__FILE__) + '/../config/boot' require File.dirname(__FILE__) + '/../config/environment' require 'open3' +$redis ||= Redis.new +LOG_BUFFER_SIZE = 2**20 + class Dispatcher include ApplicationHelper @@ -28,29 +39,57 @@ class Dispatcher @todo = Job.queue end - def start_jobs + def update_node_status if Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/ - @idle_slurm_nodes = 0 + @nodes_in_state = {idle: 0, alloc: 0, down: 0} + @node_state ||= {} + node_seen = {} begin - `sinfo`. + `sinfo --noheader -o '%n:%t'`. split("\n"). - collect { |line| line.match /(\d+) +idle/ }. - each do |re| - @idle_slurm_nodes = re[1].to_i if re + each do |line| + re = line.match /(\S+?):+(idle|alloc|down)/ + next if !re + + # sinfo tells us about a node N times if it is shared by N partitions + next if node_seen[re[1]] + node_seen[re[1]] = true + + # count nodes in each state + @nodes_in_state[re[2].to_sym] += 1 + + # update our database (and cache) when a node's state changes + if @node_state[re[1]] != re[2] + @node_state[re[1]] = re[2] + node = Node.where('hostname=?', re[1]).first + if node + $stderr.puts "dispatch: update #{re[1]} state to #{re[2]}" + node.info[:slurm_state] = re[2] + node.save + elsif re[2] != 'down' + $stderr.puts "dispatch: sinfo reports '#{re[1]}' is not down, but no node has that name" + end + end end rescue end end + end + def start_jobs @todo.each do |job| min_nodes = 1 begin - if job.resource_limits['min_nodes'] - min_nodes = begin job.resource_limits['min_nodes'].to_i rescue 1 end + if job.runtime_constraints['min_nodes'] + min_nodes = begin job.runtime_constraints['min_nodes'].to_i rescue 1 end end end - next if @idle_slurm_nodes and @idle_slurm_nodes < min_nodes + + begin + next if @nodes_in_state[:idle] < min_nodes + rescue + end next if @running[job.uuid] next if !take(job) @@ -61,6 +100,7 @@ class Dispatcher cmd_args = [] when :slurm_immediate cmd_args = ["salloc", + "--chdir=/", "--immediate", "--exclusive", "--no-kill", @@ -70,24 +110,41 @@ class Dispatcher raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}" end + if Server::Application.config.crunch_job_user + cmd_args.unshift("sudo", "-E", "-u", + Server::Application.config.crunch_job_user, + "PERLLIB=#{ENV['PERLLIB']}") + end + job_auth = ApiClientAuthorization. - new(user: User.where('uuid=?', job.modified_by_user).first, + new(user: User.where('uuid=?', job.modified_by_user_uuid).first, api_client_id: 0) job_auth.save - cmd_args << 'crunch-job' + cmd_args << (ENV['CRUNCH_JOB_BIN'] || `which crunch-job`.strip) cmd_args << '--job-api-token' cmd_args << job_auth.api_token cmd_args << '--job' cmd_args << job.uuid + if cmd_args[0] == '' + raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path." + end + commit = Commit.where(sha1: job.script_version).first if commit cmd_args << '--git-dir' - cmd_args << File. - join(Rails.configuration.git_repositories_dir, - commit.repository_name, - '.git') + if File.exists?(File. + join(Rails.configuration.git_repositories_dir, + commit.repository_name + '.git')) + cmd_args << File. + join(Rails.configuration.git_repositories_dir, + commit.repository_name + '.git') + else + cmd_args << File. + join(Rails.configuration.git_repositories_dir, + commit.repository_name, '.git') + end end $stderr.puts "dispatch: #{cmd_args.join ' '}" @@ -100,8 +157,14 @@ class Dispatcher untake(job) next end - $stderr.puts "dispatch: job #{job.uuid} start" - $stderr.puts "dispatch: child #{t.pid} start" + + $stderr.puts "dispatch: job #{job.uuid}" + start_banner = "dispatch: child #{t.pid} start #{Time.now.ctime.to_s}" + $stderr.puts start_banner + $redis.set job.uuid, start_banner + "\n" + $redis.publish job.uuid, start_banner + $redis.publish job.owner_uuid, start_banner + @running[job.uuid] = { stdin: i, stdout: o, @@ -156,6 +219,16 @@ class Dispatcher lines.each do |line| $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) $stderr.puts line + pub_msg = "#{Time.now.ctime.to_s} #{line.strip}" + $redis.publish job.owner_uuid, pub_msg + $redis.publish job_uuid, pub_msg + $redis.append job_uuid, pub_msg + "\n" + if LOG_BUFFER_SIZE < $redis.strlen(job_uuid) + $redis.set(job_uuid, + $redis + .getrange(job_uuid, (LOG_BUFFER_SIZE >> 1), -1) + .sub(/^.*?\n/, '')) + end end end end @@ -200,6 +273,7 @@ class Dispatcher job_done = j_done[:job] $stderr.puts "dispatch: child #{pid_done} exit" $stderr.puts "dispatch: job #{job_done.uuid} end" + $redis.publish job_done.uuid, "end" # Ensure every last drop of stdout and stderr is consumed read_pipes @@ -236,6 +310,7 @@ class Dispatcher end else refresh_todo unless did_recently(:refresh_todo, 1.0) + update_node_status start_jobs unless @todo.empty? or did_recently(:start_jobs, 1.0) end reap_children