require 'shellwords'
class CrunchDispatch
+ extend DbCurrentTime
include ApplicationHelper
include Process
end
@docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN']
+ @docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS']
+ @cgroup_root = ENV['CRUNCH_CGROUP_ROOT']
@arvados_internal = Rails.configuration.git_internal_dir
if not File.exists? @arvados_internal
nodelist
end
- def fail_job job, message
+ def fail_job job, message, skip_lock: false
$stderr.puts "dispatch: #{job.uuid}: #{message}"
begin
Log.new(object_uuid: job.uuid,
$stderr.puts "dispatch: log.create failed"
end
- begin
- job.lock @authorizations[job.uuid].user.uuid
- job.state = "Failed"
- if not job.save
- $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed"
+ if not skip_lock and not have_job_lock?(job)
+ begin
+ job.lock @authorizations[job.uuid].user.uuid
+ rescue ArvadosModel::AlreadyLockedError
+ $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else"
+ return
end
- rescue ArvadosModel::AlreadyLockedError
- $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else"
+ end
+
+ job.state = "Failed"
+ if not job.save
+ $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed"
end
end
raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}"
end
- if Server::Application.config.crunch_job_user
- cmd_args.unshift("sudo", "-E", "-u",
- Server::Application.config.crunch_job_user,
- "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
- "PATH=#{ENV['PATH']}",
- "PERLLIB=#{ENV['PERLLIB']}",
- "PYTHONPATH=#{ENV['PYTHONPATH']}",
- "RUBYLIB=#{ENV['RUBYLIB']}",
- "GEM_PATH=#{ENV['GEM_PATH']}")
- end
+ cmd_args = sudo_preface + cmd_args
next unless get_authorization job
# reasonable thing to do at this point.
repo = Repository.where(name: job.repository).first
if repo.nil? or repo.server_path.nil?
- fail_job "Repository #{job.repository} not found under #{@repo_root}"
+ fail_job job, "Repository #{job.repository} not found under #{@repo_root}"
next
end
ready &&= get_commit repo.server_path, job.script_version
'--job', job.uuid,
'--git-dir', @arvados_internal]
+ if @cgroup_root
+ cmd_args += ['--cgroup-root', @cgroup_root]
+ end
+
if @docker_bin
cmd_args += ['--docker-bin', @docker_bin]
end
- if @todo_job_retries.include?(job.uuid)
+ if @docker_run_args
+ cmd_args += ['--docker-run-args', @docker_run_args]
+ end
+
+ if have_job_lock?(job)
cmd_args << "--force-unlock"
end
log_throttle_bytes_so_far: 0,
log_throttle_lines_so_far: 0,
log_throttle_bytes_skipped: 0,
+ log_throttle_partial_line_last_at: Time.new(0),
+ log_throttle_first_partial_line: true,
}
i.close
@todo_job_retries.delete(job.uuid)
message = false
linesize = line.size
if running_job[:log_throttle_is_open]
- running_job[:log_throttle_lines_so_far] += 1
- running_job[:log_throttle_bytes_so_far] += linesize
- running_job[:bytes_logged] += linesize
+ partial_line = false
+ skip_counts = false
+ matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/)
+ if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]')
+ partial_line = true
+ if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.crunch_log_partial_line_throttle_period
+ running_job[:log_throttle_partial_line_last_at] = Time.now
+ else
+ skip_counts = true
+ end
+ end
+
+ if !skip_counts
+ running_job[:log_throttle_lines_so_far] += 1
+ running_job[:log_throttle_bytes_so_far] += linesize
+ running_job[:bytes_logged] += linesize
+ end
if (running_job[:bytes_logged] >
Rails.configuration.crunch_limit_log_bytes_per_job)
elsif (running_job[:log_throttle_bytes_so_far] >
Rails.configuration.crunch_log_throttle_bytes)
remaining_time = running_job[:log_throttle_reset_time] - Time.now
- message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds.\n"
+ message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds."
running_job[:log_throttle_is_open] = false
elsif (running_job[:log_throttle_lines_so_far] >
Rails.configuration.crunch_log_throttle_lines)
remaining_time = running_job[:log_throttle_reset_time] - Time.now
- message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds.\n"
+ message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds."
running_job[:log_throttle_is_open] = false
+
+ elsif partial_line and running_job[:log_throttle_first_partial_line]
+ running_job[:log_throttle_first_partial_line] = false
+ message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.crunch_log_partial_line_throttle_period} seconds."
end
end
if message
# Yes, write to logs, but use our "rate exceeded" message
# instead of the log message that exceeded the limit.
+ message += " A complete log is still being written to Keep, and will be available when the job finishes.\n"
line.replace message
true
+ elsif partial_line
+ false
else
running_job[:log_throttle_is_open]
end
j[:log_throttle_lines_so_far] = 0
j[:log_throttle_bytes_skipped] = 0
j[:log_throttle_is_open] = true
+ j[:log_throttle_partial_line_last_at] = Time.new(0)
+ j[:log_throttle_first_partial_line] = true
end
j[:buf].each do |stream, streambuf|
jobrecord = Job.find_by_uuid(job_done.uuid)
- if exit_status == EXIT_RETRY_UNLOCKED
- # The job failed because all of the nodes allocated to it
- # failed. Only this crunch-dispatch process can retry the job:
+ if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
+ # Only this crunch-dispatch process can retry the job:
# it's already locked, and there's no way to put it back in the
# Queued state. Put it in our internal todo list unless the job
# has failed this way excessively.
end
end
+ # Mark Running jobs that started before a cutoff as Failed. When the
+ # configured crunch_job_wrapper is :slurm_immediate, any queued SLURM job
+ # whose name equals the job's uuid is cancelled first.
+ #
+ # before - selects the cutoff compared against started_at:
+ #          'reboot'     => boot time taken from the btime line of /proc/stat
+ #          other string => parsed with Time.parse
+ #          nil          => db_current_time
+ #
+ # Raises RuntimeError if before is 'reboot' and /proc/stat has no btime line.
+ def fail_jobs before: nil
+   act_as_system_user do
+     threshold = nil
+     if before == 'reboot'
+       boottime = nil
+       # File.foreach closes /proc/stat once iteration is done, so no file
+       # handle is left open.
+       File.foreach('/proc/stat') do |stat_line|
+         stat, t = stat_line.split
+         boottime = t if stat == 'btime'
+       end
+       if not boottime
+         raise "Could not find btime in /proc/stat"
+       end
+       threshold = Time.at(boottime.to_i)
+     elsif before
+       threshold = Time.parse(before, Time.now)
+     else
+       threshold = db_current_time
+     end
+     Rails.logger.info "fail_jobs: threshold is #{threshold}"
+
+     if Rails.configuration.crunch_job_wrapper == :slurm_immediate
+       # [["slurm_job_id", "slurm_job_name"], ...]
+       # The block form of popen closes the pipe and waits for squeue, so the
+       # descriptor and the child process are both released.
+       squeue = IO.popen(['squeue', '-h', '-o', '%i %j']) do |squeue_pipe|
+         squeue_pipe.readlines.map do |line|
+           line.strip.split(' ', 2)
+         end
+       end
+     else
+       squeue = []
+     end
+
+     Job.where('state = ? and started_at < ?', Job::Running, threshold).
+       each do |job|
+       Rails.logger.debug "fail_jobs: #{job.uuid} started #{job.started_at}"
+       squeue.each do |slurm_id, slurm_name|
+         if slurm_name == job.uuid
+           Rails.logger.info "fail_jobs: scancel #{slurm_id} for #{job.uuid}"
+           scancel slurm_id
+         end
+       end
+       # skip_lock: these jobs are failed without first taking the job lock.
+       fail_job(job, "cleaned up stale job: started before #{threshold}",
+                skip_lock: true)
+     end
+   end
+ end
+
protected
+ def have_job_lock?(job)
+   # True when this crunch-dispatch already holds the lock on the given job
+   # (normally because we've run crunch-job for it), which is tracked by the
+   # job's uuid being present in @todo_job_retries.
+   job_uuid = job.uuid
+   @todo_job_retries.include?(job_uuid)
+ end
+
def did_recently(thing, min_interval)
if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
@did_recently[thing] = Time.now
running_job[:stderr_flushed_at] = Time.now
end
end
+
+ # Run scancel for the given SLURM job id (prefixed with sudo_preface) and
+ # echo its output to stdout. Logs an error, including the exit status, when
+ # scancel does not exit successfully.
+ #
+ # slurm_id - SLURM job id as a String.
+ def scancel slurm_id
+   cmd = sudo_preface + ['scancel', slurm_id]
+   # The block form closes the pipe before returning, and closing is what
+   # sets $? to scancel's exit status. Reading an unclosed pipe leaves $?
+   # stale or nil.
+   IO.popen(cmd) do |scancel_pipe|
+     puts scancel_pipe.read
+   end
+   if not $?.success?
+     # $? must be interpolated explicitly; a bare "$?" in a string is literal.
+     Rails.logger.error "scancel #{slurm_id.shellescape}: #{$?}"
+   end
+ end
+
+ # Build the argv prefix used to run a command as the configured
+ # crunch_job_user. Returns an empty array when no crunch_job_user is set;
+ # otherwise returns the sudo invocation followed by NAME=value entries that
+ # carry selected variables from the current environment.
+ def sudo_preface
+   job_user = Server::Application.config.crunch_job_user
+   return [] unless job_user
+
+   forwarded = ["LD_LIBRARY_PATH", "PATH", "PERLLIB",
+                "PYTHONPATH", "RUBYLIB", "GEM_PATH"]
+   env_assignments = forwarded.map { |name| "#{name}=#{ENV[name]}" }
+   ["sudo", "-E", "-u", job_user] + env_assignments
+ end
end