X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d03f44d2d2c93841a50cbfca1a74600bc504b593..40196e450ef489a31eeabf2c11a3969094e185b2:/services/api/lib/crunch_dispatch.rb diff --git a/services/api/lib/crunch_dispatch.rb b/services/api/lib/crunch_dispatch.rb index 5d598d48ea..ce94f737a2 100644 --- a/services/api/lib/crunch_dispatch.rb +++ b/services/api/lib/crunch_dispatch.rb @@ -2,6 +2,7 @@ require 'open3' require 'shellwords' class CrunchDispatch + extend DbCurrentTime include ApplicationHelper include Process @@ -23,6 +24,7 @@ class CrunchDispatch @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN'] @docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS'] + @cgroup_root = ENV['CRUNCH_CGROUP_ROOT'] @arvados_internal = Rails.configuration.git_internal_dir if not File.exists? @arvados_internal @@ -194,7 +196,7 @@ class CrunchDispatch nodelist end - def fail_job job, message + def fail_job job, message, skip_lock: false $stderr.puts "dispatch: #{job.uuid}: #{message}" begin Log.new(object_uuid: job.uuid, @@ -206,7 +208,7 @@ class CrunchDispatch $stderr.puts "dispatch: log.create failed" end - if not have_job_lock?(job) + if not skip_lock and not have_job_lock?(job) begin job.lock @authorizations[job.uuid].user.uuid rescue ArvadosModel::AlreadyLockedError @@ -339,16 +341,7 @@ class CrunchDispatch raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}" end - if Server::Application.config.crunch_job_user - cmd_args.unshift("sudo", "-E", "-u", - Server::Application.config.crunch_job_user, - "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}", - "PATH=#{ENV['PATH']}", - "PERLLIB=#{ENV['PERLLIB']}", - "PYTHONPATH=#{ENV['PYTHONPATH']}", - "RUBYLIB=#{ENV['RUBYLIB']}", - "GEM_PATH=#{ENV['GEM_PATH']}") - end + cmd_args = sudo_preface + cmd_args next unless get_authorization job @@ -362,7 +355,7 @@ class CrunchDispatch # reasonable thing to do at this point. repo = Repository.where(name: job.repository).first if repo.nil? or repo.server_path.nil? - fail_job "Repository #{job.repository} not found under #{@repo_root}" + fail_job job, "Repository #{job.repository} not found under #{@repo_root}" next end ready &&= get_commit repo.server_path, job.script_version @@ -392,6 +385,10 @@ class CrunchDispatch '--job', job.uuid, '--git-dir', @arvados_internal] + if @cgroup_root + cmd_args += ['--cgroup-root', @cgroup_root] + end + if @docker_bin cmd_args += ['--docker-bin', @docker_bin] end @@ -437,6 +434,8 @@ class CrunchDispatch log_throttle_bytes_so_far: 0, log_throttle_lines_so_far: 0, log_throttle_bytes_skipped: 0, + log_throttle_partial_line_last_at: Time.new(0), + log_throttle_first_partial_line: true, } i.close @todo_job_retries.delete(job.uuid) @@ -451,9 +450,23 @@ class CrunchDispatch message = false linesize = line.size if running_job[:log_throttle_is_open] - running_job[:log_throttle_lines_so_far] += 1 - running_job[:log_throttle_bytes_so_far] += linesize - running_job[:bytes_logged] += linesize + partial_line = false + skip_counts = false + matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/) + if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]') + partial_line = true + if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.crunch_log_partial_line_throttle_period + running_job[:log_throttle_partial_line_last_at] = Time.now + else + skip_counts = true + end + end + + if !skip_counts + running_job[:log_throttle_lines_so_far] += 1 + running_job[:log_throttle_bytes_so_far] += linesize + running_job[:bytes_logged] += linesize + end if (running_job[:bytes_logged] > Rails.configuration.crunch_limit_log_bytes_per_job) @@ -464,14 +477,18 @@ class CrunchDispatch elsif (running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_log_throttle_bytes) remaining_time = running_job[:log_throttle_reset_time] - Time.now - message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds.\n" + message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds." running_job[:log_throttle_is_open] = false elsif (running_job[:log_throttle_lines_so_far] > Rails.configuration.crunch_log_throttle_lines) remaining_time = running_job[:log_throttle_reset_time] - Time.now - message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds.\n" + message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds." running_job[:log_throttle_is_open] = false + + elsif partial_line and running_job[:log_throttle_first_partial_line] + running_job[:log_throttle_first_partial_line] = false + message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.crunch_log_partial_line_throttle_period} seconds." end end @@ -483,8 +500,11 @@ class CrunchDispatch if message # Yes, write to logs, but use our "rate exceeded" message # instead of the log message that exceeded the limit. + message += " A complete log is still being written to Keep, and will be available when the job finishes.\n" line.replace message true + elsif partial_line + false else running_job[:log_throttle_is_open] end @@ -509,6 +529,8 @@ class CrunchDispatch j[:log_throttle_lines_so_far] = 0 j[:log_throttle_bytes_skipped] = 0 j[:log_throttle_is_open] = true + j[:log_throttle_partial_line_last_at] = Time.new(0) + j[:log_throttle_first_partial_line] = true end j[:buf].each do |stream, streambuf| @@ -645,9 +667,8 @@ class CrunchDispatch jobrecord = Job.find_by_uuid(job_done.uuid) - if exit_status == EXIT_RETRY_UNLOCKED - # The job failed because all of the nodes allocated to it - # failed. Only this crunch-dispatch process can retry the job: + if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid) + # Only this crunch-dispatch process can retry the job: # it's already locked, and there's no way to put it back in the # Queued state. Put it in our internal todo list unless the job # has failed this way excessively. @@ -810,6 +831,51 @@ class CrunchDispatch end end + def fail_jobs before: nil + act_as_system_user do + threshold = nil + if before == 'reboot' + boottime = nil + open('/proc/stat').map(&:split).each do |stat, t| + if stat == 'btime' + boottime = t + end + end + if not boottime + raise "Could not find btime in /proc/stat" + end + threshold = Time.at(boottime.to_i) + elsif before + threshold = Time.parse(before, Time.now) + else + threshold = db_current_time + end + Rails.logger.info "fail_jobs: threshold is #{threshold}" + + if Rails.configuration.crunch_job_wrapper == :slurm_immediate + # [["slurm_job_id", "slurm_job_name"], ...] + squeue = File.popen(['squeue', '-h', '-o', '%i %j']).readlines.map do |line| + line.strip.split(' ', 2) + end + else + squeue = [] + end + + Job.where('state = ? and started_at < ?', Job::Running, threshold). + each do |job| + Rails.logger.debug "fail_jobs: #{job.uuid} started #{job.started_at}" + squeue.each do |slurm_id, slurm_name| + if slurm_name == job.uuid + Rails.logger.info "fail_jobs: scancel #{slurm_id} for #{job.uuid}" + scancel slurm_id + end + end + fail_job(job, "cleaned up stale job: started before #{threshold}", + skip_lock: true) + end + end + end + protected def have_job_lock?(job) @@ -851,4 +917,24 @@ class CrunchDispatch running_job[:stderr_flushed_at] = Time.now end end + + def scancel slurm_id + cmd = sudo_preface + ['scancel', slurm_id] + puts File.popen(cmd).read + if not $?.success? + Rails.logger.error "scancel #{slurm_id.shellescape}: $?" + end + end + + def sudo_preface + return [] if not Server::Application.config.crunch_job_user + ["sudo", "-E", "-u", + Server::Application.config.crunch_job_user, + "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}", + "PATH=#{ENV['PATH']}", + "PERLLIB=#{ENV['PERLLIB']}", + "PYTHONPATH=#{ENV['PYTHONPATH']}", + "RUBYLIB=#{ENV['RUBYLIB']}", + "GEM_PATH=#{ENV['GEM_PATH']}"] + end end