+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
require 'open3'
require 'shellwords'
@docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN']
@docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS']
+ @cgroup_root = ENV['CRUNCH_CGROUP_ROOT']
@arvados_internal = Rails.configuration.git_internal_dir
- if not File.exists? @arvados_internal
+ if not File.exist? @arvados_internal
$stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
raise "No internal git repository available" unless ($? == 0)
end
# into multiple rows with one hostname each.
`#{cmd} --noheader -o '%N:#{outfmt}'`.each_line do |line|
tokens = line.chomp.split(":", max_fields)
- if (re = tokens[0].match /^(.*?)\[([-,\d]+)\]$/)
+ if (re = tokens[0].match(/^(.*?)\[([-,\d]+)\]$/))
tokens.shift
re[2].split(",").each do |range|
range = range.split("-").collect(&:to_i)
# hasn't been able to communicate with it recently.
state.sub!(/^idle\*/, "down")
state.sub!(/\W+$/, "")
- state = "down" unless %w(idle alloc down).include?(state)
+ state = "down" unless %w(idle alloc comp mix drng down).include?(state)
slurm_nodes[hostname] = {state: state, job: nil}
end
each_slurm_line("squeue", "%j") do |hostname, job_uuid|
end
def update_node_status
- return unless Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
+ return unless Server::Application.config.crunch_job_wrapper.to_s.match(/^slurm/)
slurm_status.each_pair do |hostname, slurmdata|
next if @node_state[hostname] == slurmdata
begin
end
usable_nodes << node
if usable_nodes.count >= min_node_count
- return usable_nodes.map { |node| node.hostname }
+ hostnames = usable_nodes.map(&:hostname)
+ log_nodes = usable_nodes.map do |n|
+ "#{n.hostname} #{n.uuid} #{n.properties.to_json}"
+ end
+ log_job = "#{job.uuid} #{job.runtime_constraints}"
+ log_text = "dispatching job #{log_job} to #{log_nodes.join(", ")}"
+ $stderr.puts log_text
+ begin
+ act_as_system_user do
+ Log.new(object_uuid: job.uuid,
+ event_type: 'dispatch',
+ owner_uuid: system_user_uuid,
+ summary: "dispatching to #{hostnames.join(", ")}",
+ properties: {'text' => log_text}).save!
+ end
+ rescue => e
+ $stderr.puts "dispatch: log.create failed: #{e}"
+ end
+ return hostnames
end
end
nil
owner_uuid: job.owner_uuid,
summary: message,
properties: {"text" => message}).save!
- rescue
- $stderr.puts "dispatch: log.create failed"
+ rescue => e
+ $stderr.puts "dispatch: log.create failed: #{e}"
end
if not skip_lock and not have_job_lock?(job)
@fetched_commits[sha1] = ($? == 0)
end
- def tag_commit(commit_hash, tag_name)
+ def tag_commit(job, commit_hash, tag_name)
# @git_tags[T]==V if we know commit V has been tagged T in the
# arvados_internal repository.
if not @git_tags[tag_name]
next
end
ready &&= get_commit repo.server_path, job.script_version
- ready &&= tag_commit job.script_version, job.uuid
+ ready &&= tag_commit job, job.script_version, job.uuid
end
# This should be unnecessary, because API server does it during
# job create/update, but it's still not a bad idea to verify the
# tag is correct before starting the job:
- ready &&= tag_commit job.script_version, job.uuid
+ ready &&= tag_commit job, job.script_version, job.uuid
# The arvados_sdk_version doesn't support use of arbitrary
# remote URLs, so the requested version isn't necessarily copied
# into the internal repository yet.
if job.arvados_sdk_version
ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
- ready &&= tag_commit job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
+ ready &&= tag_commit job, job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
end
if not ready
'--job', job.uuid,
'--git-dir', @arvados_internal]
+ if @cgroup_root
+ cmd_args += ['--cgroup-root', @cgroup_root]
+ end
+
if @docker_bin
cmd_args += ['--docker-bin', @docker_bin]
end
i, o, e, t = Open3.popen3(*cmd_args)
rescue
$stderr.puts "dispatch: popen3: #{$!}"
- sleep 1
- next
+ # This is a dispatch problem like "Too many open files";
+ # retrying another job right away would be futile. Just return
+ # and hope things are better next time, after (at least) a
+ # did_recently() delay.
+ return
end
$stderr.puts "dispatch: job #{job.uuid}"
log_throttle_bytes_so_far: 0,
log_throttle_lines_so_far: 0,
log_throttle_bytes_skipped: 0,
- log_throttle_partial_line_last_at: 0,
+ log_throttle_partial_line_last_at: Time.new(0),
log_throttle_first_partial_line: true,
}
i.close
if running_job[:log_throttle_is_open]
partial_line = false
skip_counts = false
- if line.start_with?('[...]') and line.end_with?('[...]')
+ matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/)
+ if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]')
partial_line = true
- if Time.now > j[:log_throttle_partial_line_last_at] + Rails.configuration.crunch_log_partial_line_throttle_period
- j[:log_throttle_partial_line_last_at] = Time.now
+ if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.crunch_log_partial_line_throttle_period
+ running_job[:log_throttle_partial_line_last_at] = Time.now
else
skip_counts = true
end
message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds."
running_job[:log_throttle_is_open] = false
- elsif partial_line and j[:log_throttle_first_partial_line]
- j[:log_throttle_first_partial_line] = false
+ elsif partial_line and running_job[:log_throttle_first_partial_line]
+ running_job[:log_throttle_first_partial_line] = false
message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.crunch_log_partial_line_throttle_period} seconds."
end
end
message += " A complete log is still being written to Keep, and will be available when the job finishes.\n"
line.replace message
true
+ elsif partial_line
+ false
else
running_job[:log_throttle_is_open]
end
def read_pipes
@running.each do |job_uuid, j|
- job = j[:job]
-
now = Time.now
if now > j[:log_throttle_reset_time]
# It has been more than throttle_period seconds since the last
j[:log_throttle_lines_so_far] = 0
j[:log_throttle_bytes_skipped] = 0
j[:log_throttle_is_open] = true
- j[:log_throttle_partial_line_last_at] = 0
+ j[:log_throttle_partial_line_last_at] = Time.new(0)
j[:log_throttle_first_partial_line] = true
end
pid_done = nil
j_done = nil
- if false
- begin
- pid_done = waitpid(-1, Process::WNOHANG | Process::WUNTRACED)
- if pid_done
- j_done = @running.values.
- select { |j| j[:wait_thr].pid == pid_done }.
- first
- end
- rescue SystemCallError
- # I have @running processes but system reports I have no
- # children. This is likely to happen repeatedly if it happens at
- # all; I will log this no more than once per child process I
- # start.
- if 0 < @running.select { |uuid,j| j[:warned_waitpid_error].nil? }.size
- children = @running.values.collect { |j| j[:wait_thr].pid }.join ' '
- $stderr.puts "dispatch: IPC bug: waitpid() error (#{$!}), but I have children #{children}"
- end
- @running.each do |uuid,j| j[:warned_waitpid_error] = true end
- end
- else
- @running.each do |uuid, j|
- if j[:wait_thr].status == false
- pid_done = j[:wait_thr].pid
- j_done = j
- end
+ @running.each do |uuid, j|
+ if !j[:wait_thr].status
+ pid_done = j[:wait_thr].pid
+ j_done = j
+ break
end
end
jobrecord = Job.find_by_uuid(job_done.uuid)
if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
+ $stderr.puts("dispatch: job #{jobrecord.uuid} was interrupted by node failure")
# Only this crunch-dispatch process can retry the job:
# it's already locked, and there's no way to put it back in the
# Queued state. Put it in our internal todo list unless the job
# has failed this way excessively.
@job_retry_counts[jobrecord.uuid] += 1
exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT
+ do_what_next = "give up now"
if exit_tempfail
@todo_job_retries[jobrecord.uuid] = jobrecord
- else
- $stderr.puts("dispatch: job #{jobrecord.uuid} exceeded node failure retry limit -- giving up")
+ do_what_next = "re-attempt"
end
+ $stderr.puts("dispatch: job #{jobrecord.uuid} has been interrupted " +
+ "#{@job_retry_counts[jobrecord.uuid]}x, will #{do_what_next}")
end
if !exit_tempfail
unless (@todo_pipelines.empty? and @pipe_auth_tokens.empty?) or did_recently(:update_pipelines, 5.0)
update_pipelines
end
+ unless did_recently('check_orphaned_slurm_jobs', 60)
+ check_orphaned_slurm_jobs
+ end
end
reap_children
select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
end
Rails.logger.info "fail_jobs: threshold is #{threshold}"
- if Rails.configuration.crunch_job_wrapper == :slurm_immediate
- # [["slurm_job_id", "slurm_job_name"], ...]
- squeue = File.popen(['squeue', '-h', '-o', '%i %j']).readlines.map do |line|
- line.strip.split(' ', 2)
- end
- else
- squeue = []
- end
-
+ squeue = squeue_jobs
Job.where('state = ? and started_at < ?', Job::Running, threshold).
each do |job|
Rails.logger.debug "fail_jobs: #{job.uuid} started #{job.started_at}"
- squeue.each do |slurm_id, slurm_name|
+ squeue.each do |slurm_name|
if slurm_name == job.uuid
- Rails.logger.info "fail_jobs: scancel #{slurm_id} for #{job.uuid}"
- scancel slurm_id
+ Rails.logger.info "fail_jobs: scancel #{job.uuid}"
+ scancel slurm_name
end
end
fail_job(job, "cleaned up stale job: started before #{threshold}",
end
end
+ def check_orphaned_slurm_jobs
+ act_as_system_user do
+ squeue_uuids = squeue_jobs.select{|uuid| uuid.match(/^[0-9a-z]{5}-8i9sb-[0-9a-z]{15}$/)}.
+ select{|uuid| !@running.has_key?(uuid)}
+
+ return if squeue_uuids.size == 0
+
+ scancel_uuids = squeue_uuids - Job.where('uuid in (?) and (state in (?) or modified_at>?)',
+ squeue_uuids,
+ ['Running', 'Queued'],
+ (Time.now - 60)).
+ collect(&:uuid)
+ scancel_uuids.each do |uuid|
+ Rails.logger.info "orphaned job: scancel #{uuid}"
+ scancel uuid
+ end
+ end
+ end
+
+ def sudo_preface
+ return [] if not Server::Application.config.crunch_job_user
+ ["sudo", "-E", "-u",
+ Server::Application.config.crunch_job_user,
+ "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
+ "PATH=#{ENV['PATH']}",
+ "PERLLIB=#{ENV['PERLLIB']}",
+ "PYTHONPATH=#{ENV['PYTHONPATH']}",
+ "RUBYLIB=#{ENV['RUBYLIB']}",
+ "GEM_PATH=#{ENV['GEM_PATH']}"]
+ end
+
protected
def have_job_lock?(job)
end
end
- def scancel slurm_id
- cmd = sudo_preface + ['scancel', slurm_id]
- puts File.popen(cmd).read
- if not $?.success?
- Rails.logger.error "scancel #{slurm_id.shellescape}: $?"
+ # An array of job_uuids in squeue
+ def squeue_jobs
+ if Rails.configuration.crunch_job_wrapper == :slurm_immediate
+ p = IO.popen(['squeue', '-a', '-h', '-o', '%j'])
+ begin
+ p.readlines.map {|line| line.strip}
+ ensure
+ p.close
+ end
+ else
+ []
end
end
- def sudo_preface
- return [] if not Server::Application.config.crunch_job_user
- ["sudo", "-E", "-u",
- Server::Application.config.crunch_job_user,
- "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
- "PATH=#{ENV['PATH']}",
- "PERLLIB=#{ENV['PERLLIB']}",
- "PYTHONPATH=#{ENV['PYTHONPATH']}",
- "RUBYLIB=#{ENV['RUBYLIB']}",
- "GEM_PATH=#{ENV['GEM_PATH']}"]
+ def scancel slurm_name
+ cmd = sudo_preface + ['scancel', '-n', slurm_name]
+ IO.popen(cmd) do |scancel_pipe|
+ puts scancel_pipe.read
+ end
+ if not $?.success?
+ Rails.logger.error "scancel #{slurm_name.shellescape}: $?"
+ end
end
end