X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/19cb98ad222177fb7dd3613282446060e74dd2ce..9b0654adfffaac018395de29f6e441b843d46e85:/services/api/lib/crunch_dispatch.rb diff --git a/services/api/lib/crunch_dispatch.rb b/services/api/lib/crunch_dispatch.rb index 11270c39ce..48b0eb5983 100644 --- a/services/api/lib/crunch_dispatch.rb +++ b/services/api/lib/crunch_dispatch.rb @@ -27,7 +27,7 @@ class CrunchDispatch @cgroup_root = ENV['CRUNCH_CGROUP_ROOT'] @arvados_internal = Rails.configuration.git_internal_dir - if not File.exists? @arvados_internal + if not File.exist? @arvados_internal $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}` raise "No internal git repository available" unless ($? == 0) end @@ -73,7 +73,7 @@ class CrunchDispatch # into multiple rows with one hostname each. `#{cmd} --noheader -o '%N:#{outfmt}'`.each_line do |line| tokens = line.chomp.split(":", max_fields) - if (re = tokens[0].match /^(.*?)\[([-,\d]+)\]$/) + if (re = tokens[0].match(/^(.*?)\[([-,\d]+)\]$/)) tokens.shift re[2].split(",").each do |range| range = range.split("-").collect(&:to_i) @@ -105,7 +105,7 @@ class CrunchDispatch end def update_node_status - return unless Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/ + return unless Server::Application.config.crunch_job_wrapper.to_s.match(/^slurm/) slurm_status.each_pair do |hostname, slurmdata| next if @node_state[hostname] == slurmdata begin @@ -169,7 +169,7 @@ class CrunchDispatch end usable_nodes << node if usable_nodes.count >= min_node_count - return usable_nodes.map { |node| node.hostname } + return usable_nodes.map { |n| n.hostname } end end nil @@ -434,6 +434,8 @@ class CrunchDispatch log_throttle_bytes_so_far: 0, log_throttle_lines_so_far: 0, log_throttle_bytes_skipped: 0, + log_throttle_partial_line_last_at: Time.new(0), + log_throttle_first_partial_line: true, } i.close @todo_job_retries.delete(job.uuid) @@ -448,9 +450,23 @@ class CrunchDispatch message = false linesize = line.size if running_job[:log_throttle_is_open] - running_job[:log_throttle_lines_so_far] += 1 - running_job[:log_throttle_bytes_so_far] += linesize - running_job[:bytes_logged] += linesize + partial_line = false + skip_counts = false + matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/) + if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]') + partial_line = true + if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.crunch_log_partial_line_throttle_period + running_job[:log_throttle_partial_line_last_at] = Time.now + else + skip_counts = true + end + end + + if !skip_counts + running_job[:log_throttle_lines_so_far] += 1 + running_job[:log_throttle_bytes_so_far] += linesize + running_job[:bytes_logged] += linesize + end if (running_job[:bytes_logged] > Rails.configuration.crunch_limit_log_bytes_per_job) @@ -461,14 +477,18 @@ class CrunchDispatch elsif (running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_log_throttle_bytes) remaining_time = running_job[:log_throttle_reset_time] - Time.now - message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds.\n" + message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds." running_job[:log_throttle_is_open] = false elsif (running_job[:log_throttle_lines_so_far] > Rails.configuration.crunch_log_throttle_lines) remaining_time = running_job[:log_throttle_reset_time] - Time.now - message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds.\n" + message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds." running_job[:log_throttle_is_open] = false + + elsif partial_line and running_job[:log_throttle_first_partial_line] + running_job[:log_throttle_first_partial_line] = false + message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.crunch_log_partial_line_throttle_period} seconds." end end @@ -480,8 +500,11 @@ class CrunchDispatch if message # Yes, write to logs, but use our "rate exceeded" message # instead of the log message that exceeded the limit. + message += " A complete log is still being written to Keep, and will be available when the job finishes.\n" line.replace message true + elsif partial_line + false else running_job[:log_throttle_is_open] end @@ -489,8 +512,6 @@ class CrunchDispatch def read_pipes @running.each do |job_uuid, j| - job = j[:job] - now = Time.now if now > j[:log_throttle_reset_time] # It has been more than throttle_period seconds since the last @@ -506,6 +527,8 @@ class CrunchDispatch j[:log_throttle_lines_so_far] = 0 j[:log_throttle_bytes_skipped] = 0 j[:log_throttle_is_open] = true + j[:log_throttle_partial_line_last_at] = Time.new(0) + j[:log_throttle_first_partial_line] = true end j[:buf].each do |stream, streambuf|