class Dispatcher
include ApplicationHelper
+ # crunch-job exit status meaning "temporary failure": leave the job record
+ # alone (e.g. crunch-job lost a race with another crunch-job process).
+ EXIT_TEMPFAIL = 75
+ # crunch-job exit status meaning "all nodes allocated to the job failed";
+ # this dispatcher still holds the job's lock and may retry it itself.
+ EXIT_RETRY_UNLOCKED = 93
+ # Maximum number of EXIT_RETRY_UNLOCKED retries per job before giving up.
+ RETRY_UNLOCKED_LIMIT = 3
+
def initialize
@crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
if @crunch_job_bin.empty?
raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
end
+ # Optional docker binary path, forwarded to crunch-job as --docker-bin.
+ @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN']
+
@arvados_internal = Rails.configuration.git_internal_dir
if not File.exists? @arvados_internal
$stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
end
@repo_root = Rails.configuration.git_repositories_dir
+ # Source repository used to import arvados_sdk_version commits.
+ # NOTE(review): this raises NoMethodError at startup if no repository
+ # named "arvados" exists (`.first` returns nil) -- confirm intended.
+ @arvados_repo_path = Repository.where(name: "arvados").first.server_path
@authorizations = {}
@did_recently = {}
@fetched_commits = {}
@pipe_auth_tokens = {}
@running = {}
@todo = []
+ # job uuid => job record for jobs we still hold locked and want to rerun.
+ @todo_job_retries = {}
+ # job uuid => number of EXIT_RETRY_UNLOCKED exits seen so far.
+ @job_retry_counts = Hash.new(0)
@todo_pipelines = []
end
def refresh_todo
if $options[:jobs]
- @todo = Job.queue.select(&:repository)
+ # Jobs awaiting a retry (still locked by this dispatcher) come first,
+ # followed by queued jobs that name a repository.
+ @todo = @todo_job_retries.values + Job.queue.select(&:repository)
end
if $options[:pipelines]
@todo_pipelines = PipelineInstance.queue
@authorizations[job.uuid]
end
- def get_commit(repo_name, commit_hash)
- # @fetched_commits[V]==true if we know commit V exists in the
- # arvados_internal git repository.
- if !@fetched_commits[commit_hash]
- src_repo = File.join(@repo_root, "#{repo_name}.git")
- if not File.exists? src_repo
- src_repo = File.join(@repo_root, repo_name, '.git')
- if not File.exists? src_repo
- fail_job job, "No #{repo_name}.git or #{repo_name}/.git at #{@repo_root}"
- return nil
- end
- end
-
- # check if the commit needs to be fetched or not
- commit_rev = stdout_s(git_cmd("rev-list", "-n1", commit_hash),
- err: "/dev/null")
- unless $? == 0 and commit_rev == commit_hash
- # commit does not exist in internal repository, so import the source repository using git fetch-pack
- cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
- $stderr.puts "dispatch: #{cmd}"
- $stderr.puts(stdout_s(cmd))
- unless $? == 0
- fail_job job, "git fetch-pack failed"
- return nil
- end
- end
- @fetched_commits[commit_hash] = true
+ def internal_repo_has_commit? sha1
+ if (not @fetched_commits[sha1] and
+ sha1 == stdout_s(git_cmd("rev-list", "-n1", sha1), err: "/dev/null") and
+ $? == 0)
+ @fetched_commits[sha1] = true
end
- @fetched_commits[commit_hash]
+ return @fetched_commits[sha1]
+ end
+
+ def get_commit src_repo, sha1
+ return true if internal_repo_has_commit? sha1
+
+ # commit does not exist in internal repository, so import the
+ # source repository using git fetch-pack
+ cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
+ $stderr.puts "dispatch: #{cmd}"
+ $stderr.puts(stdout_s(cmd))
+ @fetched_commits[sha1] = ($? == 0)
end
def tag_commit(commit_hash, tag_name)
if Server::Application.config.crunch_job_user
+ # Run crunch-job as the configured user via sudo, passing the
+ # environment variables below explicitly on the sudo command line.
cmd_args.unshift("sudo", "-E", "-u",
Server::Application.config.crunch_job_user,
+ "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
"PATH=#{ENV['PATH']}",
"PERLLIB=#{ENV['PERLLIB']}",
"PYTHONPATH=#{ENV['PYTHONPATH']}",
"GEM_PATH=#{ENV['GEM_PATH']}")
end
- ready = (get_authorization(job) and
- get_commit(job.repository, job.script_version) and
- tag_commit(job.script_version, job.uuid))
- if ready and job.arvados_sdk_version
- ready = (get_commit("arvados", job.arvados_sdk_version) and
- tag_commit(job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"))
+ next unless get_authorization job
+
+ ready = internal_repo_has_commit? job.script_version
+
+ if not ready
+ # Import the commit from the specified repository into the
+ # internal repository. This should have been done already when
+ # the job was created/updated; this code is obsolete except to
+ # avoid deployment races. Failing the job would be a
+ # reasonable thing to do at this point.
+ repo = Repository.where(name: job.repository).first
+ if repo.nil? or repo.server_path.nil?
+ fail_job "Repository #{job.repository} not found under #{@repo_root}"
+ next
+ end
+ ready &&= get_commit repo.server_path, job.script_version
+ ready &&= tag_commit job.script_version, job.uuid
+ end
+
+ # This should be unnecessary, because API server does it during
+ # job create/update, but it's still not a bad idea to verify the
+ # tag is correct before starting the job:
+ ready &&= tag_commit job.script_version, job.uuid
+
+ # The arvados_sdk_version doesn't support use of arbitrary
+ # remote URLs, so the requested version isn't necessarily copied
+ # into the internal repository yet.
+ if job.arvados_sdk_version
+ ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
+ ready &&= tag_commit job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
+ end
+
+ if not ready
+ fail_job job, "commit not present in internal repository"
+ next
end
- next unless ready
cmd_args += [@crunch_job_bin,
'--job-api-token', @authorizations[job.uuid].api_token,
'--job', job.uuid,
'--git-dir', @arvados_internal]
+ if @docker_bin
+ cmd_args += ['--docker-bin', @docker_bin]
+ end
+
+ if @todo_job_retries.include?(job.uuid)
+ cmd_args << "--force-unlock"
+ end
+
$stderr.puts "dispatch: #{cmd_args.join ' '}"
begin
log_throttle_bytes_skipped: 0,
}
i.close
+ # The job has been launched, so (if it was a retry) it no longer
+ # belongs on our private retry list.
+ @todo_job_retries.delete(job.uuid)
update_node_status
end
end
return if !pid_done
job_done = j_done[:job]
- $stderr.puts "dispatch: child #{pid_done} exit"
- $stderr.puts "dispatch: job #{job_done.uuid} end"
# Ensure every last drop of stdout and stderr is consumed.
read_pipes
# Wait the thread (returns a Process::Status)
exit_status = j_done[:wait_thr].value.exitstatus
+ # exitstatus is nil if the child was killed by a signal; nil matches
+ # neither EXIT_TEMPFAIL nor EXIT_RETRY_UNLOCKED, so that case is handled
+ # as a non-tempfail exit below.
+ # exit_tempfail means "leave the job record alone"; it may also be set
+ # below for EXIT_RETRY_UNLOCKED exits still within the retry limit.
+ exit_tempfail = exit_status == EXIT_TEMPFAIL
+
+ $stderr.puts "dispatch: child #{pid_done} exit #{exit_status}"
+ $stderr.puts "dispatch: job #{job_done.uuid} end"
jobrecord = Job.find_by_uuid(job_done.uuid)
- if exit_status != 75 and jobrecord.state == "Running"
- # crunch-job did not return exit code 75 (see below) and left the job in
- # the "Running" state, which means there was an unhandled error. Fail
- # the job.
- jobrecord.state = "Failed"
- if not jobrecord.save
- $stderr.puts "dispatch: jobrecord.save failed"
+
+ if exit_status == EXIT_RETRY_UNLOCKED
+ # The job failed because all of the nodes allocated to it
+ # failed. Only this crunch-dispatch process can retry the job:
+ # it's already locked, and there's no way to put it back in the
+ # Queued state. Put it in our internal todo list unless the job
+ # has failed this way excessively.
+ @job_retry_counts[jobrecord.uuid] += 1
+ exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT
+ if exit_tempfail
+ @todo_job_retries[jobrecord.uuid] = jobrecord
+ else
+ $stderr.puts("dispatch: job #{jobrecord.uuid} exceeded node failure retry limit -- giving up")
+ end
+ end
+
+ if !exit_tempfail
+ @job_retry_counts.delete(jobrecord.uuid)
+ if jobrecord.state == "Running"
+ # Apparently there was an unhandled error. That could potentially
+ # include "all allocated nodes failed" when we don't want to retry
+ # because the job has already been retried RETRY_UNLOCKED_LIMIT
+ # times. Fail the job.
+ jobrecord.state = "Failed"
+ if not jobrecord.save
+ $stderr.puts "dispatch: jobrecord.save failed"
+ end
end
else
- # Don't fail the job if crunch-job didn't even get as far as
- # starting it. If the job failed to run due to an infrastructure
+ # If the job failed to run due to an infrastructure
# issue with crunch-job or slurm, we want the job to stay in the
# queue. If crunch-job exited after losing a race to another
# crunch-job process, it exits 75 and we should leave the job
- # record alone so the winner of the race do its thing.
+ # record alone so the winner of the race can do its thing.
+ # If crunch-job exited after all of its allocated nodes failed,
+ # it exits 93, and we want to retry it later (see the
+ # EXIT_RETRY_UNLOCKED `if` block).
#
# There is still an unhandled race condition: If our crunch-job
# process is about to lose a race with another crunch-job
# Invalidate the per-job auth token, unless the job is still queued and we
# might want to try it again.
- if jobrecord.state != "Queued"
+ if jobrecord.state != "Queued" and !@todo_job_retries.include?(jobrecord.uuid)
j_done[:job_auth].update_attributes expires_at: Time.now
end
select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
[], [], 1)
end
+ # If there are jobs we wanted to retry, we have to mark them as failed now.
+ # Other dispatchers can't pick them up because we hold their lock.
+ @todo_job_retries.each_key do |job_uuid|
+ job = Job.find_by_uuid(job_uuid)
+ if job.state == "Running"
+ fail_job(job, "crunch-dispatch was stopped during job's tempfail retry loop")
+ end
+ end
end
protected
# This is how crunch-job child procs know where the "refresh" trigger file is
ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.crunch_refresh_trigger
+# If salloc can't allocate resources immediately, make it use our temporary
+# failure exit code. This ensures crunch-dispatch won't mark a job failed
+# because of an issue with node allocation. This often happens when
+# another dispatcher wins the race to allocate nodes.
+ENV["SLURM_EXIT_IMMEDIATE"] = Dispatcher::EXIT_TEMPFAIL.to_s
+
Dispatcher.new.run