require File.dirname(__FILE__) + '/../config/environment'
require 'open3'
+class LogTime < Time
+ def to_s
+ self.utc.strftime "%Y-%m-%d_%H:%M:%S"
+ end
+end
+
class Dispatcher
include ApplicationHelper
+ EXIT_TEMPFAIL = 75
+ EXIT_RETRY_UNLOCKED = 93
+ RETRY_UNLOCKED_LIMIT = 3
+
+ def initialize
+ @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
+ if @crunch_job_bin.empty?
+ raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
+ end
+
+ @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN']
+
+ @arvados_internal = Rails.configuration.git_internal_dir
+ if not File.exists? @arvados_internal
+ $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
+ raise "No internal git repository available" unless ($? == 0)
+ end
+
+ @repo_root = Rails.configuration.git_repositories_dir
+ @arvados_repo_path = Repository.where(name: "arvados").first.server_path
+ @authorizations = {}
+ @did_recently = {}
+ @fetched_commits = {}
+ @git_tags = {}
+ @node_state = {}
+ @pipe_auth_tokens = {}
+ @running = {}
+ @todo = []
+ @todo_job_retries = {}
+ @job_retry_counts = Hash.new(0)
+ @todo_pipelines = []
+ end
+
def sysuser
return act_as_system_user
end
def refresh_todo
- @todo = []
if $options[:jobs]
- @todo = Job.queue.select(&:repository)
+ @todo = @todo_job_retries.values + Job.queue.select(&:repository)
end
- @todo_pipelines = []
if $options[:pipelines]
@todo_pipelines = PipelineInstance.queue
end
def slurm_status
slurm_nodes = {}
each_slurm_line("sinfo", "%t") do |hostname, state|
+ # Treat nodes in idle* state as down, because the * means that slurm
+ # hasn't been able to communicate with it recently.
+ state.sub!(/^idle\*/, "down")
state.sub!(/\W+$/, "")
state = "down" unless %w(idle alloc down).include?(state)
slurm_nodes[hostname] = {state: state, job: nil}
def update_node_status
return unless Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
- @node_state ||= {}
slurm_status.each_pair do |hostname, slurmdata|
next if @node_state[hostname] == slurmdata
begin
rescue
$stderr.puts "dispatch: log.create failed"
end
- job.state = "Failed"
- if not job.save
- $stderr.puts "dispatch: job.save failed"
+
+ begin
+ job.lock @authorizations[job.uuid].user.uuid
+ job.state = "Failed"
+ if not job.save
+ $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed"
+ end
+ rescue ArvadosModel::AlreadyLockedError
+ $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else"
+ end
+ end
+
+ def stdout_s(cmd_a, opts={})
+ IO.popen(cmd_a, "r", opts) do |pipe|
+ return pipe.read.chomp
+ end
+ end
+
+ def git_cmd(*cmd_a)
+ ["git", "--git-dir=#{@arvados_internal}"] + cmd_a
+ end
+
+ def get_authorization(job)
+ if @authorizations[job.uuid] and
+ @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
+ # We already made a token for this job, but we need a new one
+ # because modified_by_user_uuid has changed (the job will run
+ # as a different user).
+ @authorizations[job.uuid].update_attributes expires_at: Time.now
+ @authorizations[job.uuid] = nil
+ end
+ if not @authorizations[job.uuid]
+ auth = ApiClientAuthorization.
+ new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
+ api_client_id: 0)
+ if not auth.save
+ $stderr.puts "dispatch: auth.save failed for #{job.uuid}"
+ else
+ @authorizations[job.uuid] = auth
+ end
+ end
+ @authorizations[job.uuid]
+ end
+
+ def internal_repo_has_commit? sha1
+ if (not @fetched_commits[sha1] and
+ sha1 == stdout_s(git_cmd("rev-list", "-n1", sha1), err: "/dev/null") and
+ $? == 0)
+ @fetched_commits[sha1] = true
+ end
+ return @fetched_commits[sha1]
+ end
+
+ def get_commit src_repo, sha1
+ return true if internal_repo_has_commit? sha1
+
+ # commit does not exist in internal repository, so import the
+ # source repository using git fetch-pack
+ cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
+ $stderr.puts "dispatch: #{cmd}"
+ $stderr.puts(stdout_s(cmd))
+ @fetched_commits[sha1] = ($? == 0)
+ end
+
+ def tag_commit(commit_hash, tag_name)
+ # @git_tags[T]==V if we know commit V has been tagged T in the
+ # arvados_internal repository.
+ if not @git_tags[tag_name]
+ cmd = git_cmd("tag", tag_name, commit_hash)
+ $stderr.puts "dispatch: #{cmd}"
+ $stderr.puts(stdout_s(cmd, err: "/dev/null"))
+ unless $? == 0
+ # git tag failed. This may be because the tag already exists, so check for that.
+ tag_rev = stdout_s(git_cmd("rev-list", "-n1", tag_name))
+ if $? == 0
+ # We got a revision back
+ if tag_rev != commit_hash
+ # Uh oh, the tag doesn't point to the revision we were expecting.
+ # Someone has been monkeying with the job record and/or git.
+ fail_job job, "Existing tag #{tag_name} points to commit #{tag_rev} but expected commit #{commit_hash}"
+ return nil
+ end
+ # we're okay (fall through to setting @git_tags below)
+ else
+ # git rev-list failed for some reason.
+ fail_job job, "'git tag' for #{tag_name} failed but did not find any existing tag using 'git rev-list'"
+ return nil
+ end
+ end
+ # 'git tag' was successful, or there is an existing tag that points to the same revision.
+ @git_tags[tag_name] = commit_hash
+ elsif @git_tags[tag_name] != commit_hash
+ fail_job job, "Existing tag #{tag_name} points to commit #{@git_tags[tag_name]} but this job uses commit #{commit_hash}"
+ return nil
end
+ @git_tags[tag_name]
end
def start_jobs
if Server::Application.config.crunch_job_user
cmd_args.unshift("sudo", "-E", "-u",
Server::Application.config.crunch_job_user,
+ "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
"PATH=#{ENV['PATH']}",
"PERLLIB=#{ENV['PERLLIB']}",
"PYTHONPATH=#{ENV['PYTHONPATH']}",
"GEM_PATH=#{ENV['GEM_PATH']}")
end
- @authorizations ||= {}
- if @authorizations[job.uuid] and
- @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
- # We already made a token for this job, but we need a new one
- # because modified_by_user_uuid has changed (the job will run
- # as a different user).
- @authorizations[job.uuid].update_attributes expires_at: Time.now
- @authorizations[job.uuid] = nil
- end
- if not @authorizations[job.uuid]
- auth = ApiClientAuthorization.
- new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
- api_client_id: 0)
- if not auth.save
- $stderr.puts "dispatch: auth.save failed"
+ next unless get_authorization job
+
+ ready = internal_repo_has_commit? job.script_version
+
+ if not ready
+ # Import the commit from the specified repository into the
+ # internal repository. This should have been done already when
+ # the job was created/updated; this code is obsolete except to
+ # avoid deployment races. Failing the job would be a
+ # reasonable thing to do at this point.
+ repo = Repository.where(name: job.repository).first
+ if repo.nil? or repo.server_path.nil?
+ fail_job "Repository #{job.repository} not found under #{@repo_root}"
next
end
- @authorizations[job.uuid] = auth
+ ready &&= get_commit repo.server_path, job.script_version
+ ready &&= tag_commit job.script_version, job.uuid
end
- crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
- if crunch_job_bin == ''
- raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
+ # This should be unnecessary, because API server does it during
+ # job create/update, but it's still not a bad idea to verify the
+ # tag is correct before starting the job:
+ ready &&= tag_commit job.script_version, job.uuid
+
+ # The arvados_sdk_version doesn't support use of arbitrary
+ # remote URLs, so the requested version isn't necessarily copied
+ # into the internal repository yet.
+ if job.arvados_sdk_version
+ ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
+ ready &&= tag_commit job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
end
- arvados_internal = Rails.configuration.git_internal_dir
- if not File.exists? arvados_internal
- $stderr.puts `mkdir -p #{arvados_internal.shellescape} && cd #{arvados_internal.shellescape} && git init --bare`
+ if not ready
+ fail_job job, "commit not present in internal repository"
+ next
end
- git = "git --git-dir=#{arvados_internal.shellescape}"
+ cmd_args += [@crunch_job_bin,
+ '--job-api-token', @authorizations[job.uuid].api_token,
+ '--job', job.uuid,
+ '--git-dir', @arvados_internal]
- # @fetched_commits[V]==true if we know commit V exists in the
- # arvados_internal git repository.
- @fetched_commits ||= {}
- if !@fetched_commits[job.script_version]
-
- repo_root = Rails.configuration.git_repositories_dir
- src_repo = File.join(repo_root, job.repository + '.git')
- if not File.exists? src_repo
- src_repo = File.join(repo_root, job.repository, '.git')
- if not File.exists? src_repo
- fail_job job, "No #{job.repository}.git or #{job.repository}/.git at #{repo_root}"
- next
- end
- end
-
- # check if the commit needs to be fetched or not
- commit_rev = `#{git} rev-list -n1 #{job.script_version.shellescape} 2>/dev/null`.chomp
- unless $? == 0 and commit_rev == job.script_version
- # commit does not exist in internal repository, so import the source repository using git fetch-pack
- cmd = "#{git} fetch-pack --no-progress --all #{src_repo.shellescape}"
- $stderr.puts cmd
- $stderr.puts `#{cmd}`
- unless $? == 0
- fail_job job, "git fetch-pack failed"
- next
- end
- end
- @fetched_commits[job.script_version] = true
+ if @docker_bin
+ cmd_args += ['--docker-bin', @docker_bin]
end
- # @job_tags[J]==V if we know commit V has been tagged J in the
- # arvados_internal repository. (J is a job UUID, V is a commit
- # sha1.)
- @job_tags ||= {}
- if not @job_tags[job.uuid]
- # check if the commit needs to be tagged with this job uuid
- tag_rev = `#{git} rev-list -n1 #{job.uuid.shellescape} 2>/dev/null`.chomp
- if $? != 0
- # no job tag found, so create one
- cmd = "#{git} tag #{job.uuid.shellescape} #{job.script_version.shellescape}"
- $stderr.puts cmd
- $stderr.puts `#{cmd}`
- unless $? == 0
- fail_job job, "git tag failed"
- next
- end
- else
- # job tag found, check that it has the expected revision
- unless tag_rev == job.script_version
- # Uh oh, the tag doesn't point to the revision we were expecting.
- # Someone has been monkeying with the job record and/or git.
- fail_job job, "Existing tag #{job.uuid} points to commit #{tag_rev} but expected commit #{job.script_version}"
- next
- end
- end
- @job_tags[job.uuid] = job.script_version
- elsif @job_tags[job.uuid] != job.script_version
- fail_job job, "Existing tag #{job.uuid} points to commit #{@job_tags[job.uuid]} but this job uses commit #{job.script_version}"
+ if @todo_job_retries.include?(job.uuid)
+ cmd_args << "--force-unlock"
end
- cmd_args << crunch_job_bin
- cmd_args << '--job-api-token'
- cmd_args << @authorizations[job.uuid].api_token
- cmd_args << '--job'
- cmd_args << job.uuid
- cmd_args << '--git-dir'
- cmd_args << arvados_internal
-
$stderr.puts "dispatch: #{cmd_args.join ' '}"
begin
end
$stderr.puts "dispatch: job #{job.uuid}"
- start_banner = "dispatch: child #{t.pid} start #{Time.now.ctime.to_s}"
+ start_banner = "dispatch: child #{t.pid} start #{LogTime.now}"
$stderr.puts start_banner
@running[job.uuid] = {
log_throttle_bytes_skipped: 0,
}
i.close
+ @todo_job_retries.delete(job.uuid)
update_node_status
end
end
if j[:log_throttle_bytes_skipped] > 0
message = "#{job_uuid} ! Skipped #{j[:log_throttle_bytes_skipped]} bytes of log"
$stderr.puts message
- j[:stderr_buf_to_flush] << "#{Time.now.ctime.to_s} #{message}\n"
+ j[:stderr_buf_to_flush] << "#{LogTime.now} #{message}\n"
end
j[:log_throttle_reset_time] = now + Rails.configuration.crunch_log_throttle_period
if rate_limit j, line
$stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
$stderr.puts line
- pub_msg = "#{Time.now.ctime.to_s} #{line.strip}\n"
+ pub_msg = "#{LogTime.now} #{line.strip}\n"
j[:stderr_buf_to_flush] << pub_msg
end
end
return if !pid_done
job_done = j_done[:job]
- $stderr.puts "dispatch: child #{pid_done} exit"
- $stderr.puts "dispatch: job #{job_done.uuid} end"
# Ensure every last drop of stdout and stderr is consumed.
read_pipes
# Wait the thread (returns a Process::Status)
exit_status = j_done[:wait_thr].value.exitstatus
+ exit_tempfail = exit_status == EXIT_TEMPFAIL
+
+ $stderr.puts "dispatch: child #{pid_done} exit #{exit_status}"
+ $stderr.puts "dispatch: job #{job_done.uuid} end"
jobrecord = Job.find_by_uuid(job_done.uuid)
- if exit_status != 75 and jobrecord.state == "Running"
- # crunch-job did not return exit code 75 (see below) and left the job in
- # the "Running" state, which means there was an unhandled error. Fail
- # the job.
- jobrecord.state = "Failed"
- if not jobrecord.save
- $stderr.puts "dispatch: jobrecord.save failed"
+
+ if exit_status == EXIT_RETRY_UNLOCKED
+ # The job failed because all of the nodes allocated to it
+ # failed. Only this crunch-dispatch process can retry the job:
+ # it's already locked, and there's no way to put it back in the
+ # Queued state. Put it in our internal todo list unless the job
+ # has failed this way excessively.
+ @job_retry_counts[jobrecord.uuid] += 1
+ exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT
+ if exit_tempfail
+ @todo_job_retries[jobrecord.uuid] = jobrecord
+ else
+ $stderr.puts("dispatch: job #{jobrecord.uuid} exceeded node failure retry limit -- giving up")
+ end
+ end
+
+ if !exit_tempfail
+ @job_retry_counts.delete(jobrecord.uuid)
+ if jobrecord.state == "Running"
+ # Apparently there was an unhandled error. That could potentially
+ # include "all allocated nodes failed" when we don't to retry
+ # because the job has already been retried RETRY_UNLOCKED_LIMIT
+ # times. Fail the job.
+ jobrecord.state = "Failed"
+ if not jobrecord.save
+ $stderr.puts "dispatch: jobrecord.save failed"
+ end
end
else
- # Don't fail the job if crunch-job didn't even get as far as
- # starting it. If the job failed to run due to an infrastructure
+ # If the job failed to run due to an infrastructure
# issue with crunch-job or slurm, we want the job to stay in the
# queue. If crunch-job exited after losing a race to another
# crunch-job process, it exits 75 and we should leave the job
- # record alone so the winner of the race do its thing.
+ # record alone so the winner of the race can do its thing.
+ # If crunch-job exited after all of its allocated nodes failed,
+ # it exits 93, and we want to retry it later (see the
+ # EXIT_RETRY_UNLOCKED `if` block).
#
# There is still an unhandled race condition: If our crunch-job
# process is about to lose a race with another crunch-job
# fine.
end
- # Invalidate the per-job auth token
- j_done[:job_auth].update_attributes expires_at: Time.now
+ # Invalidate the per-job auth token, unless the job is still queued and we
+ # might want to try it again.
+ if jobrecord.state != "Queued" and !@todo_job_retries.include?(jobrecord.uuid)
+ j_done[:job_auth].update_attributes expires_at: Time.now
+ end
@running.delete job_done.uuid
end
def run
act_as_system_user
- @running ||= {}
- @pipe_auth_tokens ||= { }
$stderr.puts "dispatch: ready"
while !$signal[:term] or @running.size > 0
read_pipes
select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
[], [], 1)
end
+ # If there are jobs we wanted to retry, we have to mark them as failed now.
+ # Other dispatchers can't pick them up because we hold their lock.
+ @todo_job_retries.each_key do |job_uuid|
+ job = Job.find_by_uuid(job_uuid)
+ if job.state == "Running"
+ fail_job(job, "crunch-dispatch was stopped during job's tempfail retry loop")
+ end
+ end
end
protected
def did_recently(thing, min_interval)
- @did_recently ||= {}
if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
@did_recently[thing] = Time.now
false
# This is how crunch-job child procs know where the "refresh" trigger file is
ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.crunch_refresh_trigger
+# If salloc can't allocate resources immediately, make it use our temporary
+# failure exit code. This ensures crunch-dispatch won't mark a job failed
+# because of an issue with node allocation. This often happens when
+# another dispatcher wins the race to allocate nodes.
+ENV["SLURM_EXIT_IMMEDIATE"] = Dispatcher::EXIT_TEMPFAIL.to_s
+
Dispatcher.new.run