Merge branch 'master' into 7330-improved-sso-package
[arvados.git] / services / api / script / crunch-dispatch.rb
index 6faf931b28d0fed1587355a4125945befd3d37d6..4a1fdbce758d7b552f529419f7c37f970299d298 100755 (executable)
@@ -1,5 +1,9 @@
 #!/usr/bin/env ruby
 
+# We want files written by crunch-dispatch to be writable by other processes
+# with the same GID, see bug #7228
+File.umask(0002)
+
 require 'shellwords'
 include Process
 
@@ -52,7 +56,10 @@ end
 
 class Dispatcher
   include ApplicationHelper
-  include DbCurrentTime
+
+  EXIT_TEMPFAIL = 75
+  EXIT_RETRY_UNLOCKED = 93
+  RETRY_UNLOCKED_LIMIT = 3
 
   def initialize
     @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
@@ -60,6 +67,8 @@ class Dispatcher
       raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
     end
 
+    @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN']
+
     @arvados_internal = Rails.configuration.git_internal_dir
     if not File.exists? @arvados_internal
       $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
@@ -67,6 +76,7 @@ class Dispatcher
     end
 
     @repo_root = Rails.configuration.git_repositories_dir
+    @arvados_repo_path = Repository.where(name: "arvados").first.server_path
     @authorizations = {}
     @did_recently = {}
     @fetched_commits = {}
@@ -75,6 +85,8 @@ class Dispatcher
     @pipe_auth_tokens = {}
     @running = {}
     @todo = []
+    @todo_job_retries = {}
+    @job_retry_counts = Hash.new(0)
     @todo_pipelines = []
   end
 
@@ -84,7 +96,7 @@ class Dispatcher
 
   def refresh_todo
     if $options[:jobs]
-      @todo = Job.queue.select(&:repository)
+      @todo = @todo_job_retries.values + Job.queue.select(&:repository)
     end
     if $options[:pipelines]
       @todo_pipelines = PipelineInstance.queue
@@ -217,7 +229,7 @@ class Dispatcher
     nodelist = nodes_available_for_job_now(job)
     if nodelist.nil? and not did_recently(:wait_for_available_nodes, 3600)
       $stderr.puts "dispatch: waiting for nodes for #{job.uuid}"
-      @node_wait_deadline = db_current_time + 5.minutes
+      @node_wait_deadline = Time.now + 5.minutes
     end
     nodelist
   end
@@ -261,7 +273,7 @@ class Dispatcher
       # We already made a token for this job, but we need a new one
       # because modified_by_user_uuid has changed (the job will run
       # as a different user).
-      @authorizations[job.uuid].update_attributes expires_at: db_current_time
+      @authorizations[job.uuid].update_attributes expires_at: Time.now
       @authorizations[job.uuid] = nil
     end
     if not @authorizations[job.uuid]
@@ -277,35 +289,24 @@ class Dispatcher
     @authorizations[job.uuid]
   end
 
-  def get_commit(repo_name, commit_hash)
-    # @fetched_commits[V]==true if we know commit V exists in the
-    # arvados_internal git repository.
-    if !@fetched_commits[commit_hash]
-      src_repo = File.join(@repo_root, "#{repo_name}.git")
-      if not File.exists? src_repo
-        src_repo = File.join(@repo_root, repo_name, '.git')
-        if not File.exists? src_repo
-          fail_job job, "No #{repo_name}.git or #{repo_name}/.git at #{@repo_root}"
-          return nil
-        end
-      end
-
-      # check if the commit needs to be fetched or not
-      commit_rev = stdout_s(git_cmd("rev-list", "-n1", commit_hash),
-                            err: "/dev/null")
-      unless $? == 0 and commit_rev == commit_hash
-        # commit does not exist in internal repository, so import the source repository using git fetch-pack
-        cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
-        $stderr.puts "dispatch: #{cmd}"
-        $stderr.puts(stdout_s(cmd))
-        unless $? == 0
-          fail_job job, "git fetch-pack failed"
-          return nil
-        end
-      end
-      @fetched_commits[commit_hash] = true
+  # Report whether commit sha1 is known to exist in the internal git
+  # repository. A positive answer is cached in @fetched_commits, so
+  # `git rev-list` is only run while sha1 has no truthy cache entry.
+  # The commit counts as present when rev-list exits 0 and its output
+  # (as returned by stdout_s) equals sha1 exactly.
+  # Returns the cache entry for sha1: true, false, or nil.
+  # NOTE(review): assumes git_cmd targets the internal repository --
+  # confirm against its definition.
+  def internal_repo_has_commit? sha1
+    if (not @fetched_commits[sha1] and
+        sha1 == stdout_s(git_cmd("rev-list", "-n1", sha1), err: "/dev/null") and
+        $? == 0)
+      @fetched_commits[sha1] = true
     end
-    @fetched_commits[commit_hash]
+    return @fetched_commits[sha1]
+  end
+
+  # Make sure commit sha1 is available in the internal repository,
+  # importing from src_repo when it is not already there.
+  # Returns true without fetching if internal_repo_has_commit? reports
+  # the commit as present. Otherwise runs `git fetch-pack --all`
+  # against src_repo, prints the command and the result of stdout_s to
+  # stderr, and records/returns whether fetch-pack exited 0.
+  def get_commit src_repo, sha1
+    return true if internal_repo_has_commit? sha1
+
+    # commit does not exist in internal repository, so import the
+    # source repository using git fetch-pack
+    cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
+    $stderr.puts "dispatch: #{cmd}"
+    $stderr.puts(stdout_s(cmd))
+    # A zero exit status is taken to mean sha1 arrived; the commit
+    # itself is not re-checked here.
+    @fetched_commits[sha1] = ($? == 0)
   end
 
   def tag_commit(commit_hash, tag_name)
@@ -357,7 +358,7 @@ class Dispatcher
       when :slurm_immediate
         nodelist = nodes_available_for_job(job)
         if nodelist.nil?
-          if db_current_time < @node_wait_deadline
+          if Time.now < @node_wait_deadline
             break
           else
             next
@@ -377,6 +378,7 @@ class Dispatcher
       if Server::Application.config.crunch_job_user
         cmd_args.unshift("sudo", "-E", "-u",
                          Server::Application.config.crunch_job_user,
+                         "LD_LIBRARY_PATH=#{ENV['LD_LIBRARY_PATH']}",
                          "PATH=#{ENV['PATH']}",
                          "PERLLIB=#{ENV['PERLLIB']}",
                          "PYTHONPATH=#{ENV['PYTHONPATH']}",
@@ -384,20 +386,56 @@ class Dispatcher
                          "GEM_PATH=#{ENV['GEM_PATH']}")
       end
 
-      ready = (get_authorization(job) and
-               get_commit(job.repository, job.script_version) and
-               tag_commit(job.script_version, job.uuid))
-      if ready and job.arvados_sdk_version
-        ready = (get_commit("arvados", job.arvados_sdk_version) and
-                 tag_commit(job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"))
+      next unless get_authorization job
+
+      ready = internal_repo_has_commit? job.script_version
+
+      if not ready
+        # Import the commit from the specified repository into the
+        # internal repository. This should have been done already when
+        # the job was created/updated; this code is obsolete except to
+        # avoid deployment races. Failing the job would be a
+        # reasonable thing to do at this point.
+        repo = Repository.where(name: job.repository).first
+        if repo.nil? or repo.server_path.nil?
+          fail_job job, "Repository #{job.repository} not found under #{@repo_root}"
+          next
+        end
+        # ready is falsy in this branch, so assign it outright: "&&="
+        # would short-circuit and the fetch would never run.
+        ready = get_commit repo.server_path, job.script_version
+        ready &&= tag_commit job.script_version, job.uuid
+      end
+
+      # This should be unnecessary, because API server does it during
+      # job create/update, but it's still not a bad idea to verify the
+      # tag is correct before starting the job:
+      ready &&= tag_commit job.script_version, job.uuid
+
+      # The arvados_sdk_version doesn't support use of arbitrary
+      # remote URLs, so the requested version isn't necessarily copied
+      # into the internal repository yet.
+      if job.arvados_sdk_version
+        ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
+        ready &&= tag_commit job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
+      end
+
+      if not ready
+        fail_job job, "commit not present in internal repository"
+        next
       end
-      next unless ready
 
       cmd_args += [@crunch_job_bin,
                    '--job-api-token', @authorizations[job.uuid].api_token,
                    '--job', job.uuid,
                    '--git-dir', @arvados_internal]
 
+      if @docker_bin
+        cmd_args += ['--docker-bin', @docker_bin]
+      end
+
+      if @todo_job_retries.include?(job.uuid)
+        cmd_args << "--force-unlock"
+      end
+
       $stderr.puts "dispatch: #{cmd_args.join ' '}"
 
       begin
@@ -427,12 +465,13 @@ class Dispatcher
         bytes_logged: 0,
         events_logged: 0,
         log_throttle_is_open: true,
-        log_throttle_reset_time: db_current_time + Rails.configuration.crunch_log_throttle_period,
+        log_throttle_reset_time: Time.now + Rails.configuration.crunch_log_throttle_period,
         log_throttle_bytes_so_far: 0,
         log_throttle_lines_so_far: 0,
         log_throttle_bytes_skipped: 0,
       }
       i.close
+      @todo_job_retries.delete(job.uuid)
       update_node_status
     end
   end
@@ -451,18 +490,18 @@ class Dispatcher
       if (running_job[:bytes_logged] >
           Rails.configuration.crunch_limit_log_bytes_per_job)
         message = "Exceeded log limit #{Rails.configuration.crunch_limit_log_bytes_per_job} bytes (crunch_limit_log_bytes_per_job). Log will be truncated."
-        running_job[:log_throttle_reset_time] = db_current_time + 100.years
+        running_job[:log_throttle_reset_time] = Time.now + 100.years
         running_job[:log_throttle_is_open] = false
 
       elsif (running_job[:log_throttle_bytes_so_far] >
              Rails.configuration.crunch_log_throttle_bytes)
-        remaining_time = running_job[:log_throttle_reset_time] - db_current_time
+        remaining_time = running_job[:log_throttle_reset_time] - Time.now
         message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds.\n"
         running_job[:log_throttle_is_open] = false
 
       elsif (running_job[:log_throttle_lines_so_far] >
              Rails.configuration.crunch_log_throttle_lines)
-        remaining_time = running_job[:log_throttle_reset_time] - db_current_time
+        remaining_time = running_job[:log_throttle_reset_time] - Time.now
         message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds.\n"
         running_job[:log_throttle_is_open] = false
       end
@@ -487,7 +526,7 @@ class Dispatcher
     @running.each do |job_uuid, j|
       job = j[:job]
 
-      now = db_current_time
+      now = Time.now
       if now > j[:log_throttle_reset_time]
         # It has been more than throttle_period seconds since the last
         # checkpoint so reset the throttle
@@ -615,8 +654,6 @@ class Dispatcher
     return if !pid_done
 
     job_done = j_done[:job]
-    $stderr.puts "dispatch: child #{pid_done} exit"
-    $stderr.puts "dispatch: job #{job_done.uuid} end"
 
     # Ensure every last drop of stdout and stderr is consumed.
     read_pipes
@@ -633,23 +670,49 @@ class Dispatcher
 
     # Wait the thread (returns a Process::Status)
     exit_status = j_done[:wait_thr].value.exitstatus
+    exit_tempfail = exit_status == EXIT_TEMPFAIL
+
+    $stderr.puts "dispatch: child #{pid_done} exit #{exit_status}"
+    $stderr.puts "dispatch: job #{job_done.uuid} end"
 
     jobrecord = Job.find_by_uuid(job_done.uuid)
-    if exit_status != 75 and jobrecord.state == "Running"
-      # crunch-job did not return exit code 75 (see below) and left the job in
-      # the "Running" state, which means there was an unhandled error.  Fail
-      # the job.
-      jobrecord.state = "Failed"
-      if not jobrecord.save
-        $stderr.puts "dispatch: jobrecord.save failed"
+
+    if exit_status == EXIT_RETRY_UNLOCKED
+      # The job failed because all of the nodes allocated to it
+      # failed.  Only this crunch-dispatch process can retry the job:
+      # it's already locked, and there's no way to put it back in the
+      # Queued state.  Put it in our internal todo list unless the job
+      # has failed this way excessively.
+      @job_retry_counts[jobrecord.uuid] += 1
+      exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT
+      if exit_tempfail
+        @todo_job_retries[jobrecord.uuid] = jobrecord
+      else
+        $stderr.puts("dispatch: job #{jobrecord.uuid} exceeded node failure retry limit -- giving up")
+      end
+    end
+
+    if !exit_tempfail
+      @job_retry_counts.delete(jobrecord.uuid)
+      if jobrecord.state == "Running"
+        # Apparently there was an unhandled error.  That could potentially
+        # include "all allocated nodes failed" when we don't want to retry
+        # because the job has already been retried RETRY_UNLOCKED_LIMIT
+        # times.  Fail the job.
+        jobrecord.state = "Failed"
+        if not jobrecord.save
+          $stderr.puts "dispatch: jobrecord.save failed"
+        end
       end
     else
-      # Don't fail the job if crunch-job didn't even get as far as
-      # starting it. If the job failed to run due to an infrastructure
+      # If the job failed to run due to an infrastructure
       # issue with crunch-job or slurm, we want the job to stay in the
       # queue. If crunch-job exited after losing a race to another
       # crunch-job process, it exits 75 and we should leave the job
-      # record alone so the winner of the race do its thing.
+      # record alone so the winner of the race can do its thing.
+      # If crunch-job exited after all of its allocated nodes failed,
+      # it exits 93, and we want to retry it later (see the
+      # EXIT_RETRY_UNLOCKED `if` block).
       #
       # There is still an unhandled race condition: If our crunch-job
       # process is about to lose a race with another crunch-job
@@ -663,8 +726,8 @@ class Dispatcher
 
     # Invalidate the per-job auth token, unless the job is still queued and we
     # might want to try it again.
-    if jobrecord.state != "Queued"
-      j_done[:job_auth].update_attributes expires_at: db_current_time
+    if jobrecord.state != "Queued" and !@todo_job_retries.include?(jobrecord.uuid)
+      j_done[:job_auth].update_attributes expires_at: Time.now
     end
 
     @running.delete job_done.uuid
@@ -681,13 +744,14 @@ class Dispatcher
     end
 
     expire_tokens.each do |k, v|
-      v.update_attributes expires_at: db_current_time
+      v.update_attributes expires_at: Time.now
       @pipe_auth_tokens.delete k
     end
   end
 
   def run
     act_as_system_user
+    User.first.group_permissions
     $stderr.puts "dispatch: ready"
     while !$signal[:term] or @running.size > 0
       read_pipes
@@ -717,14 +781,21 @@ class Dispatcher
       select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
              [], [], 1)
     end
+    # If there are jobs we wanted to retry, we have to mark them as failed now.
+    # Other dispatchers can't pick them up because we hold their lock.
+    @todo_job_retries.each_key do |job_uuid|
+      job = Job.find_by_uuid(job_uuid)
+      if job.state == "Running"
+        fail_job(job, "crunch-dispatch was stopped during job's tempfail retry loop")
+      end
+    end
   end
 
   protected
 
   def did_recently(thing, min_interval)
-    current_time = db_current_time
-    if !@did_recently[thing] or @did_recently[thing] < current_time - min_interval
-      @did_recently[thing] = current_time
+    if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
+      @did_recently[thing] = Time.now
       false
     else
       true
@@ -739,7 +810,7 @@ class Dispatcher
     # it has been at least crunch_log_seconds_between_events seconds since
     # the last flush.
     if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or
-        (db_current_time - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
+        (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
       begin
         log = Log.new(object_uuid: running_job[:job].uuid,
                       event_type: 'stderr',
@@ -752,7 +823,7 @@ class Dispatcher
         $stderr.puts exception.backtrace
       end
       running_job[:stderr_buf_to_flush] = ''
-      running_job[:stderr_flushed_at] = db_current_time
+      running_job[:stderr_flushed_at] = Time.now
     end
   end
 end
@@ -760,4 +831,10 @@ end
 # This is how crunch-job child procs know where the "refresh" trigger file is
 ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.crunch_refresh_trigger
 
+# If salloc can't allocate resources immediately, make it use our temporary
+# failure exit code.  This ensures crunch-dispatch won't mark a job failed
+# because of an issue with node allocation.  This often happens when
+# another dispatcher wins the race to allocate nodes.
+ENV["SLURM_EXIT_IMMEDIATE"] = Dispatcher::EXIT_TEMPFAIL.to_s
+
 Dispatcher.new.run