From bd99df2e6c5f7bb0b75c7fa2f1a9c1c2defeca8a Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 1 Oct 2014 17:12:52 -0400 Subject: [PATCH] 3769: Add rate_limit with log throttling logic. Multiple configuration parameters to throttle excessive logging by bytes, lines, logs table events, and total logged output. --- services/api/config/application.default.yml | 25 ++- services/api/script/crunch-dispatch.rb | 191 +++++++++++--------- 2 files changed, 118 insertions(+), 98 deletions(-) diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml index 7185810b94..648d1ade34 100644 --- a/services/api/config/application.default.yml +++ b/services/api/config/application.default.yml @@ -80,16 +80,6 @@ common: # Maximum number of log events that may be generated by a single job. crunch_limit_log_events_per_job: 65536 - # Maximum number of total bytes that may be logged by a single job. - crunch_limit_log_event_bytes_per_job: 67108864 - - # The sample period for throttling logs, in seconds (see below) - crunch_limit_log_event_throttle_period: 60 - - # Maximum number of bytes that job can log over - # crunch_limit_log_event_throttle_period before being silenced - crunch_limit_log_event_throttle_rate: 65536 - # These two settings control how frequently log events are flushed # to the database. If a job generates two or more events within # crunch_log_seconds_between_events, the log data is not flushed @@ -97,6 +87,21 @@ common: crunch_log_bytes_per_event: 4096 crunch_log_seconds_between_events: 1 + # Maximum number of total bytes that may be logged by a single job. Logs + # that are throttled (see below) are not counted against this total. + crunch_limit_log_bytes_per_job: 67108864 + + # The sample period for throttling logs, in seconds (see below) + crunch_log_throttle_period: 30 + + # Maximum number of bytes that job can log over + # crunch_limit_log_event_throttle_period before being silenced + crunch_log_throttle_bytes: 60000 + + # Maximum number of lines that job can log over + # crunch_limit_log_event_throttle_period before being silenced + crunch_log_throttle_lines: 1000 + # Path to /etc/dnsmasq.d, or false = do not update dnsmasq data. dnsmasq_conf_dir: false diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb index ff9d5f6c02..c9ce92c849 100755 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@ -334,17 +334,17 @@ class Dispatcher stderr: e, wait_thr: t, job: job, - buf: {:stdout => '', :stderr => ''}, + buf: {:stderr => '', :stdout => ''}, started: false, sent_int: 0, job_auth: job_auth, stderr_buf_to_flush: '', - stderr_flushed_at: 0, + stderr_flushed_at: Time.new(0), bytes_logged: 0, events_logged: 0, - log_truncated: false, - log_throttle_timestamp: 0, + log_throttle_timestamp: Time.new(0), log_throttle_bytes_so_far: 0, + log_throttle_lines_so_far: 0, log_throttle_bytes_skipped: 0, } i.close @@ -352,10 +352,72 @@ class Dispatcher end end + # Test for hard cap on total output and for log throttling. Returns whether + # the log line should go to output or not. Modifies "line" in place to + # replace it with an error if a logging limit is tripped. + def rate_limit running_job, line + if running_job[:bytes_logged] > Rails.configuration.crunch_limit_log_bytes_per_job + # Don't log anything if the hard cap has already been exceeded + return false + end + + now = Time.now + throttle_period = Rails.configuration.crunch_log_throttle_period + + #puts "Handle line at #{now - running_job[:log_throttle_timestamp]}, buf bytes #{line.size}, so far #{running_job[:log_throttle_bytes_so_far]}, throttled #{running_job[:log_throttle_bytes_skipped]}" + + if running_job[:log_throttle_bytes_skipped] > 0 + # We've skipped some log in the current time period already, so continue to + # skip the log + running_job[:log_throttle_bytes_skipped] += line.size + return false + end + + # Count lines and bytes logged in this period, and total bytes logged for the job + running_job[:log_throttle_lines_so_far] += 1 + running_job[:log_throttle_bytes_so_far] += line.size + running_job[:bytes_logged] += line.size + + if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_log_throttle_bytes or + running_job[:log_throttle_lines_so_far] > Rails.configuration.crunch_log_throttle_lines + # We've exceeded the per-period throttle, so start skipping + running_job[:log_throttle_bytes_skipped] += line.size + + # Replace log line with a message about skipping the log + remaining_time = throttle_period - (now - running_job[:log_throttle_timestamp]) + if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_log_throttle_bytes + line.replace "Exceeded crunch_log_throttle_bytes rate of #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time.round} seconds\n" + else + line.replace "Exceeded crunch_log_throttle_lines rate of #{Rails.configuration.crunch_log_throttle_lines} lines per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time.round} seconds\n" + end + end + + if running_job[:bytes_logged] > Rails.configuration.crunch_limit_log_bytes_per_job + # Replace log line with a message about truncating the log + line.replace "Exceed hard job log cap crunch_limit_log_bytes_per_job of #{Rails.configuration.crunch_limit_log_bytes_per_job}. Subsequent logs will be truncated." + end + + true + end + def read_pipes @running.each do |job_uuid, j| job = j[:job] + now = Time.now + if (now - j[:log_throttle_timestamp]) > Rails.configuration.crunch_log_throttle_period + # It has been more than throttle_period seconds since the last checkpoint so reset the + # throttle + if j[:log_throttle_bytes_skipped] > 0 + j[:stderr_buf_to_flush] << "Skipped #{j[:log_throttle_bytes_skipped]} bytes of log" + end + + j[:log_throttle_timestamp] = now + j[:log_throttle_bytes_so_far] = 0 + j[:log_throttle_lines_so_far] = 0 + j[:log_throttle_bytes_skipped] = 0 + end + j[:buf].each do |stream, streambuf| # Read some data from the child stream buf = false @@ -365,7 +427,7 @@ class Dispatcher end if buf - # Add to a the buffer + # Add to the stream buffer streambuf << buf # Check for at least one complete line @@ -373,30 +435,25 @@ class Dispatcher lines = streambuf.lines("\n").to_a # check if the last line is partial or not - j[:buf][stream] = if streambuf[-1] == "\n" - # nope - '' + streambuf.replace(if streambuf[-1] == "\n" + '' # ends on a newline else - # Put the partial line back into the buffer - lines.pop - end + lines.pop # Put the partial line back into the buffer + end) # Now spool the lines to the log output buffer lines.each do |line| - $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) - $stderr.puts line - pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" - if not j[:log_truncated] + # rate_limit returns true or false as to whether to actually log + # the line or not. It also modifies "line" in place to replace + # it with an error if a logging limit is tripped. + if rate_limit j, line + $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) + $stderr.puts line + pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" j[:stderr_buf_to_flush] << pub_msg end - end - - # Now actually send the log output to the logs table - if not j[:log_truncated] - if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or - (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i)) - write_log j - end + # Send log output to the logs table + write_log j end end end @@ -445,6 +502,7 @@ class Dispatcher # Ensure every last drop of stdout and stderr is consumed read_pipes + j_done[:stderr_flushed_at] = Time.new(0) # reset flush timestamp to make sure log gets written write_log j_done # write any remaining logs j_done[:buf].each do |stream, streambuf| @@ -540,15 +598,6 @@ class Dispatcher protected - def too_many_bytes_logged_for_job(j) - return (j[:bytes_logged] + j[:stderr_buf_to_flush].size > - Rails.configuration.crunch_limit_log_event_bytes_per_job) - end - - def too_many_events_logged_for_job(j) - return (j[:events_logged] >= Rails.configuration.crunch_limit_log_events_per_job) - end - def did_recently(thing, min_interval) @did_recently ||= {} if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval @@ -561,68 +610,34 @@ class Dispatcher # send message to log table. we want these records to be transient def write_log running_job - return if running_job[:log_truncated] return if running_job[:stderr_buf_to_flush] == '' - begin - now = Time.now - throttle_period = Rails.configuration.crunch_limit_log_event_throttle_period + return if running_job[:events_logged] > Rails.configuration.crunch_limit_log_events_per_job - if (now - running_job[:log_throttle_timestamp]) > throttle_period - # It has been more than throttle_period seconds since the last checkpoint so reset the - # throttle - if running_job[:log_throttle_bytes_skipped] > 0 - running_job[:stderr_buf_to_flush] << "Skipped #{running_job[:log_throttle_bytes_skipped]} bytes of log" + # Send out to log event if buffer size exceeds the bytes per event or if + # it has been at least crunch_log_seconds_between_events seconds since + # the last flush. + if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or + (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events + begin + # Just reached crunch_limit_log_events_per_job so replace log with notification. + if running_job[:events_logged] == Rails.configuration.crunch_limit_log_events_per_job + running_job[:stderr_buf_to_flush] = + "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated" end - - running_job[:log_throttle_timestamp] = now - running_job[:log_throttle_bytes_so_far] = 0 - running_job[:log_throttle_bytes_skipped] = 0 - end - - if running_job[:log_throttle_bytes_skipped] > 0 - # We've skipped some log in this time period already, so continue to - # skip the log - running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size - return + log = Log.new(object_uuid: running_job[:job].uuid, + event_type: 'stderr', + owner_uuid: running_job[:job].owner_uuid, + properties: {"text" => running_job[:stderr_buf_to_flush]}) + log.save! + running_job[:events_logged] += 1 + rescue => exception + $stderr.puts "Failed to write logs" + $stderr.puts exception.backtrace end - - # Record bytes logged so far in this period - running_job[:log_throttle_bytes_so_far] += running_job[:stderr_buf_to_flush].size - - if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_limit_log_event_throttle_rate - # We've exceeded the throttle rate, so start skipping - running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size - - # Replace the message with a message about skipping the log and log that instead - remaining_time = throttle_period - (now - running_job[:log_throttle_timestamp]) - running_job[:stderr_buf_to_flush] = "Exceeded log rate of #{Rails.configuration.crunch_limit_log_event_throttle_rate} per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time} seconds\n" - end - - # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job - # or crunch_limit_log_events_per_job. - if (too_many_bytes_logged_for_job(running_job)) - running_job[:log_truncated] = true - running_job[:stderr_buf_to_flush] = - "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated" - elsif (too_many_events_logged_for_job(running_job)) - running_job[:log_truncated] = true - running_job[:stderr_buf_to_flush] = - "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated" - end - log = Log.new(object_uuid: running_job[:job].uuid, - event_type: 'stderr', - owner_uuid: running_job[:job].owner_uuid, - properties: {"text" => running_job[:stderr_buf_to_flush]}) - log.save! - running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size - running_job[:events_logged] += 1 - rescue - running_job[:buf][:stderr] = "Failed to write logs\n" + running_job[:buf][:stderr] + running_job[:stderr_buf_to_flush] = '' + running_job[:stderr_flushed_at] = Time.now end - running_job[:stderr_buf_to_flush] = '' - running_job[:stderr_flushed_at] = Time.now.to_i end - end # This is how crunch-job child procs know where the "refresh" trigger file is -- 2.30.2