Merge branch 'master' into 3138-wiselinks
[arvados.git] / services / api / script / crunch-dispatch.rb
index 59e3aff31e692c4003479615017f768624e49f9e..5a990f0cb41ad30bd8832399a0d40c1839f2460e 100755 (executable)
@@ -26,8 +26,6 @@ require File.dirname(__FILE__) + '/../config/boot'
 require File.dirname(__FILE__) + '/../config/environment'
 require 'open3'
 
-LOG_BUFFER_SIZE = 4096
-
 class Dispatcher
   include ApplicationHelper
 
@@ -80,7 +78,7 @@ class Dispatcher
           # update our database (and cache) when a node's state changes
           if @node_state[re[1]] != re[2]
             @node_state[re[1]] = re[2]
-            node = Node.where('hostname=?', re[1]).first
+            node = Node.where('hostname=?', re[1]).order(:last_ping_at).last
             if node
               $stderr.puts "dispatch: update #{re[1]} state to #{re[2]}"
               node.info['slurm_state'] = re[2]
@@ -168,6 +166,10 @@ class Dispatcher
       cmd_args = nil
       case Server::Application.config.crunch_job_wrapper
       when :none
+        if @running.size > 0
+            # Don't run more than one at a time.
+            return
+        end
         cmd_args = []
       when :slurm_immediate
         nodelist = nodes_available_for_job(job)
@@ -228,7 +230,7 @@ class Dispatcher
         next
       end
 
-      $stderr.puts `cd #{arvados_internal.shellescape} && git fetch --no-tags #{src_repo.shellescape} && git tag #{job.uuid.shellescape} #{job.script_version.shellescape}`
+      $stderr.puts `cd #{arvados_internal.shellescape} && git fetch-pack --all #{src_repo.shellescape} && git tag #{job.uuid.shellescape} #{job.script_version.shellescape}`
 
       cmd_args << crunch_job_bin
       cmd_args << '--job-api-token'
@@ -264,7 +266,10 @@ class Dispatcher
         sent_int: 0,
         job_auth: job_auth,
         stderr_buf_to_flush: '',
-        stderr_flushed_at: 0
+        stderr_flushed_at: 0,
+        bytes_logged: 0,
+        events_logged: 0,
+        log_truncated: false
       }
       i.close
       update_node_status
@@ -314,7 +319,8 @@ class Dispatcher
             j[:stderr_buf_to_flush] << pub_msg
           end
 
-          if (LOG_BUFFER_SIZE < j[:stderr_buf_to_flush].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i)
+          if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
+              (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
             write_log j
           end
         end
@@ -448,6 +454,15 @@ class Dispatcher
 
   protected
 
+  def too_many_bytes_logged_for_job(j)  # j: per-job state hash; reads :bytes_logged and :stderr_buf_to_flush
+    return (j[:bytes_logged] + j[:stderr_buf_to_flush].size >  # logged so far + pending buffer; NOTE(review): .size counts characters, not bytes -- confirm intent
+            Rails.configuration.crunch_limit_log_event_bytes_per_job)  # true when flushing the buffer would exceed the configured per-job limit
+  end
+
+  def too_many_events_logged_for_job(j)  # j: per-job state hash; reads :events_logged
+    return (j[:events_logged] >= Rails.configuration.crunch_limit_log_events_per_job)  # true once the event count has reached the configured per-job limit
+  end
+
   def did_recently(thing, min_interval)
     @did_recently ||= {}
     if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
@@ -462,11 +477,26 @@ class Dispatcher
   def write_log running_job
     begin
       if (running_job && running_job[:stderr_buf_to_flush] != '')
+        # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
+        # or crunch_limit_log_events_per_job.
+        if (too_many_bytes_logged_for_job(running_job))
+          return if running_job[:log_truncated]
+          running_job[:log_truncated] = true
+          running_job[:stderr_buf_to_flush] =
+              "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
+        elsif (too_many_events_logged_for_job(running_job))
+          return if running_job[:log_truncated]
+          running_job[:log_truncated] = true
+          running_job[:stderr_buf_to_flush] =
+              "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
+        end
         log = Log.new(object_uuid: running_job[:job].uuid,
                       event_type: 'stderr',
                       owner_uuid: running_job[:job].owner_uuid,
                       properties: {"text" => running_job[:stderr_buf_to_flush]})
         log.save!
+        running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
+        running_job[:events_logged] += 1
         running_job[:stderr_buf_to_flush] = ''
         running_job[:stderr_flushed_at] = Time.now.to_i
       end