stderr: e,
wait_thr: t,
job: job,
- stderr_buf: '',
+ buf: {:stdout => '', :stderr => ''},
started: false,
sent_int: 0,
job_auth: job_auth,
stderr_flushed_at: 0,
bytes_logged: 0,
events_logged: 0,
- log_truncated: false
+ log_truncated: false,
+ log_throttle_timestamp: 0,
+ log_throttle_bytes_so_far: 0,
+ log_throttle_bytes_skipped: 0,
}
i.close
update_node_status
@running.each do |job_uuid, j|
job = j[:job]
- # Throw away child stdout
- begin
- j[:stdout].read_nonblock(2**20)
- rescue Errno::EAGAIN, EOFError
- end
-
- # Read whatever is available from child stderr
- stderr_buf = false
- begin
- stderr_buf = j[:stderr].read_nonblock(2**20)
- rescue Errno::EAGAIN, EOFError
- end
+ j[:buf].each do |stream, streambuf|
+ # Read some data from the child stream
+ buf = false
+ begin
+ buf = j[stream].read_nonblock(2**16)
+ rescue Errno::EAGAIN, EOFError
+ end
- if stderr_buf
- j[:stderr_buf] << stderr_buf
- if j[:stderr_buf].index "\n"
- lines = j[:stderr_buf].lines("\n").to_a
- if j[:stderr_buf][-1] == "\n"
- j[:stderr_buf] = ''
- else
- j[:stderr_buf] = lines.pop
- end
- lines.each do |line|
- $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
- $stderr.puts line
- pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
- if not j[:log_truncated]
- j[:stderr_buf_to_flush] << pub_msg
+ if buf
+ # Add to the buffer
+ streambuf << buf
+
+ # Check for at least one complete line
+ if streambuf.index "\n"
+ lines = streambuf.lines("\n").to_a
+
+ # check if the last line is partial or not
+ j[:buf][stream] = if streambuf[-1] == "\n"
+ # Not partial: the buffer ended with a newline, so reset it to empty
+ ''
+ else
+ # Put the partial line back into the buffer
+ lines.pop
+ end
+
+ # Now spool the lines to the log output buffer
+ lines.each do |line|
+ $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
+ $stderr.puts line
+ pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
+ if not j[:log_truncated]
+ j[:stderr_buf_to_flush] << pub_msg
+ end
end
- end
- if not j[:log_truncated]
- if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
- (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
- write_log j
+ # Now actually send the log output to the logs table
+ if not j[:log_truncated]
+ if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
+ (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
+ write_log j
+ end
end
end
end
read_pipes
write_log j_done # write any remaining logs
- if j_done[:stderr_buf] and j_done[:stderr_buf] != ''
- $stderr.puts j_done[:stderr_buf] + "\n"
+ j_done[:buf].each do |stream, streambuf|
+ if streambuf != ''
+ $stderr.puts streambuf + "\n"
+ end
end
# Wait the thread (returns a Process::Status)
return if running_job[:log_truncated]
return if running_job[:stderr_buf_to_flush] == ''
begin
+ now = Time.now
+ throttle_period = Rails.configuration.crunch_limit_log_event_throttle_period
+
+ if (now - running_job[:log_throttle_timestamp]) > throttle_period
+ # It has been more than throttle_period seconds since the last checkpoint so reset the
+ # throttle
+ if running_job[:log_throttle_bytes_skipped] > 0
+ running_job[:stderr_buf_to_flush] << "Skipped #{running_job[:log_throttle_bytes_skipped]} bytes of log"
+ end
+
+ running_job[:log_throttle_timestamp] = now
+ running_job[:log_throttle_bytes_so_far] = 0
+ running_job[:log_throttle_bytes_skipped] = 0
+ end
+
+ if running_job[:log_throttle_bytes_skipped] > 0
+ # We've skipped some log in this time period already, so continue to
+ # skip the log
+ running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size
+ return
+ end
+
+ # Record bytes logged so far in this period
+ running_job[:log_throttle_bytes_so_far] += running_job[:stderr_buf_to_flush].size
+
+ if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_limit_log_event_throttle_rate
+ # We've exceeded the throttle rate, so start skipping
+ running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size
+
+ # Replace the message with a message about skipping the log and log that instead
+ remaining_time = throttle_period - (now - running_job[:log_throttle_timestamp])
+ running_job[:stderr_buf_to_flush] = "Exceeded log rate of #{Rails.configuration.crunch_limit_log_event_throttle_rate} per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time} seconds\n"
+ end
+
# Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
# or crunch_limit_log_events_per_job.
if (too_many_bytes_logged_for_job(running_job))
running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
running_job[:events_logged] += 1
rescue
- running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf]
+ running_job[:buf][:stderr] = "Failed to write logs\n" + running_job[:buf][:stderr]
end
running_job[:stderr_buf_to_flush] = ''
running_job[:stderr_flushed_at] = Time.now.to_i