X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/9b0654adfffaac018395de29f6e441b843d46e85..0cd5f7190f1c841051a0c950ea3cf74477cf20e7:/services/api/lib/crunch_dispatch.rb diff --git a/services/api/lib/crunch_dispatch.rb b/services/api/lib/crunch_dispatch.rb index 48b0eb5983..4e640186d1 100644 --- a/services/api/lib/crunch_dispatch.rb +++ b/services/api/lib/crunch_dispatch.rb @@ -1,3 +1,7 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + require 'open3' require 'shellwords' @@ -25,14 +29,15 @@ class CrunchDispatch @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN'] @docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS'] @cgroup_root = ENV['CRUNCH_CGROUP_ROOT'] + @srun_sync_timeout = ENV['CRUNCH_SRUN_SYNC_TIMEOUT'] - @arvados_internal = Rails.configuration.git_internal_dir + @arvados_internal = Rails.configuration.Containers.JobsAPI.GitInternalDir if not File.exist? @arvados_internal $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}` raise "No internal git repository available" unless ($? == 0) end - @repo_root = Rails.configuration.git_repositories_dir + @repo_root = Rails.configuration.Git.Repositories @arvados_repo_path = Repository.where(name: "arvados").first.server_path @authorizations = {} @did_recently = {} @@ -95,7 +100,7 @@ class CrunchDispatch # hasn't been able to communicate with it recently. state.sub!(/^idle\*/, "down") state.sub!(/\W+$/, "") - state = "down" unless %w(idle alloc down).include?(state) + state = "down" unless %w(idle alloc comp mix drng down).include?(state) slurm_nodes[hostname] = {state: state, job: nil} end each_slurm_line("squeue", "%j") do |hostname, job_uuid| @@ -105,7 +110,7 @@ class CrunchDispatch end def update_node_status - return unless Server::Application.config.crunch_job_wrapper.to_s.match(/^slurm/) + return unless Rails.configuration.Containers.JobsAPI.CrunchJobWrapper.to_s.match(/^slurm/) slurm_status.each_pair do |hostname, slurmdata| next if @node_state[hostname] == slurmdata begin @@ -169,7 +174,25 @@ class CrunchDispatch end usable_nodes << node if usable_nodes.count >= min_node_count - return usable_nodes.map { |n| n.hostname } + hostnames = usable_nodes.map(&:hostname) + log_nodes = usable_nodes.map do |n| + "#{n.hostname} #{n.uuid} #{n.properties.to_json}" + end + log_job = "#{job.uuid} #{job.runtime_constraints}" + log_text = "dispatching job #{log_job} to #{log_nodes.join(", ")}" + $stderr.puts log_text + begin + act_as_system_user do + Log.new(object_uuid: job.uuid, + event_type: 'dispatch', + owner_uuid: system_user_uuid, + summary: "dispatching to #{hostnames.join(", ")}", + properties: {'text' => log_text}).save! + end + rescue => e + $stderr.puts "dispatch: log.create failed: #{e}" + end + return hostnames end end nil @@ -204,8 +227,8 @@ class CrunchDispatch owner_uuid: job.owner_uuid, summary: message, properties: {"text" => message}).save! - rescue - $stderr.puts "dispatch: log.create failed" + rescue => e + $stderr.puts "dispatch: log.create failed: #{e}" end if not skip_lock and not have_job_lock?(job) @@ -275,7 +298,7 @@ class CrunchDispatch @fetched_commits[sha1] = ($? == 0) end - def tag_commit(commit_hash, tag_name) + def tag_commit(job, commit_hash, tag_name) # @git_tags[T]==V if we know commit V has been tagged T in the # arvados_internal repository. if not @git_tags[tag_name] @@ -314,14 +337,14 @@ class CrunchDispatch next if @running[job.uuid] cmd_args = nil - case Server::Application.config.crunch_job_wrapper - when :none + case Rails.configuration.Containers.JobsAPI.CrunchJobWrapper + when "none" if @running.size > 0 # Don't run more than one at a time. return end cmd_args = [] - when :slurm_immediate + when "slurm_immediate" nodelist = nodes_available_for_job(job) if nodelist.nil? if Time.now < @node_wait_deadline @@ -338,7 +361,7 @@ class CrunchDispatch "--job-name=#{job.uuid}", "--nodelist=#{nodelist.join(',')}"] else - raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}" + raise "Unknown crunch_job_wrapper: #{Rails.configuration.Containers.JobsAPI.CrunchJobWrapper}" end cmd_args = sudo_preface + cmd_args @@ -359,20 +382,20 @@ class CrunchDispatch next end ready &&= get_commit repo.server_path, job.script_version - ready &&= tag_commit job.script_version, job.uuid + ready &&= tag_commit job, job.script_version, job.uuid end # This should be unnecessary, because API server does it during # job create/update, but it's still not a bad idea to verify the # tag is correct before starting the job: - ready &&= tag_commit job.script_version, job.uuid + ready &&= tag_commit job, job.script_version, job.uuid # The arvados_sdk_version doesn't support use of arbitrary # remote URLs, so the requested version isn't necessarily copied # into the internal repository yet. if job.arvados_sdk_version ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version - ready &&= tag_commit job.arvados_sdk_version, "#{job.uuid}-arvados-sdk" + ready &&= tag_commit job, job.arvados_sdk_version, "#{job.uuid}-arvados-sdk" end if not ready @@ -397,6 +420,10 @@ class CrunchDispatch cmd_args += ['--docker-run-args', @docker_run_args] end + if @srun_sync_timeout + cmd_args += ['--srun-sync-timeout', @srun_sync_timeout] + end + if have_job_lock?(job) cmd_args << "--force-unlock" end @@ -407,8 +434,11 @@ class CrunchDispatch i, o, e, t = Open3.popen3(*cmd_args) rescue $stderr.puts "dispatch: popen3: #{$!}" - sleep 1 - next + # This is a dispatch problem like "Too many open files"; + # retrying another job right away would be futile. Just return + # and hope things are better next time, after (at least) a + # did_recently() delay. + return end $stderr.puts "dispatch: job #{job.uuid}" @@ -430,7 +460,7 @@ class CrunchDispatch bytes_logged: 0, events_logged: 0, log_throttle_is_open: true, - log_throttle_reset_time: Time.now + Rails.configuration.crunch_log_throttle_period, + log_throttle_reset_time: Time.now + Rails.configuration.Containers.Logging.LogThrottlePeriod, log_throttle_bytes_so_far: 0, log_throttle_lines_so_far: 0, log_throttle_bytes_skipped: 0, @@ -455,7 +485,7 @@ class CrunchDispatch matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/) if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]') partial_line = true - if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.crunch_log_partial_line_throttle_period + if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.Containers.Logging.LogPartialLineThrottlePeriod running_job[:log_throttle_partial_line_last_at] = Time.now else skip_counts = true @@ -469,26 +499,26 @@ class CrunchDispatch end if (running_job[:bytes_logged] > - Rails.configuration.crunch_limit_log_bytes_per_job) - message = "Exceeded log limit #{Rails.configuration.crunch_limit_log_bytes_per_job} bytes (crunch_limit_log_bytes_per_job). Log will be truncated." + Rails.configuration.Containers.Logging.LimitLogBytesPerJob) + message = "Exceeded log limit #{Rails.configuration.Containers.Logging.LimitLogBytesPerJob} bytes (LimitLogBytesPerJob). Log will be truncated." running_job[:log_throttle_reset_time] = Time.now + 100.years running_job[:log_throttle_is_open] = false elsif (running_job[:log_throttle_bytes_so_far] > - Rails.configuration.crunch_log_throttle_bytes) + Rails.configuration.Containers.Logging.LogThrottleBytes) remaining_time = running_job[:log_throttle_reset_time] - Time.now - message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds." + message = "Exceeded rate #{Rails.configuration.Containers.Logging.LogThrottleBytes} bytes per #{Rails.configuration.Containers.Logging.LogThrottlePeriod} seconds (LogThrottleBytes). Logging will be silenced for the next #{remaining_time.round} seconds." running_job[:log_throttle_is_open] = false elsif (running_job[:log_throttle_lines_so_far] > - Rails.configuration.crunch_log_throttle_lines) + Rails.configuration.Containers.Logging.LogThrottleLines) remaining_time = running_job[:log_throttle_reset_time] - Time.now - message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds." + message = "Exceeded rate #{Rails.configuration.Containers.Logging.LogThrottleLines} lines per #{Rails.configuration.Containers.Logging.LogThrottlePeriod} seconds (LogThrottleLines), logging will be silenced for the next #{remaining_time.round} seconds." running_job[:log_throttle_is_open] = false elsif partial_line and running_job[:log_throttle_first_partial_line] running_job[:log_throttle_first_partial_line] = false - message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.crunch_log_partial_line_throttle_period} seconds." + message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.Containers.Logging.LogPartialLineThrottlePeriod} seconds." end end @@ -522,7 +552,7 @@ class CrunchDispatch j[:stderr_buf_to_flush] << "#{LogTime.now} #{message}\n" end - j[:log_throttle_reset_time] = now + Rails.configuration.crunch_log_throttle_period + j[:log_throttle_reset_time] = now + Rails.configuration.Containers.Logging.LogThrottlePeriod j[:log_throttle_bytes_so_far] = 0 j[:log_throttle_lines_so_far] = 0 j[:log_throttle_bytes_skipped] = 0 @@ -562,7 +592,7 @@ class CrunchDispatch bufend = '' streambuf.each_line do |line| if not line.end_with? $/ - if line.size > Rails.configuration.crunch_log_throttle_bytes + if line.size > Rails.configuration.Containers.Logging.LogThrottleBytes # Without a limit here, we'll use 2x an arbitrary amount # of memory, and waste a lot of time copying strings # around, all without providing any feedback to anyone @@ -611,31 +641,11 @@ class CrunchDispatch pid_done = nil j_done = nil - if false - begin - pid_done = waitpid(-1, Process::WNOHANG | Process::WUNTRACED) - if pid_done - j_done = @running.values. - select { |j| j[:wait_thr].pid == pid_done }. - first - end - rescue SystemCallError - # I have @running processes but system reports I have no - # children. This is likely to happen repeatedly if it happens at - # all; I will log this no more than once per child process I - # start. - if 0 < @running.select { |uuid,j| j[:warned_waitpid_error].nil? }.size - children = @running.values.collect { |j| j[:wait_thr].pid }.join ' ' - $stderr.puts "dispatch: IPC bug: waitpid() error (#{$!}), but I have children #{children}" - end - @running.each do |uuid,j| j[:warned_waitpid_error] = true end - end - else - @running.each do |uuid, j| - if j[:wait_thr].status == false - pid_done = j[:wait_thr].pid - j_done = j - end + @running.each do |uuid, j| + if !j[:wait_thr].status + pid_done = j[:wait_thr].pid + j_done = j + break end end @@ -666,17 +676,20 @@ class CrunchDispatch jobrecord = Job.find_by_uuid(job_done.uuid) if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid) + $stderr.puts("dispatch: job #{jobrecord.uuid} was interrupted by node failure") # Only this crunch-dispatch process can retry the job: # it's already locked, and there's no way to put it back in the # Queued state. Put it in our internal todo list unless the job # has failed this way excessively. @job_retry_counts[jobrecord.uuid] += 1 exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT + do_what_next = "give up now" if exit_tempfail @todo_job_retries[jobrecord.uuid] = jobrecord - else - $stderr.puts("dispatch: job #{jobrecord.uuid} exceeded node failure retry limit -- giving up") + do_what_next = "re-attempt" end + $stderr.puts("dispatch: job #{jobrecord.uuid} has been interrupted " + + "#{@job_retry_counts[jobrecord.uuid]}x, will #{do_what_next}") end if !exit_tempfail @@ -762,7 +775,7 @@ class CrunchDispatch # This is how crunch-job child procs know where the "refresh" # trigger file is - ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.crunch_refresh_trigger + ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.Containers.JobsAPI.CrunchRefreshTrigger # If salloc can't allocate resources immediately, make it use our # temporary failure exit code. This ensures crunch-dispatch won't @@ -814,6 +827,9 @@ class CrunchDispatch unless (@todo_pipelines.empty? and @pipe_auth_tokens.empty?) or did_recently(:update_pipelines, 5.0) update_pipelines end + unless did_recently('check_orphaned_slurm_jobs', 60) + check_orphaned_slurm_jobs + end end reap_children select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten, @@ -850,22 +866,14 @@ class CrunchDispatch end Rails.logger.info "fail_jobs: threshold is #{threshold}" - if Rails.configuration.crunch_job_wrapper == :slurm_immediate - # [["slurm_job_id", "slurm_job_name"], ...] - squeue = File.popen(['squeue', '-h', '-o', '%i %j']).readlines.map do |line| - line.strip.split(' ', 2) - end - else - squeue = [] - end - + squeue = squeue_jobs Job.where('state = ? and started_at < ?', Job::Running, threshold). each do |job| Rails.logger.debug "fail_jobs: #{job.uuid} started #{job.started_at}" - squeue.each do |slurm_id, slurm_name| + squeue.each do |slurm_name| if slurm_name == job.uuid - Rails.logger.info "fail_jobs: scancel #{slurm_id} for #{job.uuid}" - scancel slurm_id + Rails.logger.info "fail_jobs: scancel #{job.uuid}" + scancel slurm_name end end fail_job(job, "cleaned up stale job: started before #{threshold}", @@ -874,6 +882,37 @@ class CrunchDispatch end end + def check_orphaned_slurm_jobs + act_as_system_user do + squeue_uuids = squeue_jobs.select{|uuid| uuid.match(/^[0-9a-z]{5}-8i9sb-[0-9a-z]{15}$/)}. + select{|uuid| !@running.has_key?(uuid)} + + return if squeue_uuids.size == 0 + + scancel_uuids = squeue_uuids - Job.where('uuid in (?) and (state in (?) or modified_at>?)', + squeue_uuids, + ['Running', 'Queued'], + (Time.now - 60)). + collect(&:uuid) + scancel_uuids.each do |uuid| + Rails.logger.info "orphaned job: scancel #{uuid}" + scancel uuid + end + end + end + + def sudo_preface + return [] if not Rails.configuration.Containers.JobsAPI.CrunchJobUser + ["sudo", "-E", "-u", + Rails.configuration.Containers.JobsAPI.CrunchJobUser, + "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}", + "PATH=#{ENV['PATH']}", + "PERLLIB=#{ENV['PERLLIB']}", + "PYTHONPATH=#{ENV['PYTHONPATH']}", + "RUBYLIB=#{ENV['RUBYLIB']}", + "GEM_PATH=#{ENV['GEM_PATH']}"] + end + protected def have_job_lock?(job) @@ -898,8 +937,8 @@ class CrunchDispatch # Send out to log event if buffer size exceeds the bytes per event or if # it has been at least crunch_log_seconds_between_events seconds since # the last flush. - if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or - (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events + if running_job[:stderr_buf_to_flush].size > Rails.configuration.Containers.Logging.LogBytesPerEvent or + (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.Containers.Logging.LogSecondsBetweenEvents begin log = Log.new(object_uuid: running_job[:job].uuid, event_type: 'stderr', @@ -916,23 +955,27 @@ class CrunchDispatch end end - def scancel slurm_id - cmd = sudo_preface + ['scancel', slurm_id] - puts File.popen(cmd).read - if not $?.success? - Rails.logger.error "scancel #{slurm_id.shellescape}: $?" + # An array of job_uuids in squeue + def squeue_jobs + if Rails.configuration.Containers.JobsAPI.CrunchJobWrapper == "slurm_immediate" + p = IO.popen(['squeue', '-a', '-h', '-o', '%j']) + begin + p.readlines.map {|line| line.strip} + ensure + p.close + end + else + [] end end - def sudo_preface - return [] if not Server::Application.config.crunch_job_user - ["sudo", "-E", "-u", - Server::Application.config.crunch_job_user, - "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}", - "PATH=#{ENV['PATH']}", - "PERLLIB=#{ENV['PERLLIB']}", - "PYTHONPATH=#{ENV['PYTHONPATH']}", - "RUBYLIB=#{ENV['RUBYLIB']}", - "GEM_PATH=#{ENV['GEM_PATH']}"] + def scancel slurm_name + cmd = sudo_preface + ['scancel', '-n', slurm_name] + IO.popen(cmd) do |scancel_pipe| + puts scancel_pipe.read + end + if not $?.success? + Rails.logger.error "scancel #{slurm_name.shellescape}: $?" + end end end