13493: Update CORS OPTIONS test.
[arvados.git] / services/api/lib/crunch_dispatch.rb
index bd1591da37a855ef514da91bb28acbb4519dd474..73ad7606cc879ef58f7569c960196191c7fb7721 100644
@@ -1,7 +1,12 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 require 'open3'
 require 'shellwords'
 
 class CrunchDispatch
+  extend DbCurrentTime
   include ApplicationHelper
   include Process
 
@@ -22,9 +27,11 @@ class CrunchDispatch
     end
 
     @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN']
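+    # Optional pass-throughs for crunch-job (--docker-run-args and --cgroup-root).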
+    @docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS']
+    @cgroup_root = ENV['CRUNCH_CGROUP_ROOT']
 
     @arvados_internal = Rails.configuration.git_internal_dir
-    if not File.exists? @arvados_internal
+    if not File.exist? @arvados_internal
       $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
       raise "No internal git repository available" unless ($? == 0)
     end
@@ -70,7 +77,7 @@ class CrunchDispatch
       # into multiple rows with one hostname each.
       `#{cmd} --noheader -o '%N:#{outfmt}'`.each_line do |line|
         tokens = line.chomp.split(":", max_fields)
-        if (re = tokens[0].match /^(.*?)\[([-,\d]+)\]$/)
+        if (re = tokens[0].match(/^(.*?)\[([-,\d]+)\]$/))
           tokens.shift
           re[2].split(",").each do |range|
             range = range.split("-").collect(&:to_i)
@@ -92,7 +99,7 @@ class CrunchDispatch
       # hasn't been able to communicate with it recently.
       state.sub!(/^idle\*/, "down")
       state.sub!(/\W+$/, "")
-      state = "down" unless %w(idle alloc down).include?(state)
+      state = "down" unless %w(idle alloc comp mix drng down).include?(state)
       slurm_nodes[hostname] = {state: state, job: nil}
     end
     each_slurm_line("squeue", "%j") do |hostname, job_uuid|
@@ -102,7 +109,7 @@ class CrunchDispatch
   end
 
   def update_node_status
-    return unless Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
+    return unless Server::Application.config.crunch_job_wrapper.to_s.match(/^slurm/)
     slurm_status.each_pair do |hostname, slurmdata|
       next if @node_state[hostname] == slurmdata
       begin
@@ -166,7 +173,25 @@ class CrunchDispatch
       end
       usable_nodes << node
       if usable_nodes.count >= min_node_count
-        return usable_nodes.map { |node| node.hostname }
+        hostnames = usable_nodes.map(&:hostname)
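+        # Log the chosen nodes to stderr and to the Logs table as a 'dispatch' event.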
+        log_nodes = usable_nodes.map do |n|
+          "#{n.hostname} #{n.uuid} #{n.properties.to_json}"
+        end
+        log_job = "#{job.uuid} #{job.runtime_constraints}"
+        log_text = "dispatching job #{log_job} to #{log_nodes.join(", ")}"
+        $stderr.puts log_text
+        begin
+          act_as_system_user do
+            Log.new(object_uuid: job.uuid,
+                    event_type: 'dispatch',
+                    owner_uuid: system_user_uuid,
+                    summary: "dispatching to #{hostnames.join(", ")}",
+                    properties: {'text' => log_text}).save!
+          end
+        rescue => e
+          $stderr.puts "dispatch: log.create failed: #{e}"
+        end
+        return hostnames
       end
     end
     nil
@@ -193,7 +218,7 @@ class CrunchDispatch
     nodelist
   end
 
-  def fail_job job, message
+  def fail_job job, message, skip_lock: false
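+    # skip_lock: true skips taking the job lock (used when cleaning up stale jobs as the system user).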
     $stderr.puts "dispatch: #{job.uuid}: #{message}"
     begin
       Log.new(object_uuid: job.uuid,
@@ -201,18 +226,22 @@ class CrunchDispatch
               owner_uuid: job.owner_uuid,
               summary: message,
               properties: {"text" => message}).save!
-    rescue
-      $stderr.puts "dispatch: log.create failed"
+    rescue => e
+      $stderr.puts "dispatch: log.create failed: #{e}"
     end
 
-    begin
-      job.lock @authorizations[job.uuid].user.uuid
-      job.state = "Failed"
-      if not job.save
-        $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed"
+    if not skip_lock and not have_job_lock?(job)
+      begin
+        job.lock @authorizations[job.uuid].user.uuid
+      rescue ArvadosModel::AlreadyLockedError
+        $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else"
+        return
       end
-    rescue ArvadosModel::AlreadyLockedError
-      $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else"
+    end
+
+    job.state = "Failed"
+    if not job.save
+      $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed"
     end
   end
 
@@ -268,7 +297,7 @@ class CrunchDispatch
     @fetched_commits[sha1] = ($? == 0)
   end
 
-  def tag_commit(commit_hash, tag_name)
+  def tag_commit(job, commit_hash, tag_name)
     # @git_tags[T]==V if we know commit V has been tagged T in the
     # arvados_internal repository.
     if not @git_tags[tag_name]
@@ -334,16 +363,7 @@ class CrunchDispatch
         raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}"
       end
 
-      if Server::Application.config.crunch_job_user
-        cmd_args.unshift("sudo", "-E", "-u",
-                         Server::Application.config.crunch_job_user,
-                         "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
-                         "PATH=#{ENV['PATH']}",
-                         "PERLLIB=#{ENV['PERLLIB']}",
-                         "PYTHONPATH=#{ENV['PYTHONPATH']}",
-                         "RUBYLIB=#{ENV['RUBYLIB']}",
-                         "GEM_PATH=#{ENV['GEM_PATH']}")
-      end
+      cmd_args = sudo_preface + cmd_args
 
       next unless get_authorization job
 
@@ -357,24 +377,24 @@ class CrunchDispatch
         # reasonable thing to do at this point.
         repo = Repository.where(name: job.repository).first
         if repo.nil? or repo.server_path.nil?
-          fail_job "Repository #{job.repository} not found under #{@repo_root}"
+          fail_job job, "Repository #{job.repository} not found under #{@repo_root}"
           next
         end
         ready &&= get_commit repo.server_path, job.script_version
-        ready &&= tag_commit job.script_version, job.uuid
+        ready &&= tag_commit job, job.script_version, job.uuid
       end
 
       # This should be unnecessary, because API server does it during
       # job create/update, but it's still not a bad idea to verify the
       # tag is correct before starting the job:
-      ready &&= tag_commit job.script_version, job.uuid
+      ready &&= tag_commit job, job.script_version, job.uuid
 
       # The arvados_sdk_version doesn't support use of arbitrary
       # remote URLs, so the requested version isn't necessarily copied
       # into the internal repository yet.
       if job.arvados_sdk_version
         ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
-        ready &&= tag_commit job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
+        ready &&= tag_commit job, job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
       end
 
       if not ready
@@ -387,11 +407,19 @@ class CrunchDispatch
                    '--job', job.uuid,
                    '--git-dir', @arvados_internal]
 
+      if @cgroup_root
+        cmd_args += ['--cgroup-root', @cgroup_root]
+      end
+
       if @docker_bin
         cmd_args += ['--docker-bin', @docker_bin]
       end
 
-      if @todo_job_retries.include?(job.uuid)
+      if @docker_run_args
+        cmd_args += ['--docker-run-args', @docker_run_args]
+      end
+
+      if have_job_lock?(job)
         cmd_args << "--force-unlock"
       end
 
@@ -401,8 +429,11 @@ class CrunchDispatch
         i, o, e, t = Open3.popen3(*cmd_args)
       rescue
         $stderr.puts "dispatch: popen3: #{$!}"
-        sleep 1
-        next
+        # This is a dispatch problem like "Too many open files";
+        # retrying another job right away would be futile. Just return
+        # and hope things are better next time, after (at least) a
+        # did_recently() delay.
+        return
       end
 
       $stderr.puts "dispatch: job #{job.uuid}"
@@ -428,6 +459,8 @@ class CrunchDispatch
         log_throttle_bytes_so_far: 0,
         log_throttle_lines_so_far: 0,
         log_throttle_bytes_skipped: 0,
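+        # State for throttling '[...]' partial-line segments (see rate_limit).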
+        log_throttle_partial_line_last_at: Time.new(0),
+        log_throttle_first_partial_line: true,
       }
       i.close
       @todo_job_retries.delete(job.uuid)
@@ -442,9 +475,23 @@ class CrunchDispatch
     message = false
     linesize = line.size
     if running_job[:log_throttle_is_open]
-      running_job[:log_throttle_lines_so_far] += 1
-      running_job[:log_throttle_bytes_so_far] += linesize
-      running_job[:bytes_logged] += linesize
+      partial_line = false
+      skip_counts = false
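+      # A payload bracketed by '[...]' is a partial segment of a long line;
+      # count at most one segment per crunch_log_partial_line_throttle_period.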
+      matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/)
+      if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]')
+        partial_line = true
+        if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.crunch_log_partial_line_throttle_period
+          running_job[:log_throttle_partial_line_last_at] = Time.now
+        else
+          skip_counts = true
+        end
+      end
+
+      if !skip_counts
+        running_job[:log_throttle_lines_so_far] += 1
+        running_job[:log_throttle_bytes_so_far] += linesize
+        running_job[:bytes_logged] += linesize
+      end
 
       if (running_job[:bytes_logged] >
           Rails.configuration.crunch_limit_log_bytes_per_job)
@@ -455,14 +502,18 @@ class CrunchDispatch
       elsif (running_job[:log_throttle_bytes_so_far] >
              Rails.configuration.crunch_log_throttle_bytes)
         remaining_time = running_job[:log_throttle_reset_time] - Time.now
-        message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds.\n"
+        message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds."
         running_job[:log_throttle_is_open] = false
 
       elsif (running_job[:log_throttle_lines_so_far] >
              Rails.configuration.crunch_log_throttle_lines)
         remaining_time = running_job[:log_throttle_reset_time] - Time.now
-        message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds.\n"
+        message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds."
         running_job[:log_throttle_is_open] = false
+
+      elsif partial_line and running_job[:log_throttle_first_partial_line]
+        running_job[:log_throttle_first_partial_line] = false
+        message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.crunch_log_partial_line_throttle_period} seconds."
       end
     end
 
@@ -474,8 +525,11 @@ class CrunchDispatch
     if message
       # Yes, write to logs, but use our "rate exceeded" message
       # instead of the log message that exceeded the limit.
+      message += " A complete log is still being written to Keep, and will be available when the job finishes.\n"
       line.replace message
       true
+    elsif partial_line
+      false
     else
       running_job[:log_throttle_is_open]
     end
@@ -483,8 +537,6 @@ class CrunchDispatch
 
   def read_pipes
     @running.each do |job_uuid, j|
-      job = j[:job]
-
       now = Time.now
       if now > j[:log_throttle_reset_time]
         # It has been more than throttle_period seconds since the last
@@ -500,6 +552,8 @@ class CrunchDispatch
         j[:log_throttle_lines_so_far] = 0
         j[:log_throttle_bytes_skipped] = 0
         j[:log_throttle_is_open] = true
+        j[:log_throttle_partial_line_last_at] = Time.new(0)
+        j[:log_throttle_first_partial_line] = true
       end
 
       j[:buf].each do |stream, streambuf|
@@ -582,31 +636,11 @@ class CrunchDispatch
     pid_done = nil
     j_done = nil
 
-    if false
-      begin
-        pid_done = waitpid(-1, Process::WNOHANG | Process::WUNTRACED)
-        if pid_done
-          j_done = @running.values.
-            select { |j| j[:wait_thr].pid == pid_done }.
-            first
-        end
-      rescue SystemCallError
-        # I have @running processes but system reports I have no
-        # children. This is likely to happen repeatedly if it happens at
-        # all; I will log this no more than once per child process I
-        # start.
-        if 0 < @running.select { |uuid,j| j[:warned_waitpid_error].nil? }.size
-          children = @running.values.collect { |j| j[:wait_thr].pid }.join ' '
-          $stderr.puts "dispatch: IPC bug: waitpid() error (#{$!}), but I have children #{children}"
-        end
-        @running.each do |uuid,j| j[:warned_waitpid_error] = true end
-      end
-    else
-      @running.each do |uuid, j|
-        if j[:wait_thr].status == false
-          pid_done = j[:wait_thr].pid
-          j_done = j
-        end
+    @running.each do |uuid, j|
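+      # Thread#status is false (or nil) once the child process has exited.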
+      if !j[:wait_thr].status
+        pid_done = j[:wait_thr].pid
+        j_done = j
+        break
       end
     end
 
@@ -636,19 +670,21 @@ class CrunchDispatch
 
     jobrecord = Job.find_by_uuid(job_done.uuid)
 
-    if exit_status == EXIT_RETRY_UNLOCKED
-      # The job failed because all of the nodes allocated to it
-      # failed.  Only this crunch-dispatch process can retry the job:
+    if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
+      $stderr.puts("dispatch: job #{jobrecord.uuid} was interrupted by node failure")
+      # Only this crunch-dispatch process can retry the job:
       # it's already locked, and there's no way to put it back in the
       # Queued state.  Put it in our internal todo list unless the job
       # has failed this way excessively.
       @job_retry_counts[jobrecord.uuid] += 1
       exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT
+      do_what_next = "give up now"
       if exit_tempfail
         @todo_job_retries[jobrecord.uuid] = jobrecord
-      else
-        $stderr.puts("dispatch: job #{jobrecord.uuid} exceeded node failure retry limit -- giving up")
+        do_what_next = "re-attempt"
       end
+      $stderr.puts("dispatch: job #{jobrecord.uuid} has been interrupted " +
+                   "#{@job_retry_counts[jobrecord.uuid]}x, will #{do_what_next}")
     end
 
     if !exit_tempfail
@@ -786,6 +822,9 @@ class CrunchDispatch
         unless (@todo_pipelines.empty? and @pipe_auth_tokens.empty?) or did_recently(:update_pipelines, 5.0)
           update_pipelines
         end
+        unless did_recently('check_orphaned_slurm_jobs', 60)
+          check_orphaned_slurm_jobs
+        end
       end
       reap_children
       select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
@@ -801,8 +840,82 @@ class CrunchDispatch
     end
   end
 
+  def fail_jobs before: nil
+    act_as_system_user do
+      threshold = nil
+      if before == 'reboot'
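+        # Use the node's last boot time (btime from /proc/stat) as the cutoff.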
+        boottime = nil
+        open('/proc/stat').map(&:split).each do |stat, t|
+          if stat == 'btime'
+            boottime = t
+          end
+        end
+        if not boottime
+          raise "Could not find btime in /proc/stat"
+        end
+        threshold = Time.at(boottime.to_i)
+      elsif before
+        threshold = Time.parse(before, Time.now)
+      else
+        threshold = db_current_time
+      end
+      Rails.logger.info "fail_jobs: threshold is #{threshold}"
+
+      squeue = squeue_jobs
+      Job.where('state = ? and started_at < ?', Job::Running, threshold).
+        each do |job|
+        Rails.logger.debug "fail_jobs: #{job.uuid} started #{job.started_at}"
+        squeue.each do |slurm_name|
+          if slurm_name == job.uuid
+            Rails.logger.info "fail_jobs: scancel #{job.uuid}"
+            scancel slurm_name
+          end
+        end
+        fail_job(job, "cleaned up stale job: started before #{threshold}",
+                 skip_lock: true)
+      end
+    end
+  end
+
+  def check_orphaned_slurm_jobs
+    act_as_system_user do
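+      # Find slurm jobs named like crunch job uuids that this dispatcher is not already tracking.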
+      squeue_uuids = squeue_jobs.select{|uuid| uuid.match(/^[0-9a-z]{5}-8i9sb-[0-9a-z]{15}$/)}.
+                                  select{|uuid| !@running.has_key?(uuid)}
+
+      return if squeue_uuids.size == 0
+
+      scancel_uuids = squeue_uuids - Job.where('uuid in (?) and (state in (?) or modified_at>?)',
+                                               squeue_uuids,
+                                               ['Running', 'Queued'],
+                                               (Time.now - 60)).
+                                         collect(&:uuid)
+      scancel_uuids.each do |uuid|
+        Rails.logger.info "orphaned job: scancel #{uuid}"
+        scancel uuid
+      end
+    end
+  end
+
+  def sudo_preface
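+    # Prefix for running child commands as the configured crunch_job_user,
+    # preserving library and search paths from this environment.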
+    return [] if not Server::Application.config.crunch_job_user
+    ["sudo", "-E", "-u",
+     Server::Application.config.crunch_job_user,
+     "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
+     "PATH=#{ENV['PATH']}",
+     "PERLLIB=#{ENV['PERLLIB']}",
+     "PYTHONPATH=#{ENV['PYTHONPATH']}",
+     "RUBYLIB=#{ENV['RUBYLIB']}",
+     "GEM_PATH=#{ENV['GEM_PATH']}"]
+  end
+
   protected
 
+  def have_job_lock?(job)
+    # Return true if the given job is locked by this crunch-dispatch, normally
+    # because we've run crunch-job for it.
+    @todo_job_retries.include?(job.uuid)
+  end
+
   def did_recently(thing, min_interval)
     if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
       @did_recently[thing] = Time.now
@@ -836,4 +949,28 @@ class CrunchDispatch
       running_job[:stderr_flushed_at] = Time.now
     end
   end
+
+  # An array of job_uuids in squeue
+  def squeue_jobs
+    if Rails.configuration.crunch_job_wrapper == :slurm_immediate
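+      # List job names only ('%j'); crunch jobs in slurm are named after their Arvados job uuid.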
+      p = IO.popen(['squeue', '-a', '-h', '-o', '%j'])
+      begin
+        p.readlines.map {|line| line.strip}
+      ensure
+        p.close
+      end
+    else
+      []
+    end
+  end
+
+  def scancel slurm_name
+    cmd = sudo_preface + ['scancel', '-n', slurm_name]
+    IO.popen(cmd) do |scancel_pipe|
+      puts scancel_pipe.read
+    end
+    if not $?.success?
+      Rails.logger.error "scancel #{slurm_name.shellescape}: #{$?}"
+    end
+  end
 end