X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/82c4697bf24b10f3fb66d303ae73499095b5742a..2945ea907a7c35da2d03d6775574fb1ad9be2b09:/services/api/script/crunch-dispatch.rb diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb index 59e3aff31e..5a990f0cb4 100755 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@ -26,8 +26,6 @@ require File.dirname(__FILE__) + '/../config/boot' require File.dirname(__FILE__) + '/../config/environment' require 'open3' -LOG_BUFFER_SIZE = 4096 - class Dispatcher include ApplicationHelper @@ -80,7 +78,7 @@ class Dispatcher # update our database (and cache) when a node's state changes if @node_state[re[1]] != re[2] @node_state[re[1]] = re[2] - node = Node.where('hostname=?', re[1]).first + node = Node.where('hostname=?', re[1]).order(:last_ping_at).last if node $stderr.puts "dispatch: update #{re[1]} state to #{re[2]}" node.info['slurm_state'] = re[2] @@ -168,6 +166,10 @@ class Dispatcher cmd_args = nil case Server::Application.config.crunch_job_wrapper when :none + if @running.size > 0 + # Don't run more than one at a time. + return + end cmd_args = [] when :slurm_immediate nodelist = nodes_available_for_job(job) @@ -228,7 +230,7 @@ class Dispatcher next end - $stderr.puts `cd #{arvados_internal.shellescape} && git fetch --no-tags #{src_repo.shellescape} && git tag #{job.uuid.shellescape} #{job.script_version.shellescape}` + $stderr.puts `cd #{arvados_internal.shellescape} && git fetch-pack --all #{src_repo.shellescape} && git tag #{job.uuid.shellescape} #{job.script_version.shellescape}` cmd_args << crunch_job_bin cmd_args << '--job-api-token' @@ -264,7 +266,10 @@ class Dispatcher sent_int: 0, job_auth: job_auth, stderr_buf_to_flush: '', - stderr_flushed_at: 0 + stderr_flushed_at: 0, + bytes_logged: 0, + events_logged: 0, + log_truncated: false } i.close update_node_status @@ -314,7 +319,8 @@ class Dispatcher j[:stderr_buf_to_flush] << pub_msg end - if (LOG_BUFFER_SIZE < j[:stderr_buf_to_flush].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i) + if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or + (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i)) write_log j end end @@ -448,6 +454,15 @@ class Dispatcher protected + def too_many_bytes_logged_for_job(j) + return (j[:bytes_logged] + j[:stderr_buf_to_flush].size > + Rails.configuration.crunch_limit_log_event_bytes_per_job) + end + + def too_many_events_logged_for_job(j) + return (j[:events_logged] >= Rails.configuration.crunch_limit_log_events_per_job) + end + def did_recently(thing, min_interval) @did_recently ||= {} if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval @@ -462,11 +477,26 @@ class Dispatcher def write_log running_job begin if (running_job && running_job[:stderr_buf_to_flush] != '') + # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job + # or crunch_limit_log_events_per_job. + if (too_many_bytes_logged_for_job(running_job)) + return if running_job[:log_truncated] + running_job[:log_truncated] = true + running_job[:stderr_buf_to_flush] = + "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated" + elsif (too_many_events_logged_for_job(running_job)) + return if running_job[:log_truncated] + running_job[:log_truncated] = true + running_job[:stderr_buf_to_flush] = + "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated" + end log = Log.new(object_uuid: running_job[:job].uuid, event_type: 'stderr', owner_uuid: running_job[:job].owner_uuid, properties: {"text" => running_job[:stderr_buf_to_flush]}) log.save! + running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size + running_job[:events_logged] += 1 running_job[:stderr_buf_to_flush] = '' running_job[:stderr_flushed_at] = Time.now.to_i end