13996: More config updates
[arvados.git] / services / api / lib / crunch_dispatch.rb
index 3ef1803031e8ecfc9ad2da6202f6bd3547f5dd08..eceada5a7a17a7d92c82e322038a81c15ea02d00 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 require 'open3'
 require 'shellwords'
 
@@ -25,14 +29,15 @@ class CrunchDispatch
     @docker_bin = ENV['CRUNCH_JOB_DOCKER_BIN']
     @docker_run_args = ENV['CRUNCH_JOB_DOCKER_RUN_ARGS']
     @cgroup_root = ENV['CRUNCH_CGROUP_ROOT']
+    @srun_sync_timeout = ENV['CRUNCH_SRUN_SYNC_TIMEOUT']
 
-    @arvados_internal = Rails.configuration.git_internal_dir
+    @arvados_internal = Rails.configuration.Containers["JobsAPI"]["GitInternalDir"]
     if not File.exist? @arvados_internal
       $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
       raise "No internal git repository available" unless ($? == 0)
     end
 
-    @repo_root = Rails.configuration.git_repositories_dir
+    @repo_root = Rails.configuration.Git["Repositories"]
     @arvados_repo_path = Repository.where(name: "arvados").first.server_path
     @authorizations = {}
     @did_recently = {}
@@ -293,7 +298,7 @@ class CrunchDispatch
     @fetched_commits[sha1] = ($? == 0)
   end
 
-  def tag_commit(commit_hash, tag_name)
+  def tag_commit(job, commit_hash, tag_name)
     # @git_tags[T]==V if we know commit V has been tagged T in the
     # arvados_internal repository.
     if not @git_tags[tag_name]
@@ -377,20 +382,20 @@ class CrunchDispatch
           next
         end
         ready &&= get_commit repo.server_path, job.script_version
-        ready &&= tag_commit job.script_version, job.uuid
+        ready &&= tag_commit job, job.script_version, job.uuid
       end
 
       # This should be unnecessary, because API server does it during
       # job create/update, but it's still not a bad idea to verify the
       # tag is correct before starting the job:
-      ready &&= tag_commit job.script_version, job.uuid
+      ready &&= tag_commit job, job.script_version, job.uuid
 
       # The arvados_sdk_version doesn't support use of arbitrary
       # remote URLs, so the requested version isn't necessarily copied
       # into the internal repository yet.
       if job.arvados_sdk_version
         ready &&= get_commit @arvados_repo_path, job.arvados_sdk_version
-        ready &&= tag_commit job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
+        ready &&= tag_commit job, job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"
       end
 
       if not ready
@@ -415,6 +420,10 @@ class CrunchDispatch
         cmd_args += ['--docker-run-args', @docker_run_args]
       end
 
+      if @srun_sync_timeout
+        cmd_args += ['--srun-sync-timeout', @srun_sync_timeout]
+      end
+
       if have_job_lock?(job)
         cmd_args << "--force-unlock"
       end
@@ -425,8 +434,11 @@ class CrunchDispatch
         i, o, e, t = Open3.popen3(*cmd_args)
       rescue
         $stderr.puts "dispatch: popen3: #{$!}"
-        sleep 1
-        next
+        # This is a dispatch problem like "Too many open files";
+        # retrying another job right away would be futile. Just return
+        # and hope things are better next time, after (at least) a
+        # did_recently() delay.
+        return
       end
 
       $stderr.puts "dispatch: job #{job.uuid}"
@@ -448,7 +460,7 @@ class CrunchDispatch
         bytes_logged: 0,
         events_logged: 0,
         log_throttle_is_open: true,
-        log_throttle_reset_time: Time.now + Rails.configuration.crunch_log_throttle_period,
+        log_throttle_reset_time: Time.now + Rails.configuration.Containers["Logging"]["LogThrottlePeriod"],
         log_throttle_bytes_so_far: 0,
         log_throttle_lines_so_far: 0,
         log_throttle_bytes_skipped: 0,
@@ -473,7 +485,7 @@ class CrunchDispatch
       matches = line.match(/^\S+ \S+ \d+ \d+ stderr (.*)/)
       if matches and matches[1] and matches[1].start_with?('[...]') and matches[1].end_with?('[...]')
         partial_line = true
-        if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.crunch_log_partial_line_throttle_period
+        if Time.now > running_job[:log_throttle_partial_line_last_at] + Rails.configuration.Containers["Logging"]["LogPartialLineThrottlePeriod"]
           running_job[:log_throttle_partial_line_last_at] = Time.now
         else
           skip_counts = true
@@ -487,26 +499,26 @@ class CrunchDispatch
       end
 
       if (running_job[:bytes_logged] >
-          Rails.configuration.crunch_limit_log_bytes_per_job)
-        message = "Exceeded log limit #{Rails.configuration.crunch_limit_log_bytes_per_job} bytes (crunch_limit_log_bytes_per_job). Log will be truncated."
+          Rails.configuration.Containers["Logging"]["LimitLogBytesPerJob"])
+        message = "Exceeded log limit #{Rails.configuration.Containers["Logging"]["LimitLogBytesPerJob"]} bytes (LimitLogBytesPerJob). Log will be truncated."
         running_job[:log_throttle_reset_time] = Time.now + 100.years
         running_job[:log_throttle_is_open] = false
 
       elsif (running_job[:log_throttle_bytes_so_far] >
-             Rails.configuration.crunch_log_throttle_bytes)
+             Rails.configuration.Containers["Logging"]["LogThrottleBytes"])
         remaining_time = running_job[:log_throttle_reset_time] - Time.now
-        message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds."
+        message = "Exceeded rate #{Rails.configuration.Containers["Logging"]["LogThrottleBytes"]} bytes per #{Rails.configuration.Containers["Logging"]["LogThrottlePeriod"]} seconds (LogThrottleBytes). Logging will be silenced for the next #{remaining_time.round} seconds."
         running_job[:log_throttle_is_open] = false
 
       elsif (running_job[:log_throttle_lines_so_far] >
-             Rails.configuration.crunch_log_throttle_lines)
+             Rails.configuration.Containers["Logging"]["LogThrottleLines"])
         remaining_time = running_job[:log_throttle_reset_time] - Time.now
-        message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds."
+        message = "Exceeded rate #{Rails.configuration.Containers["Logging"]["LogThrottleLines"]} lines per #{Rails.configuration.Containers["Logging"]["LogThrottlePeriod"]} seconds (LogThrottleLines), logging will be silenced for the next #{remaining_time.round} seconds."
         running_job[:log_throttle_is_open] = false
 
       elsif partial_line and running_job[:log_throttle_first_partial_line]
         running_job[:log_throttle_first_partial_line] = false
-        message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.crunch_log_partial_line_throttle_period} seconds."
+        message = "Rate-limiting partial segments of long lines to one every #{Rails.configuration.Containers["Logging"]["LogPartialLineThrottlePeriod"]} seconds."
       end
     end
 
@@ -540,7 +552,7 @@ class CrunchDispatch
           j[:stderr_buf_to_flush] << "#{LogTime.now} #{message}\n"
         end
 
-        j[:log_throttle_reset_time] = now + Rails.configuration.crunch_log_throttle_period
+        j[:log_throttle_reset_time] = now + Rails.configuration.Containers["Logging"]["LogThrottlePeriod"]
         j[:log_throttle_bytes_so_far] = 0
         j[:log_throttle_lines_so_far] = 0
         j[:log_throttle_bytes_skipped] = 0
@@ -580,7 +592,7 @@ class CrunchDispatch
         bufend = ''
         streambuf.each_line do |line|
           if not line.end_with? $/
-            if line.size > Rails.configuration.crunch_log_throttle_bytes
+            if line.size > Rails.configuration.Containers["Logging"]["LogThrottleBytes"]
               # Without a limit here, we'll use 2x an arbitrary amount
               # of memory, and waste a lot of time copying strings
               # around, all without providing any feedback to anyone
@@ -629,31 +641,11 @@ class CrunchDispatch
     pid_done = nil
     j_done = nil
 
-    if false
-      begin
-        pid_done = waitpid(-1, Process::WNOHANG | Process::WUNTRACED)
-        if pid_done
-          j_done = @running.values.
-            select { |j| j[:wait_thr].pid == pid_done }.
-            first
-        end
-      rescue SystemCallError
-        # I have @running processes but system reports I have no
-        # children. This is likely to happen repeatedly if it happens at
-        # all; I will log this no more than once per child process I
-        # start.
-        if 0 < @running.select { |uuid,j| j[:warned_waitpid_error].nil? }.size
-          children = @running.values.collect { |j| j[:wait_thr].pid }.join ' '
-          $stderr.puts "dispatch: IPC bug: waitpid() error (#{$!}), but I have children #{children}"
-        end
-        @running.each do |uuid,j| j[:warned_waitpid_error] = true end
-      end
-    else
-      @running.each do |uuid, j|
-        if j[:wait_thr].status == false
-          pid_done = j[:wait_thr].pid
-          j_done = j
-        end
+    @running.each do |uuid, j|
+      if !j[:wait_thr].status
+        pid_done = j[:wait_thr].pid
+        j_done = j
+        break
       end
     end
 
@@ -783,7 +775,7 @@ class CrunchDispatch
 
     # This is how crunch-job child procs know where the "refresh"
     # trigger file is
-    ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.crunch_refresh_trigger
+    ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.Containers["JobsAPI"]["CrunchRefreshTrigger"]
 
     # If salloc can't allocate resources immediately, make it use our
     # temporary failure exit code.  This ensures crunch-dispatch won't
@@ -945,8 +937,8 @@ class CrunchDispatch
     # Send out a log event if the buffer size exceeds LogBytesPerEvent
     # bytes or if it has been at least LogSecondsBetweenEvents seconds
     # since the last flush.
-    if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or
-        (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
+    if running_job[:stderr_buf_to_flush].size > Rails.configuration.Containers["Logging"]["LogBytesPerEvent"] or
+        (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.Containers["Logging"]["LogSecondsBetweenEvents"]
       begin
         log = Log.new(object_uuid: running_job[:job].uuid,
                       event_type: 'stderr',
@@ -965,7 +957,7 @@ class CrunchDispatch
 
   # An array of job_uuids in squeue
   def squeue_jobs
-    if Rails.configuration.crunch_job_wrapper == :slurm_immediate
+    if Rails.configuration.Containers["JobsAPI"]["CrunchJobWrapper"].to_sym == :slurm_immediate
       p = IO.popen(['squeue', '-a', '-h', '-o', '%j'])
       begin
         p.readlines.map {|line| line.strip}