require File.dirname(__FILE__) + '/../config/environment'
require 'open3'
-LOG_BUFFER_SIZE = 4096
-
class Dispatcher
include ApplicationHelper
sent_int: 0,
job_auth: job_auth,
stderr_buf_to_flush: '',
- stderr_flushed_at: 0
+ stderr_flushed_at: 0,
+ bytes_logged: 0,
+ events_logged: 0,
+ log_truncated: false
}
i.close
update_node_status
j[:stderr_buf_to_flush] << pub_msg
end
- if (LOG_BUFFER_SIZE < j[:stderr_buf_to_flush].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i)
+ if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
+ (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
write_log j
end
end
protected
+ def too_many_bytes_logged_for_job(j)
+ return (j[:bytes_logged] + j[:stderr_buf_to_flush].size >
+ Rails.configuration.crunch_limit_log_event_bytes_per_job)
+ end
+
+ def too_many_events_logged_for_job(j)
+ return (j[:events_logged] >= Rails.configuration.crunch_limit_log_events_per_job)
+ end
+
def did_recently(thing, min_interval)
@did_recently ||= {}
if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
def write_log running_job
begin
if (running_job && running_job[:stderr_buf_to_flush] != '')
+ # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
+ # or crunch_limit_log_events_per_job.
+ if (too_many_bytes_logged_for_job(running_job))
+ return if running_job[:log_truncated]
+ running_job[:log_truncated] = true
+ running_job[:stderr_buf_to_flush] =
+ "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
+ elsif (too_many_events_logged_for_job(running_job))
+ return if running_job[:log_truncated]
+ running_job[:log_truncated] = true
+ running_job[:stderr_buf_to_flush] =
+ "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
+ end
log = Log.new(object_uuid: running_job[:job].uuid,
event_type: 'stderr',
owner_uuid: running_job[:job].owner_uuid,
properties: {"text" => running_job[:stderr_buf_to_flush]})
log.save!
+ running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
+ running_job[:events_logged] += 1
running_job[:stderr_buf_to_flush] = ''
running_job[:stderr_flushed_at] = Time.now.to_i
end