X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1a03dd7a7d10a4843848ff60957c03110a09df43..2a64eae3cf8363c596feda5337ea20ce356ca11f:/services/api/script/crunch-dispatch.rb diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb index f749f22a77..f49f21b04c 100755 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@ -1,7 +1,10 @@ #!/usr/bin/env ruby +require 'trollop' + include Process +$warned = {} $signal = {} %w{TERM INT}.each do |sig| signame = sig @@ -11,12 +14,26 @@ $signal = {} end end -ENV["RAILS_ENV"] = ARGV[0] || "development" +if ENV["CRUNCH_DISPATCH_LOCKFILE"] + lockfilename = ENV.delete "CRUNCH_DISPATCH_LOCKFILE" + lockfile = File.open(lockfilename, File::RDWR|File::CREAT, 0644) + unless lockfile.flock File::LOCK_EX|File::LOCK_NB + abort "Lock unavailable on #{lockfilename} - exit" + end +end + +$trollopts = Trollop::options do + opt :use_env, "Pass selected environment variables (PATH, PYTHONPATH, RUBYLIB, GEM_PATH, PERLLIB) to crunch-job" +end + +ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development" require File.dirname(__FILE__) + '/../config/boot' require File.dirname(__FILE__) + '/../config/environment' require 'open3' +LOG_BUFFER_SIZE = 4096 + class Dispatcher include ApplicationHelper @@ -24,33 +41,102 @@ class Dispatcher return act_as_system_user end + def refresh_running + Job.running.each do |jobrecord| + if !@running[jobrecord.uuid] + f = Log.filter(["object_uuid", "=", jobrecord.uuid]).limit(1).order("created_at desc").results.first + if (Time.now - f.created_at) > 300 + # job is marked running, but not known to crunch-dispatcher, and + # hasn't produced any log entries for 5 minutes, so mark it as failed. + jobrecord.running = false + jobrecord.finished_at ||= Time.now + if jobrecord.success.nil? + jobrecord.success = false + end + jobrecord.save! + end + end + end + end + def refresh_todo - @todo = Job.queue + @todo = Job.queue.select do |j| j.repository end + @todo_pipelines = PipelineInstance.queue end - def start_jobs + def sinfo + @@slurm_version ||= Gem::Version.new(`sinfo --version`.match(/\b[\d\.]+\b/)[0]) + if Gem::Version.new('2.3') <= @@slurm_version + `sinfo --noheader -o '%n:%t'`.strip + else + # Expand rows with hostname ranges (like "foo[1-3,5,9-12]:idle") + # into multiple rows with one hostname each. + `sinfo --noheader -o '%N:%t'`.split("\n").collect do |line| + tokens = line.split ":" + if (re = tokens[0].match /^(.*?)\[([-,\d]+)\]$/) + re[2].split(",").collect do |range| + range = range.split("-").collect(&:to_i) + (range[0]..range[-1]).collect do |n| + [re[1] + n.to_s, tokens[1..-1]].join ":" + end + end + else + tokens.join ":" + end + end.flatten.join "\n" + end + end + + def update_node_status if Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/ - @idle_slurm_nodes = 0 + @nodes_in_state = {idle: 0, alloc: 0, down: 0} + @node_state ||= {} + node_seen = {} begin - `sinfo`. - split("\n"). - collect { |line| line.match /(\d+) +idle/ }. - each do |re| - @idle_slurm_nodes = re[1].to_i if re + sinfo.split("\n"). + each do |line| + re = line.match /(\S+?):+(idle|alloc|down)/ + next if !re + + # sinfo tells us about a node N times if it is shared by N partitions + next if node_seen[re[1]] + node_seen[re[1]] = true + + # count nodes in each state + @nodes_in_state[re[2].to_sym] += 1 + + # update our database (and cache) when a node's state changes + if @node_state[re[1]] != re[2] + @node_state[re[1]] = re[2] + node = Node.where('hostname=?', re[1]).first + if node + $stderr.puts "dispatch: update #{re[1]} state to #{re[2]}" + node.info[:slurm_state] = re[2] + node.save + elsif re[2] != 'down' + $stderr.puts "dispatch: sinfo reports '#{re[1]}' is not down, but no node has that name" + end + end end rescue end end + end + def start_jobs @todo.each do |job| min_nodes = 1 begin - if job.resource_limits['min_nodes'] - min_nodes = begin job.resource_limits['min_nodes'].to_i rescue 1 end + if job.runtime_constraints['min_nodes'] + min_nodes = begin job.runtime_constraints['min_nodes'].to_i rescue 1 end end end - next if @idle_slurm_nodes and @idle_slurm_nodes < min_nodes + + begin + next if @nodes_in_state[:idle] < min_nodes + rescue + end next if @running[job.uuid] next if !take(job) @@ -61,6 +147,7 @@ class Dispatcher cmd_args = [] when :slurm_immediate cmd_args = ["salloc", + "--chdir=/", "--immediate", "--exclusive", "--no-kill", @@ -70,38 +157,78 @@ class Dispatcher raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}" end + if Server::Application.config.crunch_job_user + cmd_args.unshift("sudo", "-E", "-u", Server::Application.config.crunch_job_user) + end + + cmd_args << "HOME=/dev/null" + cmd_args << "ARVADOS_API_HOST=#{ENV['ARVADOS_API_HOST']}" + cmd_args << "ARVADOS_API_HOST_INSECURE=#{ENV['ARVADOS_API_HOST_INSECURE']}" if ENV['ARVADOS_API_HOST_INSECURE'] + + ENV.each do |k, v| + cmd_args << "#{k}=#{v}" if k.starts_with? "CRUNCH_" + end + + if $trollopts.use_env + cmd_args << "PATH=#{ENV['PATH']}" + cmd_args << "PYTHONPATH=#{ENV['PYTHONPATH']}" + cmd_args << "PERLLIB=#{ENV['PERLLIB']}" + cmd_args << "RUBYLIB=#{ENV['RUBYLIB']}" + cmd_args << "GEM_PATH=#{ENV['GEM_PATH']}" + end + job_auth = ApiClientAuthorization. - new(user: User.where('uuid=?', job.modified_by_user).first, + new(user: User.where('uuid=?', job.modified_by_user_uuid).first, api_client_id: 0) job_auth.save - cmd_args << 'crunch-job' + crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip) + if crunch_job_bin == '' + raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path." + end + + require 'shellwords' + + arvados_internal = Rails.configuration.git_internal_dir + if not File.exists? arvados_internal + $stderr.puts `mkdir -p #{arvados_internal.shellescape} && cd #{arvados_internal.shellescape} && git init --bare` + end + + src_repo = File.join(Rails.configuration.git_repositories_dir, job.repository + '.git') + src_repo = File.join(Rails.configuration.git_repositories_dir, job.repository, '.git') unless File.exists? src_repo + + unless src_repo + $stderr.puts "dispatch: #{File.join Rails.configuration.git_repositories_dir, job.repository} doesn't exist" + sleep 1 + untake(job) + next + end + + $stderr.puts `cd #{arvados_internal.shellescape} && git fetch --no-tags #{src_repo.shellescape} && git tag #{job.uuid.shellescape} #{job.script_version.shellescape}` + + cmd_args << crunch_job_bin cmd_args << '--job-api-token' cmd_args << job_auth.api_token cmd_args << '--job' cmd_args << job.uuid + cmd_args << '--git-dir' + cmd_args << arvados_internal - commit = Commit.where(sha1: job.script_version).first - if commit - cmd_args << '--git-dir' - cmd_args << File. - join(Rails.configuration.git_repositories_dir, - commit.repository_name, - '.git') - end - - $stderr.puts "dispatch: #{cmd_args.join ' '}" + $stderr.puts "dispatch: #{cmd_args}" begin - i, o, e, t = Open3.popen3(*cmd_args) + i, o, e, t = Open3.popen3({}, *cmd_args, { :unsetenv_others => true}) rescue $stderr.puts "dispatch: popen3: #{$!}" sleep 1 untake(job) next end - $stderr.puts "dispatch: job #{job.uuid} start" - $stderr.puts "dispatch: child #{t.pid} start" + + $stderr.puts "dispatch: job #{job.uuid}" + start_banner = "dispatch: child #{t.pid} start #{Time.now.ctime.to_s}" + $stderr.puts start_banner + @running[job.uuid] = { stdin: i, stdout: o, @@ -111,7 +238,9 @@ class Dispatcher stderr_buf: '', started: false, sent_int: 0, - job_auth: job_auth + job_auth: job_auth, + stderr_buf_to_flush: '', + stderr_flushed_at: 0 } i.close end @@ -156,6 +285,12 @@ class Dispatcher lines.each do |line| $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) $stderr.puts line + pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" + j[:stderr_buf_to_flush] << pub_msg + end + + if (LOG_BUFFER_SIZE < j[:stderr_buf_to_flush].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i) + write_log j end end end @@ -203,6 +338,8 @@ class Dispatcher # Ensure every last drop of stdout and stderr is consumed read_pipes + write_log j_done # write any remaining logs + if j_done[:stderr_buf] and j_done[:stderr_buf] != '' $stderr.puts j_done[:stderr_buf] + "\n" end @@ -210,15 +347,49 @@ class Dispatcher # Wait the thread j_done[:wait_thr].value + jobrecord = Job.find_by_uuid(job_done.uuid) + if jobrecord.started_at + # Clean up state fields in case crunch-job exited without + # putting the job in a suitable "finished" state. + jobrecord.running = false + jobrecord.finished_at ||= Time.now + if jobrecord.success.nil? + jobrecord.success = false + end + jobrecord.save! + else + # Don't fail the job if crunch-job didn't even get as far as + # starting it. If the job failed to run due to an infrastructure + # issue with crunch-job or slurm, we want the job to stay in the + # queue. + end + # Invalidate the per-job auth token j_done[:job_auth].update_attributes expires_at: Time.now @running.delete job_done.uuid end + def update_pipelines + expire_tokens = @pipe_auth_tokens.dup + @todo_pipelines.each do |p| + pipe_auth = (@pipe_auth_tokens[p.uuid] ||= ApiClientAuthorization. + create(user: User.where('uuid=?', p.modified_by_user_uuid).first, + api_client_id: 0)) + puts `export ARVADOS_API_TOKEN=#{pipe_auth.api_token} && arv-run-pipeline-instance --run-here --no-wait --instance #{p.uuid}` + expire_tokens.delete p.uuid + end + + expire_tokens.each do |k, v| + v.update_attributes expires_at: Time.now + @pipe_auth_tokens.delete k + end + end + def run act_as_system_user @running ||= {} + @pipe_auth_tokens ||= { } $stderr.puts "dispatch: ready" while !$signal[:term] or @running.size > 0 read_pipes @@ -235,8 +406,15 @@ class Dispatcher end end else + refresh_running unless did_recently(:refresh_running, 60.0) refresh_todo unless did_recently(:refresh_todo, 1.0) - start_jobs unless @todo.empty? or did_recently(:start_jobs, 1.0) + update_node_status + unless @todo.empty? or did_recently(:start_jobs, 1.0) or $signal[:term] + start_jobs + end + unless (@todo_pipelines.empty? and @pipe_auth_tokens.empty?) or did_recently(:update_pipelines, 5.0) + update_pipelines + end end reap_children select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten, @@ -255,6 +433,29 @@ class Dispatcher true end end + + # send message to log table. we want these records to be transient + def write_log running_job + begin + if (running_job && running_job[:stderr_buf_to_flush] != '') + log = Log.new(object_uuid: running_job[:job].uuid, + event_type: 'stderr', + owner_uuid: running_job[:job].owner_uuid, + properties: {"text" => running_job[:stderr_buf_to_flush]}) + log.save! + running_job[:stderr_buf_to_flush] = '' + running_job[:stderr_flushed_at] = Time.now.to_i + end + rescue + running_job[:stderr_buf] = "Failed to write logs \n" + running_job[:stderr_buf_to_flush] = '' + running_job[:stderr_flushed_at] = Time.now.to_i + end + end + end +# This is how crunch-job child procs know where the "refresh" trigger file is +ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.crunch_refresh_trigger + Dispatcher.new.run