From: Peter Amstutz Date: Wed, 1 Oct 2014 19:23:23 +0000 (-0400) Subject: Merge branch '3052-crunch-log-stdout' into 3769-throttle-logs X-Git-Tag: 1.1.0~2137^2~4 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/e910d13fc5fc63a86f20be3b758c08db3d429bc6?ds=sidebyside;hp=-c Merge branch '3052-crunch-log-stdout' into 3769-throttle-logs --- e910d13fc5fc63a86f20be3b758c08db3d429bc6 diff --combined services/api/script/crunch-dispatch.rb index a5afb227a4,8d94970836..ff9d5f6c02 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@@ -334,7 -334,7 +334,7 @@@ class Dispatche stderr: e, wait_thr: t, job: job, - stderr_buf: '', + buf: {:stdout => '', :stderr => ''}, started: false, sent_int: 0, job_auth: job_auth, @@@ -342,10 -342,7 +342,10 @@@ stderr_flushed_at: 0, bytes_logged: 0, events_logged: 0, - log_truncated: false + log_truncated: false, + log_throttle_timestamp: 0, + log_throttle_bytes_so_far: 0, + log_throttle_bytes_skipped: 0, } i.close update_node_status @@@ -356,41 -353,47 +356,47 @@@ @running.each do |job_uuid, j| job = j[:job] - # Throw away child stdout - begin - j[:stdout].read_nonblock(2**20) - rescue Errno::EAGAIN, EOFError - end - - # Read whatever is available from child stderr - stderr_buf = false - begin - stderr_buf = j[:stderr].read_nonblock(2**20) - rescue Errno::EAGAIN, EOFError - end + j[:buf].each do |stream, streambuf| + # Read some data from the child stream + buf = false + begin + buf = j[stream].read_nonblock(2**16) + rescue Errno::EAGAIN, EOFError + end - if stderr_buf - j[:stderr_buf] << stderr_buf - if j[:stderr_buf].index "\n" - lines = j[:stderr_buf].lines("\n").to_a - if j[:stderr_buf][-1] == "\n" - j[:stderr_buf] = '' - else - j[:stderr_buf] = lines.pop - end - lines.each do |line| - $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) - $stderr.puts line - pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" - if not j[:log_truncated] - j[:stderr_buf_to_flush] << pub_msg + if buf + # Add to a the buffer + streambuf << buf + + # Check for at least one complete line + if streambuf.index "\n" + lines = streambuf.lines("\n").to_a + + # check if the last line is partial or not + j[:buf][stream] = if streambuf[-1] == "\n" + # nope + '' + else + # Put the partial line back into the buffer + lines.pop + end + + # Now spool the lines to the log output buffer + lines.each do |line| + $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) + $stderr.puts line + pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" + if not j[:log_truncated] + j[:stderr_buf_to_flush] << pub_msg + end end - end - if not j[:log_truncated] - if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or - (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i)) - write_log j + # Now actually send the log output to the logs table + if not j[:log_truncated] + if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or + (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i)) + write_log j + end end end end @@@ -441,8 -444,10 +447,10 @@@ read_pipes write_log j_done # write any remaining logs - if j_done[:stderr_buf] and j_done[:stderr_buf] != '' - $stderr.puts j_done[:stderr_buf] + "\n" + j_done[:buf].each do |stream, streambuf| + if streambuf != '' + $stderr.puts streambuf + "\n" + end end # Wait the thread (returns a Process::Status) @@@ -556,40 -561,6 +564,40 @@@ return if running_job[:log_truncated] return if running_job[:stderr_buf_to_flush] == '' begin + now = Time.now + throttle_period = Rails.configuration.crunch_limit_log_event_throttle_period + + if (now - running_job[:log_throttle_timestamp]) > throttle_period + # It has been more than throttle_period seconds since the last checkpoint so reset the + # throttle + if running_job[:log_throttle_bytes_skipped] > 0 + running_job[:stderr_buf_to_flush] << "Skipped #{running_job[:log_throttle_bytes_skipped]} bytes of log" + end + + running_job[:log_throttle_timestamp] = now + running_job[:log_throttle_bytes_so_far] = 0 + running_job[:log_throttle_bytes_skipped] = 0 + end + + if running_job[:log_throttle_bytes_skipped] > 0 + # We've skipped some log in this time period already, so continue to + # skip the log + running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size + return + end + + # Record bytes logged so far in this period + running_job[:log_throttle_bytes_so_far] += running_job[:stderr_buf_to_flush].size + + if running_job[:log_throttle_bytes_so_far] > Rails.configuration.crunch_limit_log_event_throttle_rate + # We've exceeded the throttle rate, so start skipping + running_job[:log_throttle_bytes_skipped] += running_job[:stderr_buf_to_flush].size + + # Replace the message with a message about skipping the log and log that instead + remaining_time = throttle_period - (now - running_job[:log_throttle_timestamp]) + running_job[:stderr_buf_to_flush] = "Exceeded log rate of #{Rails.configuration.crunch_limit_log_event_throttle_rate} per #{throttle_period} seconds, logging will be silenced for the next #{remaining_time} seconds\n" + end + # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job # or crunch_limit_log_events_per_job. if (too_many_bytes_logged_for_job(running_job)) @@@ -609,7 -580,7 +617,7 @@@ running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size running_job[:events_logged] += 1 rescue - running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf] + running_job[:buf][:stderr] = "Failed to write logs\n" + running_job[:buf][:stderr] end running_job[:stderr_buf_to_flush] = '' running_job[:stderr_flushed_at] = Time.now.to_i