@docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN']
@docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS']
@cgroup_root = ENV['CRUNCH_CGROUP_ROOT']
+ @srun_sync_timeout = ENV['CRUNCH_SRUN_SYNC_TIMEOUT']
- @arvados_internal = Rails.configuration.git_internal_dir
+ @arvados_internal = Rails.configuration.Containers["JobsAPI"]["GitInternalDir"]
if not File.exist? @arvados_internal
$stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
raise "No internal git repository available" unless ($? == 0)
end
- @repo_root = Rails.configuration.git_repositories_dir
+ @repo_root = Rails.configuration.Git["Repositories"]
@arvados_repo_path = Repository.where(name: "arvados").first.server_path
@authorizations = {}
@did_recently = {}
@fetched_commits[sha1] = ($? == 0)
end
- def tag_commit(commit_hash, tag_name)
+ def tag_commit(job, commit_hash, tag_name)
# @git_tags[T]==V if we know commit V has been tagged T in the
# arvados_internal repository.
if not @git_tags[tag_name]
next
end
ready &&= get_commit repo.server_path, job.script_version
- ready &&= tag_commit job.script_version, job.uuid
+ ready &&= tag_commit job, job.script_version, job.uuid
end
# This should be unnecessary, because API server does it during
# job create/update, but it's still not a bad idea to verify the
# tag is correct before starting the job:
- ready &&= tag_commit job.script_version, job.uuid
+ ready &&= tag_commit job, job.script_version, job.uuid
# The arvados_sdk_version doesn't support use of arbitrary
# remote URLs, so the requested version isn't necessarily copied
# into the internal repository yet.
if job.arvados_sdk_version
ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
- ready &&= tag_commit job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
+ ready &&= tag_commit job, job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
end
if not ready
cmd_args += ['--docker-run-args', @docker_run_args]
end
+ if @srun_sync_timeout
+ cmd_args += ['--srun-sync-timeout', @srun_sync_timeout]
+ end
+
if have_job_lock?(job)
cmd_args << "--force-unlock"
end
i, o, e, t = Open3.popen3(*cmd_args)
rescue
$stderr.puts "dispatch: popen3: #{$!}"
- sleep 1
- next
+ # This is a dispatch problem like "Too many open files";
+ # retrying another job right away would be futile. Just return
+ # and hope things are better next time, after (at least) a
+ # did_recently() delay.
+ return
end
$stderr.puts "dispatch: job #{job.uuid}"
bytes_logged: 0,
events_logged: 0,
log_throttle_is_open: true,
- log_throttle_reset_time: Time.now + Rails.configuration.crunch_log_throttle_period,
+ log_throttle_reset_time: Time.now + Rails.configuration.Containers["Logging"]["LogThrottlePeriod"],
log_throttle_bytes_so_far: 0,
log_throttle_lines_so_far: 0,
log_throttle_bytes_skipped: 0,
matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/)
if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]')
partial_line = true
- if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.crunch_log_partial_line_throttle_period
+ if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.Containers["Logging"]["LogPartialLineThrottlePeriod"]
running_job[:log_throttle_partial_line_last_at] = Time.now
else
skip_counts = true
end
if (running_job[:bytes_logged] >
- Rails.configuration.crunch_limit_log_bytes_per_job)
- message = "Exceeded log limit #{Rails.configuration.crunch_limit_log_bytes_per_job} bytes (crunch_limit_log_bytes_per_job). Log will be truncated."
+ Rails.configuration.Containers["Logging"]["LimitLogBytesPerJob"])
+ message = "Exceeded log limit #{Rails.configuration.Containers["Logging"]["LimitLogBytesPerJob"]} bytes (LimitLogBytesPerJob). Log will be truncated."
running_job[:log_throttle_reset_time] = Time.now + 100.years
running_job[:log_throttle_is_open] = false
elsif (running_job[:log_throttle_bytes_so_far] >
- Rails.configuration.crunch_log_throttle_bytes)
+ Rails.configuration.Containers["Logging"]["LogThrottleBytes"])
remaining_time = running_job[:log_throttle_reset_time] - Time.now
- message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds."
+ message = "Exceeded rate #{Rails.configuration.Containers["Logging"]["LogThrottleBytes"]} bytes per #{Rails.configuration.Containers["Logging"]["LogThrottlePeriod"]} seconds (LogThrottleBytes). Logging will be silenced for the next #{remaining_time.round} seconds."
running_job[:log_throttle_is_open] = false
elsif (running_job[:log_throttle_lines_so_far] >
- Rails.configuration.crunch_log_throttle_lines)
+ Rails.configuration.Containers["Logging"]["LogThrottleLines"])
remaining_time = running_job[:log_throttle_reset_time] - Time.now
- message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds."
+ message = "Exceeded rate #{Rails.configuration.Containers["Logging"]["LogThrottleLines"]} lines per #{Rails.configuration.Containers["Logging"]["LogThrottlePeriod"]} seconds (LogThrottleLines), logging will be silenced for the next #{remaining_time.round} seconds."
running_job[:log_throttle_is_open] = false
elsif partial_line and running_job[:log_throttle_first_partial_line]
running_job[:log_throttle_first_partial_line] = false
- message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.crunch_log_partial_line_throttle_period} seconds."
+ message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.Containers["Logging"]["LogPartialLineThrottlePeriod"]} seconds."
end
end
j[:stderr_buf_to_flush] << "#{LogTime.now} #{message}\n"
end
- j[:log_throttle_reset_time] = now + Rails.configuration.crunch_log_throttle_period
+ j[:log_throttle_reset_time] = now + Rails.configuration.Containers["Logging"]["LogThrottlePeriod"]
j[:log_throttle_bytes_so_far] = 0
j[:log_throttle_lines_so_far] = 0
j[:log_throttle_bytes_skipped] = 0
bufend = ''
streambuf.each_line do |line|
if not line.end_with? $/
- if line.size > Rails.configuration.crunch_log_throttle_bytes
+ if line.size > Rails.configuration.Containers["Logging"]["LogThrottleBytes"]
# Without a limit here, we'll use 2x an arbitrary amount
# of memory, and waste a lot of time copying strings
# around, all without providing any feedback to anyone
pid_done = nil
j_done = nil
- if false
- begin
- pid_done = waitpid(-1, Process::WNOHANG | Process::WUNTRACED)
- if pid_done
- j_done = @running.values.
- select { |j| j[:wait_thr].pid == pid_done }.
- first
- end
- rescue SystemCallError
- # I have @running processes but system reports I have no
- # children. This is likely to happen repeatedly if it happens at
- # all; I will log this no more than once per child process I
- # start.
- if 0 < @running.select { |uuid,j| j[:warned_waitpid_error].nil? }.size
- children = @running.values.collect { |j| j[:wait_thr].pid }.join ' '
- $stderr.puts "dispatch: IPC bug: waitpid() error (#{$!}), but I have children #{children}"
- end
- @running.each do |uuid,j| j[:warned_waitpid_error] = true end
- end
- else
- @running.each do |uuid, j|
- if j[:wait_thr].status == false
- pid_done = j[:wait_thr].pid
- j_done = j
- end
+ @running.each do |uuid, j|
+ if !j[:wait_thr].status
+ pid_done = j[:wait_thr].pid
+ j_done = j
+ break
end
end
# This is how crunch-job child procs know where the "refresh"
# trigger file is
- ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.crunch_refresh_trigger
+ ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.Containers["JobsAPI"]["CrunchRefreshTrigger"]
# If salloc can't allocate resources immediately, make it use our
# temporary failure exit code. This ensures crunch-dispatch won't
# Send out a log event if the buffer size exceeds LogBytesPerEvent
# bytes, or if at least LogSecondsBetweenEvents seconds have passed
# since the last flush.
- if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or
- (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
+ if running_job[:stderr_buf_to_flush].size > Rails.configuration.Containers["Logging"]["LogBytesPerEvent"] or
+ (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.Containers["Logging"]["LogSecondsBetweenEvents"]
begin
log = Log.new(object_uuid: running_job[:job].uuid,
event_type: 'stderr',
# An array of job_uuids in squeue
def squeue_jobs
- if Rails.configuration.crunch_job_wrapper == :slurm_immediate
+ if Rails.configuration.Containers["JobsAPI"]["CrunchJobWrapper"].to_sym == :slurm_immediate
p = IO.popen(['squeue', '-a', '-h', '-o', '%j'])
begin
p.readlines.map {|line| line.strip}