Merge branch '3052-crunch-log-stdout' into 3769-throttle-logs
author Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 1 Oct 2014 19:23:23 +0000 (15:23 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 1 Oct 2014 19:23:23 +0000 (15:23 -0400)
1  2 
services/api/script/crunch-dispatch.rb

index a5afb227a42f1f49ef9c851d150a34d3d15262a7,8d94970836bf714c1ee338ae1b3e6e83afded1f4..ff9d5f6c02dce2f24158ed28c39d916c814be6a9
@@@ -334,7 -334,7 +334,7 @@@ class Dispatche
          stderr: e,
          wait_thr: t,
          job: job,
-         stderr_buf: '',
+         buf: {:stdout => '', :stderr => ''},
          started: false,
          sent_int: 0,
          job_auth: job_auth,
          stderr_flushed_at: 0,
          bytes_logged: 0,
          events_logged: 0,
 -        log_truncated: false
 +        log_truncated: false,
 +        log_throttle_timestamp: 0,
 +        log_throttle_bytes_so_far: 0,
 +        log_throttle_bytes_skipped: 0,
        }
        i.close
        update_node_status
      @running.each do |job_uuid, j|
        job = j[:job]
  
-       # Throw away child stdout
-       begin
-         j[:stdout].read_nonblock(2**20)
-       rescue Errno::EAGAIN, EOFError
-       end
-       # Read whatever is available from child stderr
-       stderr_buf = false
-       begin
-         stderr_buf = j[:stderr].read_nonblock(2**20)
-       rescue Errno::EAGAIN, EOFError
-       end
+       j[:buf].each do |stream, streambuf|
+         # Read some data from the child stream
+         buf = false
+         begin
+           buf = j[stream].read_nonblock(2**16)
+         rescue Errno::EAGAIN, EOFError
+         end
  
-       if stderr_buf
-         j[:stderr_buf] << stderr_buf
-         if j[:stderr_buf].index "\n"
-           lines = j[:stderr_buf].lines("\n").to_a
-           if j[:stderr_buf][-1] == "\n"
-             j[:stderr_buf] = ''
-           else
-             j[:stderr_buf] = lines.pop
-           end
-           lines.each do |line|
-             $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
-             $stderr.puts line
-             pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
-             if not j[:log_truncated]
-               j[:stderr_buf_to_flush] << pub_msg
+         if buf
+           # Add to a the buffer
+           streambuf << buf
+           # Check for at least one complete line
+           if streambuf.index "\n"
+             lines = streambuf.lines("\n").to_a
+             # check if the last line is partial or not
+             j[:buf][stream] = if streambuf[-1] == "\n"
+                                 # nope
+                                 ''
+                               else
+                                 # Put the partial line back into the buffer
+                                 lines.pop
+                               end
+             # Now spool the lines to the log output buffer
+             lines.each do |line|
+               $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
+               $stderr.puts line
+               pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
+               if not j[:log_truncated]
+                 j[:stderr_buf_to_flush] << pub_msg
+               end
              end
-           end
  
-           if not j[:log_truncated]
-             if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
-                 (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
-               write_log j
+             # Now actually send the log output to the logs table
+             if not j[:log_truncated]
+               if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
+                   (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
+                 write_log j
+               end
              end
            end
          end
      read_pipes
      write_log j_done # write any remaining logs
  
-     if j_done[:stderr_buf] and j_done[:stderr_buf] != ''
-       $stderr.puts j_done[:stderr_buf] + "\n"
+     j_done[:buf].each do |stream, streambuf|
+       if streambuf != ''
+         $stderr.puts streambuf + "\n"
+       end
      end
  
      # Wait the thread (returns a Process::Status)
      return if running_job[:log_truncated]
      return if running_job[:stderr_buf_to_flush] == ''
      begin
 +      now = Time.now
 +      throttle_period = Rails.configuration.crunch_limit_log_event_throttle_period
 +
 +      if (now - running_job[:log_throttle_timestamp]) > throttle_period
 +        # It has been more than throttle_period seconds since the last checkpoint so reset the
 +        # throttle
 +        if running_job[:log_throttle_bytes_skipped] > 0
 +          running_job[:stderr_buf_to_flush] << "Skipped #{running_job[:log_throttle_bytes_skipped]} bytes of log"
 +        end
 +
 +        running_job[:log_throttle_timestamp] = now
 +        running_job[:log_throttle_bytes_so_far] = 0
 +        running_job[:log_throttle_bytes_skipped] = 0
 +      end
 +
 +      if running_job[:log_throttle_bytes_skipped] > 0
 +        # We've skipped some log in this time period already, so continue to
 +        # skip the log
 +        running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size
 +        return
 +      end
 +
 +      # Record bytes logged so far in this period
 +      running_job[:log_throttle_bytes_so_far] += running_job[:stderr_buf_to_flush].size
 +
 +      if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_limit_log_event_throttle_rate
 +        # We've exceeded the throttle rate, so start skipping
 +        running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size
 +
 +        # Replace the message with a message about skipping the log and log that instead
 +        remaining_time = throttle_period - (now - running_job[:log_throttle_timestamp])
 +        running_job[:stderr_buf_to_flush] = "Exceeded log rate of #{Rails.configuration.crunch_limit_log_event_throttle_rate} per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time} seconds\n"
 +      end
 +
        # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
        # or crunch_limit_log_events_per_job.
        if (too_many_bytes_logged_for_job(running_job))
        running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
        running_job[:events_logged] += 1
      rescue
-       running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf]
+       running_job[:buf][:stderr] = "Failed to write logs\n" + running_job[:buf][:stderr]
      end
      running_job[:stderr_buf_to_flush] = ''
      running_job[:stderr_flushed_at] = Time.now.to_i