Merge branch '11221-always-restart-services'
[arvados.git] / services / api / lib / crunch_dispatch.rb
index ce94f737a2467f855a7156ba76873db57cd183ee..bea1657de22b72c0d5296a4c571afcee3ffc0993 100644 (file)
@@ -27,7 +27,7 @@ class CrunchDispatch
     @cgroup_root = ENV['CRUNCH_CGROUP_ROOT']
 
     @arvados_internal = Rails.configuration.git_internal_dir
-    if not File.exists? @arvados_internal
+    if not File.exist? @arvados_internal
       $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
       raise "No internal git repository available" unless ($? == 0)
     end
@@ -73,7 +73,7 @@ class CrunchDispatch
       # into multiple rows with one hostname each.
       `#{cmd} --noheader -o '%N:#{outfmt}'`.each_line do |line|
         tokens = line.chomp.split(":", max_fields)
-        if (re = tokens[0].match /^(.*?)\[([-,\d]+)\]$/)
+        if (re = tokens[0].match(/^(.*?)\[([-,\d]+)\]$/))
           tokens.shift
           re[2].split(",").each do |range|
             range = range.split("-").collect(&:to_i)
@@ -95,7 +95,7 @@ class CrunchDispatch
       # hasn't been able to communicate with it recently.
       state.sub!(/^idle\*/, "down")
       state.sub!(/\W+$/, "")
-      state = "down" unless %w(idle alloc down).include?(state)
+      state = "down" unless %w(idle alloc comp mix drng down).include?(state)
       slurm_nodes[hostname] = {state: state, job: nil}
     end
     each_slurm_line("squeue", "%j") do |hostname, job_uuid|
@@ -105,7 +105,7 @@ class CrunchDispatch
   end
 
   def update_node_status
-    return unless Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
+    return unless Server::Application.config.crunch_job_wrapper.to_s.match(/^slurm/)
     slurm_status.each_pair do |hostname, slurmdata|
       next if @node_state[hostname] == slurmdata
       begin
@@ -169,7 +169,25 @@ class CrunchDispatch
       end
       usable_nodes << node
       if usable_nodes.count >= min_node_count
-        return usable_nodes.map { |node| node.hostname }
+        hostnames = usable_nodes.map(&:hostname)
+        log_nodes = usable_nodes.map do |n|
+          "#{n.hostname} #{n.uuid} #{n.properties.to_json}"
+        end
+        log_job = "#{job.uuid} #{job.runtime_constraints}"
+        log_text = "dispatching job #{log_job} to #{log_nodes.join(", ")}"
+        $stderr.puts log_text
+        begin
+          act_as_system_user do
+            Log.new(object_uuid: job.uuid,
+                    event_type: 'dispatch',
+                    owner_uuid: system_user_uuid,
+                    summary: "dispatching to #{hostnames.join(", ")}",
+                    properties: {'text' => log_text}).save!
+          end
+        rescue => e
+          $stderr.puts "dispatch: log.create failed: #{e}"
+        end
+        return hostnames
       end
     end
     nil
@@ -204,8 +222,8 @@ class CrunchDispatch
               owner_uuid: job.owner_uuid,
               summary: message,
               properties: {"text" => message}).save!
-    rescue
-      $stderr.puts "dispatch: log.create failed"
+    rescue => e
+      $stderr.puts "dispatch: log.create failed: #{e}"
     end
 
     if not skip_lock and not have_job_lock?(job)
@@ -512,8 +530,6 @@ class CrunchDispatch
 
   def read_pipes
     @running.each do |job_uuid, j|
-      job = j[:job]
-
       now = Time.now
       if now > j[:log_throttle_reset_time]
         # It has been more than throttle_period seconds since the last
@@ -816,6 +832,9 @@ class CrunchDispatch
         unless (@todo_pipelines.empty? and @pipe_auth_tokens.empty?) or did_recently(:update_pipelines, 5.0)
           update_pipelines
         end
+        unless did_recently('check_orphaned_slurm_jobs', 60)
+          check_orphaned_slurm_jobs
+        end
       end
       reap_children
       select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
@@ -852,22 +871,14 @@ class CrunchDispatch
       end
       Rails.logger.info "fail_jobs: threshold is #{threshold}"
 
-      if Rails.configuration.crunch_job_wrapper == :slurm_immediate
-        # [["slurm_job_id", "slurm_job_name"], ...]
-        squeue = File.popen(['squeue', '-h', '-o', '%i %j']).readlines.map do |line|
-          line.strip.split(' ', 2)
-        end
-      else
-        squeue = []
-      end
-
+      squeue = squeue_jobs
       Job.where('state = ? and started_at < ?', Job::Running, threshold).
         each do |job|
         Rails.logger.debug "fail_jobs: #{job.uuid} started #{job.started_at}"
-        squeue.each do |slurm_id, slurm_name|
+        squeue.each do |slurm_name|
           if slurm_name == job.uuid
-            Rails.logger.info "fail_jobs: scancel #{slurm_id} for #{job.uuid}"
-            scancel slurm_id
+            Rails.logger.info "fail_jobs: scancel #{job.uuid}"
+            scancel slurm_name
           end
         end
         fail_job(job, "cleaned up stale job: started before #{threshold}",
@@ -876,6 +887,37 @@ class CrunchDispatch
     end
   end
 
+  # Cancel SLURM jobs that no longer correspond to live crunch jobs.
+  #
+  # Candidates are squeue entries whose name matches
+  # HasUuid::UUID_REGEX and which this dispatcher is not tracking in
+  # @running. A candidate is spared if a Job record with that uuid is
+  # Running or Queued, or was modified within the last 60 seconds;
+  # every other candidate (including those with no Job record) is
+  # passed to scancel.
+  def check_orphaned_slurm_jobs
+    act_as_system_user do
+      candidates = squeue_jobs.select do |name|
+        name.match(HasUuid::UUID_REGEX) && !@running.has_key?(name)
+      end
+
+      # Nothing untracked in squeue: skip the database query entirely.
+      return if candidates.empty?
+
+      still_wanted = Job.where('uuid in (?) and (state in (?) or modified_at>?)',
+                               candidates,
+                               ['Running', 'Queued'],
+                               (Time.now - 60)).
+                         collect(&:uuid)
+      orphans = candidates - still_wanted
+      orphans.each do |uuid|
+        Rails.logger.info "orphaned job: scancel #{uuid}"
+        scancel uuid
+      end
+    end
+  end
+
+  # Build the argv prefix used to run a command as the configured
+  # crunch_job_user.
+  #
+  # Returns [] when no crunch_job_user is configured. Otherwise returns
+  # the sudo invocation followed by VAR=value arguments that copy the
+  # listed variables from this process's environment.
+  def sudo_preface
+    job_user = Server::Application.config.crunch_job_user
+    return [] unless job_user
+    forwarded = %w(LD_LIBRARY_PATH PATH PERLLIB PYTHONPATH RUBYLIB GEM_PATH)
+    ["sudo", "-E", "-u", job_user] +
+      forwarded.map { |name| "#{name}=#{ENV[name]}" }
+  end
+
   protected
 
   def have_job_lock?(job)
@@ -918,23 +960,27 @@ class CrunchDispatch
     end
   end
 
-  def scancel slurm_id
-    cmd = sudo_preface + ['scancel', slurm_id]
-    puts File.popen(cmd).read
-    if not $?.success?
-      Rails.logger.error "scancel #{slurm_id.shellescape}: $?"
+  # Return the job names currently listed by squeue, one array entry
+  # per line of squeue output with surrounding whitespace stripped.
+  # Callers compare these names against job UUIDs.
+  #
+  # Returns an empty array, without running squeue, unless
+  # crunch_job_wrapper is :slurm_immediate.
+  def squeue_jobs
+    if Rails.configuration.crunch_job_wrapper == :slurm_immediate
+      # Per squeue(1): -a all partitions, -h no header line,
+      # -o '%j' print only the job name.
+      p = IO.popen(['squeue', '-a', '-h', '-o', '%j'])
+      begin
+        p.readlines.map {|line| line.strip}
+      ensure
+        # Close the pipe even if reading raises.
+        p.close
+      end
+    else
+      []
     end
   end
 
-  def sudo_preface
-    return [] if not Server::Application.config.crunch_job_user
-    ["sudo", "-E", "-u",
-     Server::Application.config.crunch_job_user,
-     "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
-     "PATH=#{ENV['PATH']}",
-     "PERLLIB=#{ENV['PERLLIB']}",
-     "PYTHONPATH=#{ENV['PYTHONPATH']}",
-     "RUBYLIB=#{ENV['RUBYLIB']}",
-     "GEM_PATH=#{ENV['GEM_PATH']}"]
+  # Run "scancel -n <slurm_name>" (prefixed with sudo_preface) to
+  # cancel SLURM jobs by job name, echoing scancel's output to stdout.
+  #
+  # A non-zero exit is not raised; it is logged as an error together
+  # with the child's exit status.
+  def scancel slurm_name
+    cmd = sudo_preface + ['scancel', '-n', slurm_name]
+    IO.popen(cmd) do |scancel_pipe|
+      puts scancel_pipe.read
+    end
+    # IO.popen's block form closes the pipe on exit, which sets $?.
+    if not $?.success?
+      # "$?" alone is not interpolated inside a double-quoted string;
+      # it must be written as #{$?} for the status to reach the log.
+      Rails.logger.error "scancel #{slurm_name.shellescape}: #{$?}"
+    end
   end
 end