3769: Add rate_limit with log throttling logic. Multiple configuration
authorPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 1 Oct 2014 21:12:52 +0000 (17:12 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 1 Oct 2014 21:12:52 +0000 (17:12 -0400)
parameters to throttle excessive logging by bytes, lines, logs table events,
and total logged output.

services/api/config/application.default.yml
services/api/script/crunch-dispatch.rb

index 7185810b94b170d80b8967bff3175598afddc55c..648d1ade34929539883bb233283bff2cd72f9382 100644 (file)
@@ -80,16 +80,6 @@ common:
   # Maximum number of log events that may be generated by a single job.
   crunch_limit_log_events_per_job: 65536
 
   # Maximum number of log events that may be generated by a single job.
   crunch_limit_log_events_per_job: 65536
 
-  # Maximum number of total bytes that may be logged by a single job.
-  crunch_limit_log_event_bytes_per_job: 67108864
-
-  # The sample period for throttling logs, in seconds (see below)
-  crunch_limit_log_event_throttle_period: 60
-
-  # Maximum number of bytes that job can log over
-  # crunch_limit_log_event_throttle_period before being silenced
-  crunch_limit_log_event_throttle_rate: 65536
-
   # These two settings control how frequently log events are flushed
   # to the database.  If a job generates two or more events within
   # crunch_log_seconds_between_events, the log data is not flushed
   # These two settings control how frequently log events are flushed
   # to the database.  If a job generates two or more events within
   # crunch_log_seconds_between_events, the log data is not flushed
@@ -97,6 +87,21 @@ common:
   crunch_log_bytes_per_event: 4096
   crunch_log_seconds_between_events: 1
 
   crunch_log_bytes_per_event: 4096
   crunch_log_seconds_between_events: 1
 
+  # Maximum number of total bytes that may be logged by a single job.  Logs
+  # that are throttled (see below) are not counted against this total.
+  crunch_limit_log_bytes_per_job: 67108864
+
+  # The sample period for throttling logs, in seconds (see below)
+  crunch_log_throttle_period: 30
+
+  # Maximum number of bytes that job can log over
+  # crunch_limit_log_event_throttle_period before being silenced
+  crunch_log_throttle_bytes: 60000
+
+  # Maximum number of lines that job can log over
+  # crunch_limit_log_event_throttle_period before being silenced
+  crunch_log_throttle_lines: 1000
+
   # Path to /etc/dnsmasq.d, or false = do not update dnsmasq data.
   dnsmasq_conf_dir: false
 
   # Path to /etc/dnsmasq.d, or false = do not update dnsmasq data.
   dnsmasq_conf_dir: false
 
index ff9d5f6c02dce2f24158ed28c39d916c814be6a9..c9ce92c8495a1be528b3aea786937d6735b77b8a 100755 (executable)
@@ -334,17 +334,17 @@ class Dispatcher
         stderr: e,
         wait_thr: t,
         job: job,
         stderr: e,
         wait_thr: t,
         job: job,
-        buf: {:stdout => '', :stderr => ''},
+        buf: {:stderr => '', :stdout => ''},
         started: false,
         sent_int: 0,
         job_auth: job_auth,
         stderr_buf_to_flush: '',
         started: false,
         sent_int: 0,
         job_auth: job_auth,
         stderr_buf_to_flush: '',
-        stderr_flushed_at: 0,
+        stderr_flushed_at: Time.new(0),
         bytes_logged: 0,
         events_logged: 0,
         bytes_logged: 0,
         events_logged: 0,
-        log_truncated: false,
-        log_throttle_timestamp: 0,
+        log_throttle_timestamp: Time.new(0),
         log_throttle_bytes_so_far: 0,
         log_throttle_bytes_so_far: 0,
+        log_throttle_lines_so_far: 0,
         log_throttle_bytes_skipped: 0,
       }
       i.close
         log_throttle_bytes_skipped: 0,
       }
       i.close
@@ -352,10 +352,72 @@ class Dispatcher
     end
   end
 
     end
   end
 
+  # Test for hard cap on total output and for log throttling.  Returns whether
+  # the log line should go to output or not.  Modifies "line" in place to
+  # replace it with an error if a logging limit is tripped.
+  def rate_limit running_job, line
+    if running_job[:bytes_logged] > Rails.configuration.crunch_limit_log_bytes_per_job
+      # Don't log anything if the hard cap has already been exceeded
+      return false
+    end
+
+    now = Time.now
+    throttle_period = Rails.configuration.crunch_log_throttle_period
+
+    #puts "Handle line at #{now - running_job[:log_throttle_timestamp]}, buf bytes #{line.size}, so far #{running_job[:log_throttle_bytes_so_far]}, throttled #{running_job[:log_throttle_bytes_skipped]}"
+
+    if running_job[:log_throttle_bytes_skipped] > 0
+      # We've skipped some log in the current time period already, so continue to
+      # skip the log
+      running_job[:log_throttle_bytes_skipped] += line.size
+      return false
+    end
+
+    # Count lines and bytes logged in this period, and total bytes logged for the job
+    running_job[:log_throttle_lines_so_far] += 1
+    running_job[:log_throttle_bytes_so_far] += line.size
+    running_job[:bytes_logged] += line.size
+
+    if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_log_throttle_bytes or
+        running_job[:log_throttle_lines_so_far] > Rails.configuration.crunch_log_throttle_lines
+      # We've exceeded the per-period throttle, so start skipping
+      running_job[:log_throttle_bytes_skipped] += line.size
+
+      # Replace log line with a message about skipping the log
+      remaining_time = throttle_period - (now - running_job[:log_throttle_timestamp])
+      if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_log_throttle_bytes
+        line.replace "Exceeded crunch_log_throttle_bytes rate of #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time.round} seconds\n"
+      else
+        line.replace "Exceeded crunch_log_throttle_lines rate of #{Rails.configuration.crunch_log_throttle_lines} lines per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time.round} seconds\n"
+      end
+    end
+
+    if running_job[:bytes_logged] > Rails.configuration.crunch_limit_log_bytes_per_job
+      # Replace log line with a message about truncating the log
+      line.replace "Exceed hard job log cap crunch_limit_log_bytes_per_job of #{Rails.configuration.crunch_limit_log_bytes_per_job}. Subsequent logs will be truncated."
+    end
+
+    true
+  end
+
   def read_pipes
     @running.each do |job_uuid, j|
       job = j[:job]
 
   def read_pipes
     @running.each do |job_uuid, j|
       job = j[:job]
 
+      now = Time.now
+      if (now - j[:log_throttle_timestamp]) > Rails.configuration.crunch_log_throttle_period
+        # It has been more than throttle_period seconds since the last checkpoint so reset the
+        # throttle
+        if j[:log_throttle_bytes_skipped] > 0
+          j[:stderr_buf_to_flush] << "Skipped #{j[:log_throttle_bytes_skipped]} bytes of log"
+        end
+
+        j[:log_throttle_timestamp] = now
+        j[:log_throttle_bytes_so_far] = 0
+        j[:log_throttle_lines_so_far] = 0
+        j[:log_throttle_bytes_skipped] = 0
+      end
+
       j[:buf].each do |stream, streambuf|
         # Read some data from the child stream
         buf = false
       j[:buf].each do |stream, streambuf|
         # Read some data from the child stream
         buf = false
@@ -365,7 +427,7 @@ class Dispatcher
         end
 
         if buf
         end
 
         if buf
-          # Add to a the buffer
+          # Add to the stream buffer
           streambuf << buf
 
           # Check for at least one complete line
           streambuf << buf
 
           # Check for at least one complete line
@@ -373,30 +435,25 @@ class Dispatcher
             lines = streambuf.lines("\n").to_a
 
             # check if the last line is partial or not
             lines = streambuf.lines("\n").to_a
 
             # check if the last line is partial or not
-            j[:buf][stream] = if streambuf[-1] == "\n"
-                                # nope
-                                ''
+            streambuf.replace(if streambuf[-1] == "\n"
+                                ''        # ends on a newline
                               else
                               else
-                                # Put the partial line back into the buffer
-                                lines.pop
-                              end
+                                lines.pop # Put the partial line back into the buffer
+                              end)
 
             # Now spool the lines to the log output buffer
             lines.each do |line|
 
             # Now spool the lines to the log output buffer
             lines.each do |line|
-              $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
-              $stderr.puts line
-              pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
-              if not j[:log_truncated]
+              # rate_limit returns true or false as to whether to actually log
+              # the line or not.  It also modifies "line" in place to replace
+              # it with an error if a logging limit is tripped.
+              if rate_limit j, line
+                $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
+                $stderr.puts line
+                pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
                 j[:stderr_buf_to_flush] << pub_msg
               end
                 j[:stderr_buf_to_flush] << pub_msg
               end
-            end
-
-            # Now actually send the log output to the logs table
-            if not j[:log_truncated]
-              if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
-                  (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
-                write_log j
-              end
+              # Send log output to the logs table
+              write_log j
             end
           end
         end
             end
           end
         end
@@ -445,6 +502,7 @@ class Dispatcher
 
     # Ensure every last drop of stdout and stderr is consumed
     read_pipes
 
     # Ensure every last drop of stdout and stderr is consumed
     read_pipes
+    j_done[:stderr_flushed_at] = Time.new(0) # reset flush timestamp to make sure log gets written
     write_log j_done # write any remaining logs
 
     j_done[:buf].each do |stream, streambuf|
     write_log j_done # write any remaining logs
 
     j_done[:buf].each do |stream, streambuf|
@@ -540,15 +598,6 @@ class Dispatcher
 
   protected
 
 
   protected
 
-  def too_many_bytes_logged_for_job(j)
-    return (j[:bytes_logged] + j[:stderr_buf_to_flush].size >
-            Rails.configuration.crunch_limit_log_event_bytes_per_job)
-  end
-
-  def too_many_events_logged_for_job(j)
-    return (j[:events_logged] >= Rails.configuration.crunch_limit_log_events_per_job)
-  end
-
   def did_recently(thing, min_interval)
     @did_recently ||= {}
     if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
   def did_recently(thing, min_interval)
     @did_recently ||= {}
     if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
@@ -561,68 +610,34 @@ class Dispatcher
 
   # send message to log table. we want these records to be transient
   def write_log running_job
 
   # send message to log table. we want these records to be transient
   def write_log running_job
-    return if running_job[:log_truncated]
     return if running_job[:stderr_buf_to_flush] == ''
     return if running_job[:stderr_buf_to_flush] == ''
-    begin
-      now = Time.now
-      throttle_period = Rails.configuration.crunch_limit_log_event_throttle_period
+    return if running_job[:events_logged] > Rails.configuration.crunch_limit_log_events_per_job
 
 
-      if (now - running_job[:log_throttle_timestamp]) > throttle_period
-        # It has been more than throttle_period seconds since the last checkpoint so reset the
-        # throttle
-        if running_job[:log_throttle_bytes_skipped] > 0
-          running_job[:stderr_buf_to_flush] << "Skipped #{running_job[:log_throttle_bytes_skipped]} bytes of log"
+    # Send out to log event if buffer size exceeds the bytes per event or if
+    # it has been at least crunch_log_seconds_between_events seconds since
+    # the last flush.
+    if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or
+        (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
+      begin
+        # Just reached crunch_limit_log_events_per_job so replace log with notification.
+        if running_job[:events_logged] == Rails.configuration.crunch_limit_log_events_per_job
+          running_job[:stderr_buf_to_flush] =
+            "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
         end
         end
-
-        running_job[:log_throttle_timestamp] = now
-        running_job[:log_throttle_bytes_so_far] = 0
-        running_job[:log_throttle_bytes_skipped] = 0
-      end
-
-      if running_job[:log_throttle_bytes_skipped] > 0
-        # We've skipped some log in this time period already, so continue to
-        # skip the log
-        running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size
-        return
+        log = Log.new(object_uuid: running_job[:job].uuid,
+                      event_type: 'stderr',
+                      owner_uuid: running_job[:job].owner_uuid,
+                      properties: {"text" => running_job[:stderr_buf_to_flush]})
+        log.save!
+        running_job[:events_logged] += 1
+      rescue => exception
+        $stderr.puts "Failed to write logs"
+        $stderr.puts exception.backtrace
       end
       end
-
-      # Record bytes logged so far in this period
-      running_job[:log_throttle_bytes_so_far] += running_job[:stderr_buf_to_flush].size
-
-      if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_limit_log_event_throttle_rate
-        # We've exceeded the throttle rate, so start skipping
-        running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size
-
-        # Replace the message with a message about skipping the log and log that instead
-        remaining_time = throttle_period - (now - running_job[:log_throttle_timestamp])
-        running_job[:stderr_buf_to_flush] = "Exceeded log rate of #{Rails.configuration.crunch_limit_log_event_throttle_rate} per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time} seconds\n"
-      end
-
-      # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
-      # or crunch_limit_log_events_per_job.
-      if (too_many_bytes_logged_for_job(running_job))
-        running_job[:log_truncated] = true
-        running_job[:stderr_buf_to_flush] =
-          "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
-      elsif (too_many_events_logged_for_job(running_job))
-        running_job[:log_truncated] = true
-        running_job[:stderr_buf_to_flush] =
-          "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
-      end
-      log = Log.new(object_uuid: running_job[:job].uuid,
-                    event_type: 'stderr',
-                    owner_uuid: running_job[:job].owner_uuid,
-                    properties: {"text" => running_job[:stderr_buf_to_flush]})
-      log.save!
-      running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
-      running_job[:events_logged] += 1
-    rescue
-      running_job[:buf][:stderr] = "Failed to write logs\n" + running_job[:buf][:stderr]
+      running_job[:stderr_buf_to_flush] = ''
+      running_job[:stderr_flushed_at] = Time.now
     end
     end
-    running_job[:stderr_buf_to_flush] = ''
-    running_job[:stderr_flushed_at] = Time.now.to_i
   end
   end
-
 end
 
 # This is how crunch-job child procs know where the "refresh" trigger file is
 end
 
 # This is how crunch-job child procs know where the "refresh" trigger file is