8406: @job_retry_counts.include? jobrecord.uuid because @job_retry_counts has a defau...
[arvados.git] / services / api / lib / crunch_dispatch.rb
index 15491ad722175b84a9024687c70918fbbd881c10..131386d765f66b66d958d244a6590131d0189769 100644 (file)
@@ -2,6 +2,7 @@ require 'open3'
 require 'shellwords'
 
 class CrunchDispatch
+  extend DbCurrentTime
   include ApplicationHelper
   include Process
 
@@ -22,6 +23,7 @@ class CrunchDispatch
     end
 
     @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN']
+    @docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS']
 
     @arvados_internal = Rails.configuration.git_internal_dir
     if not File.exists? @arvados_internal
@@ -160,7 +162,8 @@ class CrunchDispatch
       # Prefer nodes with no price, then cheap nodes, then expensive nodes
       node.properties['cloud_node']['price'].to_f rescue 0
     end.each do |node|
-      if need_procs.select { |node_test| not node_test.call(node) }.any?
+      if need_procs.select { |need_proc| not need_proc.call(node) }.any?
+        # At least one runtime constraint is not satisfied by this node
         next
       end
       usable_nodes << node
@@ -192,7 +195,7 @@ class CrunchDispatch
     nodelist
   end
 
-  def fail_job job, message
+  def fail_job job, message, skip_lock: false
     $stderr.puts "dispatch: #{job.uuid}: #{message}"
     begin
       Log.new(object_uuid: job.uuid,
@@ -204,14 +207,18 @@ class CrunchDispatch
       $stderr.puts "dispatch: log.create failed"
     end
 
-    begin
-      job.lock @authorizations[job.uuid].user.uuid
-      job.state = "Failed"
-      if not job.save
-        $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed"
+    if not skip_lock and not have_job_lock?(job)
+      begin
+        job.lock @authorizations[job.uuid].user.uuid
+      rescue ArvadosModel::AlreadyLockedError
+        $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else"
+        return
       end
-    rescue ArvadosModel::AlreadyLockedError
-      $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else"
+    end
+
+    job.state = "Failed"
+    if not job.save
+      $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed"
     end
   end
 
@@ -333,16 +340,7 @@ class CrunchDispatch
         raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}"
       end
 
-      if Server::Application.config.crunch_job_user
-        cmd_args.unshift("sudo", "-E", "-u",
-                         Server::Application.config.crunch_job_user,
-                         "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
-                         "PATH=#{ENV['PATH']}",
-                         "PERLLIB=#{ENV['PERLLIB']}",
-                         "PYTHONPATH=#{ENV['PYTHONPATH']}",
-                         "RUBYLIB=#{ENV['RUBYLIB']}",
-                         "GEM_PATH=#{ENV['GEM_PATH']}")
-      end
+      cmd_args = sudo_preface + cmd_args
 
       next unless get_authorization job
 
@@ -356,7 +354,7 @@ class CrunchDispatch
         # reasonable thing to do at this point.
         repo = Repository.where(name: job.repository).first
         if repo.nil? or repo.server_path.nil?
-          fail_job "Repository #{job.repository} not found under #{@repo_root}"
+          fail_job job, "Repository #{job.repository} not found under #{@repo_root}"
           next
         end
         ready &&= get_commit repo.server_path, job.script_version
@@ -390,7 +388,11 @@ class CrunchDispatch
         cmd_args += ['--docker-bin', @docker_bin]
       end
 
-      if @todo_job_retries.include?(job.uuid)
+      if @docker_run_args
+        cmd_args += ['--docker-run-args', @docker_run_args]
+      end
+
+      if have_job_lock?(job)
         cmd_args << "--force-unlock"
       end
 
@@ -635,7 +637,7 @@ class CrunchDispatch
 
     jobrecord = Job.find_by_uuid(job_done.uuid)
 
-    if exit_status == EXIT_RETRY_UNLOCKED
+    if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
       # The job failed because all of the nodes allocated to it
       # failed.  Only this crunch-dispatch process can retry the job:
       # it's already locked, and there's no way to put it back in the
@@ -800,8 +802,59 @@ class CrunchDispatch
     end
   end
 
+  def fail_jobs before: nil
+    act_as_system_user do
+      threshold = nil
+      if before == 'reboot'
+        boottime = nil
+        open('/proc/stat').map(&:split).each do |stat, t|
+          if stat == 'btime'
+            boottime = t
+          end
+        end
+        if not boottime
+          raise "Could not find btime in /proc/stat"
+        end
+        threshold = Time.at(boottime.to_i)
+      elsif before
+        threshold = Time.parse(before, Time.now)
+      else
+        threshold = db_current_time
+      end
+      Rails.logger.info "fail_jobs: threshold is #{threshold}"
+
+      if Rails.configuration.crunch_job_wrapper == :slurm_immediate
+        # [["slurm_job_id", "slurm_job_name"], ...]
+        squeue = File.popen(['squeue', '-h', '-o', '%i %j']).readlines.map do |line|
+          line.strip.split(' ', 2)
+        end
+      else
+        squeue = []
+      end
+
+      Job.where('state = ? and started_at < ?', Job::Running, threshold).
+        each do |job|
+        Rails.logger.debug "fail_jobs: #{job.uuid} started #{job.started_at}"
+        squeue.each do |slurm_id, slurm_name|
+          if slurm_name == job.uuid
+            Rails.logger.info "fail_jobs: scancel #{slurm_id} for #{job.uuid}"
+            scancel slurm_id
+          end
+        end
+        fail_job(job, "cleaned up stale job: started before #{threshold}",
+                 skip_lock: true)
+      end
+    end
+  end
+
   protected
 
+  def have_job_lock?(job)
+    # Return true if the given job is locked by this crunch-dispatch, normally
+    # because we've run crunch-job for it.
+    @todo_job_retries.include?(job.uuid)
+  end
+
   def did_recently(thing, min_interval)
     if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
       @did_recently[thing] = Time.now
@@ -835,4 +888,24 @@ class CrunchDispatch
       running_job[:stderr_flushed_at] = Time.now
     end
   end
+
+  def scancel slurm_id
+    cmd = sudo_preface + ['scancel', slurm_id]
+    puts File.popen(cmd).read
+    if not $?.success?
+      Rails.logger.error "scancel #{slurm_id.shellescape}: $?"
+    end
+  end
+
+  def sudo_preface
+    return [] if not Server::Application.config.crunch_job_user
+    ["sudo", "-E", "-u",
+     Server::Application.config.crunch_job_user,
+     "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
+     "PATH=#{ENV['PATH']}",
+     "PERLLIB=#{ENV['PERLLIB']}",
+     "PYTHONPATH=#{ENV['PYTHONPATH']}",
+     "RUBYLIB=#{ENV['RUBYLIB']}",
+     "GEM_PATH=#{ENV['GEM_PATH']}"]
+  end
 end