Merge branch 'master' into 3338-tab-counts
[arvados.git] / services / api / script / crunch-dispatch.rb
index 3ddf83da18ad878bc8f84efb5ab0810ecf3f6552..d316d92d605ce7c94e5c0eb8c0697d3f0f9c1ab2 100755 (executable)
@@ -2,6 +2,23 @@
 
 include Process
 
+$options = {}
+(ARGV.any? ? ARGV : ['--jobs', '--pipelines']).each do |arg|
+  case arg
+  when '--jobs'
+    $options[:jobs] = true
+  when '--pipelines'
+    $options[:pipelines] = true
+  else
+    abort "Unrecognized command line option '#{arg}'"
+  end
+end
+if not ($options[:jobs] or $options[:pipelines])
+  abort "Nothing to do. Please specify at least one of: --jobs, --pipelines."
+end
+
+ARGV.reject! { |a| a =~ /--jobs|--pipelines/ }
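+# Example invocations (a sketch based on the option handling above):
+#   crunch-dispatch.rb               # dispatch both Jobs and PipelineInstances
+#   crunch-dispatch.rb --jobs        # dispatch queued Jobs only
+#   crunch-dispatch.rb --pipelines   # dispatch queued PipelineInstances only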
+
 $warned = {}
 $signal = {}
 %w{TERM INT}.each do |sig|
@@ -26,8 +43,6 @@ require File.dirname(__FILE__) + '/../config/boot'
 require File.dirname(__FILE__) + '/../config/environment'
 require 'open3'
 
-LOG_BUFFER_SIZE = 4096
-
 class Dispatcher
   include ApplicationHelper
 
@@ -36,8 +51,14 @@ class Dispatcher
   end
 
   def refresh_todo
-    @todo = Job.queue.select do |j| j.repository end
-    @todo_pipelines = PipelineInstance.queue
+    @todo = []
+    if $options[:jobs]
+      @todo = Job.queue.select(&:repository)
+    end
+    @todo_pipelines = []
+    if $options[:pipelines]
+      @todo_pipelines = PipelineInstance.queue
+    end
   end
 
   def sinfo
@@ -65,74 +86,139 @@ class Dispatcher
 
   def update_node_status
     if Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
-      @nodes_in_state = {idle: 0, alloc: 0, down: 0}
       @node_state ||= {}
       node_seen = {}
       begin
         sinfo.split("\n").
           each do |line|
-          re = line.match /(\S+?):+(idle|alloc|down)/
+          re = line.match /(\S+?):+(idle|alloc|down)?/
           next if !re
 
-          # sinfo tells us about a node N times if it is shared by N partitions
-          next if node_seen[re[1]]
-          node_seen[re[1]] = true
+          _, node_name, node_state = *re
+          node_state = 'down' unless %w(idle alloc down).include? node_state
 
-          # count nodes in each state
-          @nodes_in_state[re[2].to_sym] += 1
+          # sinfo tells us about a node N times if it is shared by N partitions
+          next if node_seen[node_name]
+          node_seen[node_name] = true
 
           # update our database (and cache) when a node's state changes
-          if @node_state[re[1]] != re[2]
-            @node_state[re[1]] = re[2]
-            node = Node.where('hostname=?', re[1]).first
+          if @node_state[node_name] != node_state
+            @node_state[node_name] = node_state
+            node = Node.where('hostname=?', node_name).order(:last_ping_at).last
             if node
-              $stderr.puts "dispatch: update #{re[1]} state to #{re[2]}"
-              node.info[:slurm_state] = re[2]
-              node.save
-            elsif re[2] != 'down'
-              $stderr.puts "dispatch: sinfo reports '#{re[1]}' is not down, but no node has that name"
+              $stderr.puts "dispatch: update #{node_name} state to #{node_state}"
+              node.info['slurm_state'] = node_state
+              if not node.save
+                $stderr.puts "dispatch: failed to update #{node.uuid}: #{node.errors.messages}"
+              end
+            elsif node_state != 'down'
+              $stderr.puts "dispatch: sinfo reports '#{node_name}' is not down, but no node has that name"
             end
           end
         end
-      rescue
+      rescue => error
+        $stderr.puts "dispatch: error updating node status: #{error}"
       end
     end
   end
 
-  def start_jobs
-    @todo.each do |job|
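+  # Coerce raw_value to a positive integer, falling back to default when
+  # it doesn't convert to one.  Illustration (hypothetical calls):
+  #   positive_int('4')      # => 4
+  #   positive_int('-2', 1)  # => 1
+  #   positive_int(nil, 1)   # => 1 (nil.to_i is 0, so the default applies)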
+  def positive_int(raw_value, default=nil)
+    value = begin raw_value.to_i rescue 0 end
+    if value > 0
+      value
+    else
+      default
+    end
+  end
 
-      min_nodes = 1
-      begin
-        if job.runtime_constraints['min_nodes']
-          min_nodes = begin job.runtime_constraints['min_nodes'].to_i rescue 1 end
+  NODE_CONSTRAINT_MAP = {
+    # Map Job runtime_constraints keys to the corresponding Node info key.
+    'min_ram_mb_per_node' => 'total_ram_mb',
+    'min_scratch_mb_per_node' => 'total_scratch_mb',
+    'min_cores_per_node' => 'total_cpu_cores',
+  }
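+  # For example, a Job whose runtime_constraints['min_ram_mb_per_node'] is
+  # 1024 will only be matched to Nodes whose info['total_ram_mb'] is at
+  # least 1024.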
+
+  def nodes_available_for_job_now(job)
+    # Find Nodes that satisfy a Job's runtime constraints (by building
+    # a list of Procs and using them to test each Node).  If there are
+    # enough of them to run the Job, return an array of their names.
+    # Otherwise, return nil.
+    need_procs = NODE_CONSTRAINT_MAP.each_pair.map do |job_key, node_key|
+      Proc.new do |node|
+        positive_int(node.info[node_key], 0) >=
+          positive_int(job.runtime_constraints[job_key], 0)
+      end
+    end
+    min_node_count = positive_int(job.runtime_constraints['min_nodes'], 1)
+    usable_nodes = []
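+    # Node.find_each walks the Nodes table in batches, so we can return as
+    # soon as enough qualifying idle nodes have been seen.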
+    Node.find_each do |node|
+      good_node = (node.info['slurm_state'] == 'idle')
+      need_procs.each { |node_test| good_node &&= node_test.call(node) }
+      if good_node
+        usable_nodes << node
+        if usable_nodes.count >= min_node_count
+          return usable_nodes.map { |node| node.hostname }
         end
       end
+    end
+    nil
+  end
 
-      begin
-        next if @nodes_in_state[:idle] < min_nodes
-      rescue
-      end
+  def nodes_available_for_job(job)
+    # Check if there are enough idle nodes with the Job's minimum
+    # hardware requirements to run it.  If so, return an array of
+    # their names.  If not, up to once per hour, signal start_jobs to
+    # hold off launching Jobs.  This delay is meant to give the Node
+    # Manager an opportunity to make new resources available for new
+    # Jobs.
+    #
+    # The exact timing parameters here might need to be adjusted for
+    # the best balance between helping the longest-waiting Jobs run,
+    # and making efficient use of immediately available resources.
+    # These are all just first efforts until we have more data to work
+    # with.
+    nodelist = nodes_available_for_job_now(job)
+    if nodelist.nil? and not did_recently(:wait_for_available_nodes, 3600)
+      $stderr.puts "dispatch: waiting for nodes for #{job.uuid}"
+      @node_wait_deadline = Time.now + 5.minutes
+    end
+    nodelist
+  end
 
+  def start_jobs
+    @todo.each do |job|
       next if @running[job.uuid]
-      next if !take(job)
 
       cmd_args = nil
       case Server::Application.config.crunch_job_wrapper
       when :none
+        if @running.size > 0
+          # Don't run more than one at a time.
+          return
+        end
         cmd_args = []
       when :slurm_immediate
+        nodelist = nodes_available_for_job(job)
+        if nodelist.nil?
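+          # While we are inside the wait window, stop dispatching entirely
+          # (break); once the deadline has passed, just skip this job (next).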
+          if Time.now < @node_wait_deadline
+            break
+          else
+            next
+          end
+        end
         cmd_args = ["salloc",
                     "--chdir=/",
                     "--immediate",
                     "--exclusive",
                     "--no-kill",
                     "--job-name=#{job.uuid}",
-                    "--nodes=#{min_nodes}"]
+                    "--nodelist=#{nodelist.join(',')}"]
       else
         raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}"
       end
 
+      next if !take(job)
+
       if Server::Application.config.crunch_job_user
         cmd_args.unshift("sudo", "-E", "-u",
                          Server::Application.config.crunch_job_user,
@@ -170,7 +256,7 @@ class Dispatcher
         next
       end
 
-      $stderr.puts `cd #{arvados_internal.shellescape} && git fetch --no-tags #{src_repo.shellescape} && git tag #{job.uuid.shellescape} #{job.script_version.shellescape}`
+      $stderr.puts `cd #{arvados_internal.shellescape} && git fetch-pack --all #{src_repo.shellescape} && git tag #{job.uuid.shellescape} #{job.script_version.shellescape}`
 
       cmd_args << crunch_job_bin
       cmd_args << '--job-api-token'
@@ -206,9 +292,13 @@ class Dispatcher
         sent_int: 0,
         job_auth: job_auth,
         stderr_buf_to_flush: '',
-        stderr_flushed_at: 0
+        stderr_flushed_at: 0,
+        bytes_logged: 0,
+        events_logged: 0,
+        log_truncated: false
       }
       i.close
+      update_node_status
     end
   end
 
@@ -252,11 +342,16 @@ class Dispatcher
             $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
             $stderr.puts line
             pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
-            j[:stderr_buf_to_flush] << pub_msg
+            if not j[:log_truncated]
+              j[:stderr_buf_to_flush] << pub_msg
+            end
           end
 
-          if (LOG_BUFFER_SIZE < j[:stderr_buf_to_flush].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i)
-            write_log j
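+          # Flush in batches: write once the buffer outgrows the configured
+          # per-event byte limit, or once the configured interval has
+          # elapsed since the last flush.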
+          if not j[:log_truncated]
+            if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
+                (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
+              write_log j
+            end
           end
         end
       end
@@ -310,11 +405,11 @@ class Dispatcher
       $stderr.puts j_done[:stderr_buf] + "\n"
     end
 
-    # Wait the thread
-    j_done[:wait_thr].value
+    # Wait for the thread to finish (value returns a Process::Status)
+    exit_status = j_done[:wait_thr].value
 
     jobrecord = Job.find_by_uuid(job_done.uuid)
-    if jobrecord.started_at
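+    # crunch-job exits 75 (EX_TEMPFAIL, per sysexits.h) when it loses the
+    # startup race described below; leave those job records alone.  Note
+    # that Process::Status#exitstatus extracts the exit code proper,
+    # whereas #to_i would return the raw wait(2) status bits.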
+    if exit_status.exitstatus != 75 and jobrecord.started_at
       # Clean up state fields in case crunch-job exited without
       # putting the job in a suitable "finished" state.
       jobrecord.running = false
@@ -327,7 +422,18 @@ class Dispatcher
       # Don't fail the job if crunch-job didn't even get as far as
       # starting it. If the job failed to run due to an infrastructure
       # issue with crunch-job or slurm, we want the job to stay in the
-      # queue.
+      # queue. If crunch-job exits after losing a race to another
+      # crunch-job process, it exits 75 and we should leave the job
+      # record alone so the winner of the race can do its thing.
+      #
+      # There is still an unhandled race condition: if our crunch-job
+      # process is about to lose a race with another crunch-job
+      # process, but crashes before getting to its "exit 75" (for
+      # example, "cannot fork" or "cannot reach API server"), then we
+      # will assume incorrectly that it's our process's fault that
+      # jobrecord.started_at is non-nil, and mark the job as failed
+      # even though the winner of the race is probably still doing
+      # fine.
     end
 
     # Invalidate the per-job auth token
@@ -389,6 +495,15 @@ class Dispatcher
 
   protected
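+  # The log limits below come from Rails configuration; hypothetical
+  # example settings:
+  #   crunch_limit_log_event_bytes_per_job: 67108864
+  #   crunch_limit_log_events_per_job: 65536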
 
+  def too_many_bytes_logged_for_job(j)
+    return (j[:bytes_logged] + j[:stderr_buf_to_flush].size >
+            Rails.configuration.crunch_limit_log_event_bytes_per_job)
+  end
+
+  def too_many_events_logged_for_job(j)
+    return (j[:events_logged] >= Rails.configuration.crunch_limit_log_events_per_job)
+  end
+
   def did_recently(thing, min_interval)
     @did_recently ||= {}
     if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
@@ -401,21 +516,32 @@ class Dispatcher
 
   # send message to log table. we want these records to be transient
   def write_log running_job
+    return if running_job[:log_truncated]
+    return if running_job[:stderr_buf_to_flush] == ''
     begin
-      if (running_job && running_job[:stderr_buf_to_flush] != '')
-        log = Log.new(object_uuid: running_job[:job].uuid,
-                      event_type: 'stderr',
-                      owner_uuid: running_job[:job].owner_uuid,
-                      properties: {"text" => running_job[:stderr_buf_to_flush]})
-        log.save!
-        running_job[:stderr_buf_to_flush] = ''
-        running_job[:stderr_flushed_at] = Time.now.to_i
+      # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
+      # or crunch_limit_log_events_per_job.
+      if (too_many_bytes_logged_for_job(running_job))
+        running_job[:log_truncated] = true
+        running_job[:stderr_buf_to_flush] =
+          "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
+      elsif (too_many_events_logged_for_job(running_job))
+        running_job[:log_truncated] = true
+        running_job[:stderr_buf_to_flush] =
+          "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
       end
+      log = Log.new(object_uuid: running_job[:job].uuid,
+                    event_type: 'stderr',
+                    owner_uuid: running_job[:job].owner_uuid,
+                    properties: {"text" => running_job[:stderr_buf_to_flush]})
+      log.save!
+      running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
+      running_job[:events_logged] += 1
     rescue
-      running_job[:stderr_buf] = "Failed to write logs \n"
-      running_job[:stderr_buf_to_flush] = ''
-      running_job[:stderr_flushed_at] = Time.now.to_i
+      running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf]
     end
+    running_job[:stderr_buf_to_flush] = ''
+    running_job[:stderr_flushed_at] = Time.now.to_i
   end
 
 end