class Dispatcher
include ApplicationHelper
- include DbCurrentTime
+
+ EXIT_TEMPFAIL = 75
def initialize
@crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
end
@repo_root = Rails.configuration.git_repositories_dir
+ @arvados_repo_path = Repository.where(name: "arvados").first.server_path
@authorizations = {}
@did_recently = {}
@fetched_commits = {}
nodelist = nodes_available_for_job_now(job)
if nodelist.nil? and not did_recently(:wait_for_available_nodes, 3600)
$stderr.puts "dispatch: waiting for nodes for #{job.uuid}"
- @node_wait_deadline = db_current_time + 5.minutes
+ @node_wait_deadline = Time.now + 5.minutes
end
nodelist
end
# We already made a token for this job, but we need a new one
# because modified_by_user_uuid has changed (the job will run
# as a different user).
- @authorizations[job.uuid].update_attributes expires_at: db_current_time
+ @authorizations[job.uuid].update_attributes expires_at: Time.now
@authorizations[job.uuid] = nil
end
if not @authorizations[job.uuid]
@authorizations[job.uuid]
end
- def get_commit(repo_name, commit_hash)
- # @fetched_commits[V]==true if we know commit V exists in the
- # arvados_internal git repository.
- if !@fetched_commits[commit_hash]
- src_repo = File.join(@repo_root, "#{repo_name}.git")
- if not File.exists? src_repo
- src_repo = File.join(@repo_root, repo_name, '.git')
- if not File.exists? src_repo
- fail_job job, "No #{repo_name}.git or #{repo_name}/.git at #{@repo_root}"
- return nil
- end
- end
-
- # check if the commit needs to be fetched or not
- commit_rev = stdout_s(git_cmd("rev-list", "-n1", commit_hash),
- err: "/dev/null")
- unless $? == 0 and commit_rev == commit_hash
- # commit does not exist in internal repository, so import the source repository using git fetch-pack
- cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
- $stderr.puts "dispatch: #{cmd}"
- $stderr.puts(stdout_s(cmd))
- unless $? == 0
- fail_job job, "git fetch-pack failed"
- return nil
- end
- end
- @fetched_commits[commit_hash] = true
+ # Report whether commit sha1 is known to exist in the internal git
+ # repository.
+ #
+ # A positive answer is remembered in @fetched_commits[sha1], so
+ # git is consulted at most once per commit after it has been
+ # found. A missing or false cache entry causes a fresh
+ # "git rev-list -n1" check on every call.
+ #
+ # Returns true when the commit is present; otherwise returns
+ # whatever is stored in @fetched_commits[sha1] (nil or false).
+ def internal_repo_has_commit? sha1
+ if (not @fetched_commits[sha1] and
+ # rev-list output must equal sha1 exactly and git must exit 0.
+ # NOTE(review): presumably this means only a full commit hash
+ # (not a branch name or abbreviation) can match -- confirm.
+ sha1 == stdout_s(git_cmd("rev-list", "-n1", sha1), err: "/dev/null") and
+ $? == 0)
+ @fetched_commits[sha1] = true
end
- @fetched_commits[commit_hash]
+ return @fetched_commits[sha1]
+ end
+
+ # Make sure commit sha1 is available in the internal git
+ # repository, importing everything from src_repo with
+ # "git fetch-pack --all" if it is not already there.
+ #
+ # src_repo is passed straight to git fetch-pack as the source
+ # repository argument.
+ #
+ # Returns true if the commit was already present, otherwise true
+ # or false according to the exit status of git fetch-pack. The
+ # result is stored in @fetched_commits[sha1]; a stored false is
+ # not treated as final by internal_repo_has_commit?, so a later
+ # call checks (and, if needed, fetches) again.
+ #
+ # NOTE(review): success is judged only by fetch-pack's exit
+ # status; the presence of sha1 is not re-verified after the fetch.
+ def get_commit src_repo, sha1
+ return true if internal_repo_has_commit? sha1
+
+ # commit does not exist in internal repository, so import the
+ # source repository using git fetch-pack
+ cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
+ $stderr.puts "dispatch: #{cmd}"
+ $stderr.puts(stdout_s(cmd))
+ @fetched_commits[sha1] = ($? == 0)
end
def tag_commit(commit_hash, tag_name)
when :slurm_immediate
nodelist = nodes_available_for_job(job)
if nodelist.nil?
- if db_current_time < @node_wait_deadline
+ if Time.now < @node_wait_deadline
break
else
next
"GEM_PATH=#{ENV['GEM_PATH']}")
end
- ready = (get_authorization(job) and
- get_commit(job.repository, job.script_version) and
- tag_commit(job.script_version, job.uuid))
- if ready and job.arvados_sdk_version
- ready = (get_commit("arvados", job.arvados_sdk_version) and
- tag_commit(job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"))
+ next unless get_authorization job
+
+ ready = internal_repo_has_commit? job.script_version
+
+ if not ready
+ # Import the commit from the specified repository into the
+ # internal repository. This should have been done already when
+ # the job was created/updated; this code is obsolete except to
+ # avoid deployment races. Failing the job would be a
+ # reasonable thing to do at this point.
+ repo = Repository.where(name: job.repository).first
+ if repo.nil? or repo.server_path.nil?
+ fail_job "Repository #{job.repository} not found under #{@repo_root}"
+ next
+ end
+ ready &&= get_commit repo.server_path, job.script_version
+ ready &&= tag_commit job.script_version, job.uuid
+ end
+
+ # This should be unnecessary, because API server does it during
+ # job create/update, but it's still not a bad idea to verify the
+ # tag is correct before starting the job:
+ ready &&= tag_commit job.script_version, job.uuid
+
+ # The arvados_sdk_version doesn't support use of arbitrary
+ # remote URLs, so the requested version isn't necessarily copied
+ # into the internal repository yet.
+ if job.arvados_sdk_version
+ ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
+ ready &&= tag_commit job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
+ end
+
+ if not ready
+ fail_job job, "commit not present in internal repository"
+ next
end
- next unless ready
cmd_args += [@crunch_job_bin,
'--job-api-token', @authorizations[job.uuid].api_token,
bytes_logged: 0,
events_logged: 0,
log_throttle_is_open: true,
- log_throttle_reset_time: db_current_time + Rails.configuration.crunch_log_throttle_period,
+ log_throttle_reset_time: Time.now + Rails.configuration.crunch_log_throttle_period,
log_throttle_bytes_so_far: 0,
log_throttle_lines_so_far: 0,
log_throttle_bytes_skipped: 0,
if (running_job[:bytes_logged] >
Rails.configuration.crunch_limit_log_bytes_per_job)
message = "Exceeded log limit #{Rails.configuration.crunch_limit_log_bytes_per_job} bytes (crunch_limit_log_bytes_per_job). Log will be truncated."
- running_job[:log_throttle_reset_time] = db_current_time + 100.years
+ running_job[:log_throttle_reset_time] = Time.now + 100.years
running_job[:log_throttle_is_open] = false
elsif (running_job[:log_throttle_bytes_so_far] >
Rails.configuration.crunch_log_throttle_bytes)
- remaining_time = running_job[:log_throttle_reset_time] - db_current_time
+ remaining_time = running_job[:log_throttle_reset_time] - Time.now
message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds.\n"
running_job[:log_throttle_is_open] = false
elsif (running_job[:log_throttle_lines_so_far] >
Rails.configuration.crunch_log_throttle_lines)
- remaining_time = running_job[:log_throttle_reset_time] - db_current_time
+ remaining_time = running_job[:log_throttle_reset_time] - Time.now
message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds.\n"
running_job[:log_throttle_is_open] = false
end
@running.each do |job_uuid, j|
job = j[:job]
- now = db_current_time
+ now = Time.now
if now > j[:log_throttle_reset_time]
# It has been more than throttle_period seconds since the last
# checkpoint so reset the throttle
exit_status = j_done[:wait_thr].value.exitstatus
jobrecord = Job.find_by_uuid(job_done.uuid)
- if exit_status != 75 and jobrecord.state == "Running"
+ if exit_status != EXIT_TEMPFAIL and jobrecord.state == "Running"
# crunch-job did not return exit code 75 (see below) and left the job in
# the "Running" state, which means there was an unhandled error. Fail
# the job.
# Invalidate the per-job auth token, unless the job is still queued and we
# might want to try it again.
if jobrecord.state != "Queued"
- j_done[:job_auth].update_attributes expires_at: db_current_time
+ j_done[:job_auth].update_attributes expires_at: Time.now
end
@running.delete job_done.uuid
end
expire_tokens.each do |k, v|
- v.update_attributes expires_at: db_current_time
+ v.update_attributes expires_at: Time.now
@pipe_auth_tokens.delete k
end
end
protected
def did_recently(thing, min_interval)
- current_time = db_current_time
- if !@did_recently[thing] or @did_recently[thing] < current_time - min_interval
- @did_recently[thing] = current_time
+ if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
+ @did_recently[thing] = Time.now
false
else
true
# it has been at least crunch_log_seconds_between_events seconds since
# the last flush.
if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or
- (db_current_time - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
+ (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
begin
log = Log.new(object_uuid: running_job[:job].uuid,
event_type: 'stderr',
$stderr.puts exception.backtrace
end
running_job[:stderr_buf_to_flush] = ''
- running_job[:stderr_flushed_at] = db_current_time
+ running_job[:stderr_flushed_at] = Time.now
end
end
end
# This is how crunch-job child procs know where the "refresh" trigger file is
ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.crunch_refresh_trigger
+# If salloc can't allocate resources immediately, make it use our temporary
+# failure exit code. This ensures crunch-dispatch won't mark a job failed
+# because of an issue with node allocation. This often happens when
+# another dispatcher wins the race to allocate nodes.
+ENV["SLURM_EXIT_IMMEDIATE"] = Dispatcher::EXIT_TEMPFAIL.to_s
+
Dispatcher.new.run