require File.dirname(__FILE__) + '/../config/environment'
require 'open3'
-LOG_BUFFER_SIZE = 4096
-
class Dispatcher
include ApplicationHelper
sent_int: 0,
job_auth: job_auth,
stderr_buf_to_flush: '',
- stderr_flushed_at: 0
+ stderr_flushed_at: 0,
+ log_truncated: false
}
i.close
update_node_status
j[:stderr_buf_to_flush] << pub_msg
end
- if (LOG_BUFFER_SIZE < j[:stderr_buf_to_flush].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i)
+ if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
+ (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
write_log j
end
end
protected
+ def too_many_bytes_logged_for_job(running_job)
+ bytes_logged = Log.where(uuid: running_job[:uuid]).map { |event|
+ (event.properties["text"] || "").length
+ }.sum
+ return (bytes_logged + j[:stderr_buf_to_flush].size >
+ Rails.configuration.crunch_limit_log_event_bytes_per_job)
+ end
+
+ def too_many_events_logged_for_job(running_job)
+ log_count = Log.where(uuid: running_job[:uuid]).count
+ return (log_count >= Rails.configuration.crunch_limit_log_events_per_job)
+ end
+
def did_recently(thing, min_interval)
@did_recently ||= {}
if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
def write_log running_job
begin
if (running_job && running_job[:stderr_buf_to_flush] != '')
+ # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
+ # or crunch_limit_log_events_per_job.
+ if (too_many_bytes_logged_for_job(running_job))
+ return if running_job[:log_truncated]
+ running_job[:log_truncated] = true
+ running_job[:stderr_buf_to_flush] =
+ "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
+ elsif (too_many_events_logged_for_job(running_job))
+ return if running_job[:log_truncated]
+ running_job[:log_truncated] = true
+ running_job[:stderr_buf_to_flush] =
+ "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
+ end
log = Log.new(object_uuid: running_job[:job].uuid,
event_type: 'stderr',
owner_uuid: running_job[:job].owner_uuid,
--- /dev/null
+require 'pp'
+require 'test_helper'
+load 'test/functional/arvados/v1/git_setup.rb'
+
+class CrunchDispatchTest < ActionDispatch::IntegrationTest
+ include CurrentApiClient
+ include GitSetup
+
+ fixtures :all
+
+ test "job runs" do
+ authorize_with :admin
+ set_user_from_auth :admin
+ post "/arvados/v1/jobs", {
+ format: "json",
+ job: {
+ script: "log",
+ repository: "bar",
+ script_version: "143fec09e988160673c63457fa12a0f70b5b8a26",
+ script_parameters: {}
+ }
+ }, {HTTP_AUTHORIZATION: "OAuth2 #{current_api_client_authorization}"}
+ p "response: #{@response.body}"
+ assert_response :success
+ resp = JSON.parse(@response.body)
+ end
+end