X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/378e6e0cd313541c395893e832e82a85856d5105..a8378b8deaa2bbf9d2c154d9d9bb072538c288cc:/services/api/lib/crunch_dispatch.rb diff --git a/services/api/lib/crunch_dispatch.rb b/services/api/lib/crunch_dispatch.rb index 05f85c7bb6..bea1657de2 100644 --- a/services/api/lib/crunch_dispatch.rb +++ b/services/api/lib/crunch_dispatch.rb @@ -24,9 +24,10 @@ class CrunchDispatch @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN'] @docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS'] + @cgroup_root = ENV['CRUNCH_CGROUP_ROOT'] @arvados_internal = Rails.configuration.git_internal_dir - if not File.exists? @arvados_internal + if not File.exist? @arvados_internal $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}` raise "No internal git repository available" unless ($? == 0) end @@ -72,7 +73,7 @@ class CrunchDispatch # into multiple rows with one hostname each. `#{cmd} --noheader -o '%N:#{outfmt}'`.each_line do |line| tokens = line.chomp.split(":", max_fields) - if (re = tokens[0].match /^(.*?)\[([-,\d]+)\]$/) + if (re = tokens[0].match(/^(.*?)\[([-,\d]+)\]$/)) tokens.shift re[2].split(",").each do |range| range = range.split("-").collect(&:to_i) @@ -94,7 +95,7 @@ class CrunchDispatch # hasn't been able to communicate with it recently. state.sub!(/^idle\*/, "down") state.sub!(/\W+$/, "") - state = "down" unless %w(idle alloc down).include?(state) + state = "down" unless %w(idle alloc comp mix drng down).include?(state) slurm_nodes[hostname] = {state: state, job: nil} end each_slurm_line("squeue", "%j") do |hostname, job_uuid| @@ -104,7 +105,7 @@ class CrunchDispatch end def update_node_status - return unless Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/ + return unless Server::Application.config.crunch_job_wrapper.to_s.match(/^slurm/) slurm_status.each_pair do |hostname, slurmdata| next if @node_state[hostname] == slurmdata begin @@ -168,7 +169,25 @@ class CrunchDispatch end usable_nodes << node if usable_nodes.count >= min_node_count - return usable_nodes.map { |node| node.hostname } + hostnames = usable_nodes.map(&:hostname) + log_nodes = usable_nodes.map do |n| + "#{n.hostname} #{n.uuid} #{n.properties.to_json}" + end + log_job = "#{job.uuid} #{job.runtime_constraints}" + log_text = "dispatching job #{log_job} to #{log_nodes.join(", ")}" + $stderr.puts log_text + begin + act_as_system_user do + Log.new(object_uuid: job.uuid, + event_type: 'dispatch', + owner_uuid: system_user_uuid, + summary: "dispatching to #{hostnames.join(", ")}", + properties: {'text' => log_text}).save! + end + rescue => e + $stderr.puts "dispatch: log.create failed: #{e}" + end + return hostnames end end nil @@ -203,8 +222,8 @@ class CrunchDispatch owner_uuid: job.owner_uuid, summary: message, properties: {"text" => message}).save! - rescue - $stderr.puts "dispatch: log.create failed" + rescue => e + $stderr.puts "dispatch: log.create failed: #{e}" end if not skip_lock and not have_job_lock?(job) @@ -384,6 +403,10 @@ class CrunchDispatch '--job', job.uuid, '--git-dir', @arvados_internal] + if @cgroup_root + cmd_args += ['--cgroup-root', @cgroup_root] + end + if @docker_bin cmd_args += ['--docker-bin', @docker_bin] end @@ -429,6 +452,8 @@ class CrunchDispatch log_throttle_bytes_so_far: 0, log_throttle_lines_so_far: 0, log_throttle_bytes_skipped: 0, + log_throttle_partial_line_last_at: Time.new(0), + log_throttle_first_partial_line: true, } i.close @todo_job_retries.delete(job.uuid) @@ -443,9 +468,23 @@ class CrunchDispatch message = false linesize = line.size if running_job[:log_throttle_is_open] - running_job[:log_throttle_lines_so_far] += 1 - running_job[:log_throttle_bytes_so_far] += linesize - running_job[:bytes_logged] += linesize + partial_line = false + skip_counts = false + matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/) + if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]') + partial_line = true + if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.crunch_log_partial_line_throttle_period + running_job[:log_throttle_partial_line_last_at] = Time.now + else + skip_counts = true + end + end + + if !skip_counts + running_job[:log_throttle_lines_so_far] += 1 + running_job[:log_throttle_bytes_so_far] += linesize + running_job[:bytes_logged] += linesize + end if (running_job[:bytes_logged] > Rails.configuration.crunch_limit_log_bytes_per_job) @@ -456,14 +495,18 @@ class CrunchDispatch elsif (running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_log_throttle_bytes) remaining_time = running_job[:log_throttle_reset_time] - Time.now - message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds.\n" + message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds." running_job[:log_throttle_is_open] = false elsif (running_job[:log_throttle_lines_so_far] > Rails.configuration.crunch_log_throttle_lines) remaining_time = running_job[:log_throttle_reset_time] - Time.now - message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds.\n" + message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds." running_job[:log_throttle_is_open] = false + + elsif partial_line and running_job[:log_throttle_first_partial_line] + running_job[:log_throttle_first_partial_line] = false + message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.crunch_log_partial_line_throttle_period} seconds." end end @@ -475,8 +518,11 @@ class CrunchDispatch if message # Yes, write to logs, but use our "rate exceeded" message # instead of the log message that exceeded the limit. + message += " A complete log is still being written to Keep, and will be available when the job finishes.\n" line.replace message true + elsif partial_line + false else running_job[:log_throttle_is_open] end @@ -484,8 +530,6 @@ class CrunchDispatch def read_pipes @running.each do |job_uuid, j| - job = j[:job] - now = Time.now if now > j[:log_throttle_reset_time] # It has been more than throttle_period seconds since the last @@ -501,6 +545,8 @@ class CrunchDispatch j[:log_throttle_lines_so_far] = 0 j[:log_throttle_bytes_skipped] = 0 j[:log_throttle_is_open] = true + j[:log_throttle_partial_line_last_at] = Time.new(0) + j[:log_throttle_first_partial_line] = true end j[:buf].each do |stream, streambuf| @@ -637,9 +683,8 @@ class CrunchDispatch jobrecord = Job.find_by_uuid(job_done.uuid) - if exit_status == EXIT_RETRY_UNLOCKED - # The job failed because all of the nodes allocated to it - # failed. Only this crunch-dispatch process can retry the job: + if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid) + # Only this crunch-dispatch process can retry the job: # it's already locked, and there's no way to put it back in the # Queued state. Put it in our internal todo list unless the job # has failed this way excessively. @@ -787,6 +832,9 @@ class CrunchDispatch unless (@todo_pipelines.empty? and @pipe_auth_tokens.empty?) or did_recently(:update_pipelines, 5.0) update_pipelines end + unless did_recently('check_orphaned_slurm_jobs', 60) + check_orphaned_slurm_jobs + end end reap_children select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten, @@ -823,22 +871,14 @@ class CrunchDispatch end Rails.logger.info "fail_jobs: threshold is #{threshold}" - if Rails.configuration.crunch_job_wrapper == :slurm_immediate - # [["slurm_job_id", "slurm_job_name"], ...] - squeue = File.popen(['squeue', '-h', '-o', '%i %j']).readlines.map do |line| - line.strip.split(' ', 2) - end - else - squeue = [] - end - + squeue = squeue_jobs Job.where('state = ? and started_at < ?', Job::Running, threshold). each do |job| Rails.logger.debug "fail_jobs: #{job.uuid} started #{job.started_at}" - squeue.each do |slurm_id, slurm_name| + squeue.each do |slurm_name| if slurm_name == job.uuid - Rails.logger.info "fail_jobs: scancel #{slurm_id} for #{job.uuid}" - scancel slurm_id + Rails.logger.info "fail_jobs: scancel #{job.uuid}" + scancel slurm_name end end fail_job(job, "cleaned up stale job: started before #{threshold}", @@ -847,6 +887,37 @@ class CrunchDispatch end end + def check_orphaned_slurm_jobs + act_as_system_user do + squeue_uuids = squeue_jobs.select{|uuid| uuid.match(HasUuid::UUID_REGEX)}. + select{|uuid| !@running.has_key?(uuid)} + + return if squeue_uuids.size == 0 + + scancel_uuids = squeue_uuids - Job.where('uuid in (?) and (state in (?) or modified_at>?)', + squeue_uuids, + ['Running', 'Queued'], + (Time.now - 60)). + collect(&:uuid) + scancel_uuids.each do |uuid| + Rails.logger.info "orphaned job: scancel #{uuid}" + scancel uuid + end + end + end + + def sudo_preface + return [] if not Server::Application.config.crunch_job_user + ["sudo", "-E", "-u", + Server::Application.config.crunch_job_user, + "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}", + "PATH=#{ENV['PATH']}", + "PERLLIB=#{ENV['PERLLIB']}", + "PYTHONPATH=#{ENV['PYTHONPATH']}", + "RUBYLIB=#{ENV['RUBYLIB']}", + "GEM_PATH=#{ENV['GEM_PATH']}"] + end + protected def have_job_lock?(job) @@ -889,23 +960,27 @@ class CrunchDispatch end end - def scancel slurm_id - cmd = sudo_preface + ['scancel', slurm_id] - puts File.popen(cmd).read - if not $?.success? - Rails.logger.error "scancel #{slurm_id.shellescape}: $?" + # An array of job_uuids in squeue + def squeue_jobs + if Rails.configuration.crunch_job_wrapper == :slurm_immediate + p = IO.popen(['squeue', '-a', '-h', '-o', '%j']) + begin + p.readlines.map {|line| line.strip} + ensure + p.close + end + else + [] end end - def sudo_preface - return [] if not Server::Application.config.crunch_job_user - ["sudo", "-E", "-u", - Server::Application.config.crunch_job_user, - "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}", - "PATH=#{ENV['PATH']}", - "PERLLIB=#{ENV['PERLLIB']}", - "PYTHONPATH=#{ENV['PYTHONPATH']}", - "RUBYLIB=#{ENV['RUBYLIB']}", - "GEM_PATH=#{ENV['GEM_PATH']}"] + def scancel slurm_name + cmd = sudo_preface + ['scancel', '-n', slurm_name] + IO.popen(cmd) do |scancel_pipe| + puts scancel_pipe.read + end + if not $?.success? + Rails.logger.error "scancel #{slurm_name.shellescape}: $?" + end end end