$stderr.puts "dispatch: git fetch-pack failed"
sleep 1
next
- end
+ end
end
# check if the commit needs to be tagged with this job uuid
stderr: e,
wait_thr: t,
job: job,
- stderr_buf: '',
+ buf: {stderr: '', stdout: ''},
started: false,
sent_int: 0,
job_auth: job_auth,
stderr_buf_to_flush: '',
- stderr_flushed_at: 0,
+ stderr_flushed_at: Time.new(0),
bytes_logged: 0,
events_logged: 0,
- log_truncated: false
+ log_throttle_is_open: true,
+ log_throttle_reset_time: Time.now + Rails.configuration.crunch_log_throttle_period,
+ log_throttle_bytes_so_far: 0,
+ log_throttle_lines_so_far: 0,
+ log_throttle_bytes_skipped: 0,
}
i.close
update_node_status
end
end
+ # Test for hard cap on total output and for log throttling. Returns whether
+ # the log line should go to output or not. Modifies "line" in place to
+ # replace it with an error if a logging limit is tripped.
+ #
+ # running_job: one entry from @running; its counters (:bytes_logged and
+ #   the :log_throttle_* fields) are read and updated here.
+ # line: a single log line (String); replaced via String#replace with a
+ #   limit-exceeded message when a limit is first tripped.
+ def rate_limit running_job, line
+ message = false
+ linesize = line.size
+ if running_job[:log_throttle_is_open]
+ running_job[:log_throttle_lines_so_far] += 1
+ running_job[:log_throttle_bytes_so_far] += linesize
+ running_job[:bytes_logged] += linesize
+
+ # Hard cap on total bytes logged for the whole job.
+ if (running_job[:bytes_logged] >
+ Rails.configuration.crunch_limit_log_bytes_per_job)
+ message = "Exceeded log limit #{Rails.configuration.crunch_limit_log_bytes_per_job} bytes (crunch_limit_log_bytes_per_job). Log will be truncated."
+ # Push the reset time far into the future so the periodic reset in
+ # read_pipes never reopens the throttle for this job.
+ # NOTE(review): unlike the two rate messages below, this message has
+ # no trailing "\n" -- confirm that is intended.
+ running_job[:log_throttle_reset_time] = Time.now + 100.years
+ running_job[:log_throttle_is_open] = false
+
+ # Per-period byte rate limit; silenced until the next throttle reset.
+ elsif (running_job[:log_throttle_bytes_so_far] >
+ Rails.configuration.crunch_log_throttle_bytes)
+ remaining_time = running_job[:log_throttle_reset_time] - Time.now
+ message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds.\n"
+ running_job[:log_throttle_is_open] = false
+
+ # Per-period line rate limit; silenced until the next throttle reset.
+ elsif (running_job[:log_throttle_lines_so_far] >
+ Rails.configuration.crunch_log_throttle_lines)
+ remaining_time = running_job[:log_throttle_reset_time] - Time.now
+ message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds.\n"
+ running_job[:log_throttle_is_open] = false
+ end
+ end
+
+ if not running_job[:log_throttle_is_open]
+ # Don't log anything if any limit has been exceeded. Just count lossage.
+ running_job[:log_throttle_bytes_skipped] += linesize
+ end
+
+ if message
+ # Yes, write to logs, but use our "rate exceeded" message
+ # instead of the log message that exceeded the limit.
+ line.replace message
+ true
+ else
+ running_job[:log_throttle_is_open]
+ end
+ end
+
def read_pipes
+ # Poll the captured stdout/stderr pipes of every running job's child
+ # process: reset each job's throttle window when its period has elapsed,
+ # run complete lines through rate_limit, echo surviving lines to our own
+ # stderr, and accumulate them in j[:stderr_buf_to_flush] for write_log.
@running.each do |job_uuid, j|
job = j[:job]
- # Throw away child stdout
- begin
- j[:stdout].read_nonblock(2**20)
- rescue Errno::EAGAIN, EOFError
- end
+ now = Time.now
+ if now > j[:log_throttle_reset_time]
+ # It has been more than throttle_period seconds since the last
+ # checkpoint so reset the throttle
+ if j[:log_throttle_bytes_skipped] > 0
+ message = "#{job_uuid} ! Skipped #{j[:log_throttle_bytes_skipped]} bytes of log"
+ $stderr.puts message
+ j[:stderr_buf_to_flush] << "#{Time.now.ctime.to_s} #{message}\n"
+ end
- # Read whatever is available from child stderr
- stderr_buf = false
- begin
- stderr_buf = j[:stderr].read_nonblock(2**20)
- rescue Errno::EAGAIN, EOFError
+ j[:log_throttle_reset_time] = now + Rails.configuration.crunch_log_throttle_period
+ j[:log_throttle_bytes_so_far] = 0
+ j[:log_throttle_lines_so_far] = 0
+ j[:log_throttle_bytes_skipped] = 0
+ j[:log_throttle_is_open] = true
end
- if stderr_buf
- j[:stderr_buf] << stderr_buf
- if j[:stderr_buf].index "\n"
- lines = j[:stderr_buf].lines("\n").to_a
- if j[:stderr_buf][-1] == "\n"
- j[:stderr_buf] = ''
- else
- j[:stderr_buf] = lines.pop
+ j[:buf].each do |stream, streambuf|
+ # Read some data from the child stream
+ buf = ''
+ begin
+ # It's important to use a big enough buffer here. When we're
+ # being flooded with logs, we must read and discard many
+ # bytes at once. Otherwise, we can easily peg a CPU with
+ # time-checking and other loop overhead. (Quick tests show a
+ # 1MiB buffer working 2.5x as fast as a 64 KiB buffer.)
+ #
+ # So don't reduce this buffer size!
+ buf = j[stream].read_nonblock(2**20)
+ rescue Errno::EAGAIN, EOFError
+ end
+
+ # Short circuit the counting code if we're just going to throw
+ # away the data anyway.
+ if not j[:log_throttle_is_open]
+ j[:log_throttle_bytes_skipped] += streambuf.size + buf.size
+ streambuf.replace ''
+ next
+ elsif buf == ''
+ next
+ end
+
+ # Append to incomplete line from previous read, if any
+ streambuf << buf
+
+ bufend = ''
+ streambuf.each_line do |line|
+ if not line.end_with? $/
+ if line.size > Rails.configuration.crunch_log_throttle_bytes
+ # Without a limit here, we'll use 2x an arbitrary amount
+ # of memory, and waste a lot of time copying strings
+ # around, all without providing any feedback to anyone
+ # about what's going on _or_ hitting any of our throttle
+ # limits.
+ #
+ # Here we leave "line" alone, knowing it will never be
+ # sent anywhere: rate_limit() will reach
+ # crunch_log_throttle_bytes immediately. However, we'll
+ # leave [...] in bufend: if the trailing end of the long
+ # line does end up getting sent anywhere, it will have
+ # some indication that it incomplete.
+ bufend = "[...]"
+ else
+ # If line length is sane, we'll wait for the rest of the
+ # line to appear in the next read_pipes() call.
+ bufend = line
+ break
+ end
end
- lines.each do |line|
+ # rate_limit returns true or false as to whether to actually log
+ # the line or not. It also modifies "line" in place to replace
+ # it with an error if a logging limit is tripped.
+ if rate_limit j, line
$stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
$stderr.puts line
- pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
- if not j[:log_truncated]
- j[:stderr_buf_to_flush] << pub_msg
- end
- end
-
- if not j[:log_truncated]
- if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
- (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
- write_log j
- end
+ pub_msg = "#{Time.now.ctime.to_s} #{line.strip}\n"
+ j[:stderr_buf_to_flush] << pub_msg
end
end
+
+ # Leave the trailing incomplete line (if any) in streambuf for
+ # next time.
+ streambuf.replace bufend
end
+ # Flush buffered logs to the logs table, if appropriate. We have
+ # to do this even if we didn't collect any new logs this time:
+ # otherwise, buffered data older than seconds_between_events
+ # won't get flushed until new data arrives.
+ write_log j
end
end
$stderr.puts "dispatch: child #{pid_done} exit"
$stderr.puts "dispatch: job #{job_done.uuid} end"
- # Ensure every last drop of stdout and stderr is consumed
+ # Ensure every last drop of stdout and stderr is consumed.
read_pipes
- write_log j_done # write any remaining logs
-
- if j_done[:stderr_buf] and j_done[:stderr_buf] != ''
- $stderr.puts j_done[:stderr_buf] + "\n"
+ # Reset flush timestamp to make sure log gets written.
+ j_done[:stderr_flushed_at] = Time.new(0)
+ # Write any remaining logs.
+ write_log j_done
+
+ j_done[:buf].each do |stream, streambuf|
+ if streambuf != ''
+ $stderr.puts streambuf + "\n"
+ end
end
# Wait the thread (returns a Process::Status)
protected
- def too_many_bytes_logged_for_job(j)
- return (j[:bytes_logged] + j[:stderr_buf_to_flush].size >
- Rails.configuration.crunch_limit_log_event_bytes_per_job)
- end
-
- def too_many_events_logged_for_job(j)
- return (j[:events_logged] >= Rails.configuration.crunch_limit_log_events_per_job)
- end
-
def did_recently(thing, min_interval)
@did_recently ||= {}
if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
# send message to log table. we want these records to be transient
def write_log running_job
+ # Flush running_job[:stderr_buf_to_flush] to the Log table when the
+ # buffer is large enough (crunch_log_bytes_per_event) or old enough
+ # (crunch_log_seconds_between_events). When neither threshold is met,
+ # the buffer is kept for a later call.
+ # NOTE(review): when log.save! raises, the error goes to our stderr and
+ # the buffer is still cleared below, so that data is dropped -- confirm
+ # this lossy behavior is intended.
- return if running_job[:log_truncated]
return if running_job[:stderr_buf_to_flush] == ''
- begin
- # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
- # or crunch_limit_log_events_per_job.
- if (too_many_bytes_logged_for_job(running_job))
- running_job[:log_truncated] = true
- running_job[:stderr_buf_to_flush] =
- "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
- elsif (too_many_events_logged_for_job(running_job))
- running_job[:log_truncated] = true
- running_job[:stderr_buf_to_flush] =
- "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
+
+ # Send out to log event if buffer size exceeds the bytes per event or if
+ # it has been at least crunch_log_seconds_between_events seconds since
+ # the last flush.
+ if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or
+ (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
+ begin
+ log = Log.new(object_uuid: running_job[:job].uuid,
+ event_type: 'stderr',
+ owner_uuid: running_job[:job].owner_uuid,
+ properties: {"text" => running_job[:stderr_buf_to_flush]})
+ log.save!
+ running_job[:events_logged] += 1
+ rescue => exception
+ $stderr.puts "Failed to write logs"
+ $stderr.puts exception.backtrace
end
- log = Log.new(object_uuid: running_job[:job].uuid,
- event_type: 'stderr',
- owner_uuid: running_job[:job].owner_uuid,
- properties: {"text" => running_job[:stderr_buf_to_flush]})
- log.save!
- running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
- running_job[:events_logged] += 1
- rescue
- running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf]
+ running_job[:stderr_buf_to_flush] = ''
+ running_job[:stderr_flushed_at] = Time.now
end
- running_job[:stderr_buf_to_flush] = ''
- running_job[:stderr_flushed_at] = Time.now.to_i
end
-
end
# This is how crunch-job child procs know where the "refresh" trigger file is