Merge branch 'master' into 3338-tab-counts
[arvados.git] / services / api / script / crunch-dispatch.rb
index 154fcf31455ac821b76287029114441e2c3849aa..d316d92d605ce7c94e5c0eb8c0697d3f0f9c1ab2 100755 (executable)
@@ -2,6 +2,23 @@
 
 include Process
 
+$options = {}
+(ARGV.any? ? ARGV : ['--jobs', '--pipelines']).each do |arg|
+  case arg
+  when '--jobs'
+    $options[:jobs] = true
+  when '--pipelines'
+    $options[:pipelines] = true
+  else
+    abort "Unrecognized command line option '#{arg}'"
+  end
+end
+if not ($options[:jobs] or $options[:pipelines])
+  abort "Nothing to do. Please specify at least one of: --jobs, --pipelines."
+end
+
+ARGV.reject! { |a| a =~ /--jobs|--pipelines/ }
+
 $warned = {}
 $signal = {}
 %w{TERM INT}.each do |sig|
@@ -34,8 +51,14 @@ class Dispatcher
   end
 
   def refresh_todo
-    @todo = Job.queue.select do |j| j.repository end
-    @todo_pipelines = PipelineInstance.queue
+    @todo = []
+    if $options[:jobs]
+      @todo = Job.queue.select(&:repository)
+    end
+    @todo_pipelines = []
+    if $options[:pipelines]
+      @todo_pipelines = PipelineInstance.queue
+    end
   end
 
   def sinfo
@@ -68,25 +91,28 @@ class Dispatcher
       begin
         sinfo.split("\n").
           each do |line|
-          re = line.match /(\S+?):+(idle|alloc|down)/
+          re = line.match /(\S+?):+(idle|alloc|down)?/
           next if !re
 
+          _, node_name, node_state = *re
+          node_state = 'down' unless %w(idle alloc down).include? node_state
+
           # sinfo tells us about a node N times if it is shared by N partitions
-          next if node_seen[re[1]]
-          node_seen[re[1]] = true
+          next if node_seen[node_name]
+          node_seen[node_name] = true
 
           # update our database (and cache) when a node's state changes
-          if @node_state[re[1]] != re[2]
-            @node_state[re[1]] = re[2]
-            node = Node.where('hostname=?', re[1]).order(:last_ping_at).last
+          if @node_state[node_name] != node_state
+            @node_state[node_name] = node_state
+            node = Node.where('hostname=?', node_name).order(:last_ping_at).last
             if node
-              $stderr.puts "dispatch: update #{re[1]} state to #{re[2]}"
-              node.info['slurm_state'] = re[2]
+              $stderr.puts "dispatch: update #{node_name} state to #{node_state}"
+              node.info['slurm_state'] = node_state
               if not node.save
                 $stderr.puts "dispatch: failed to update #{node.uuid}: #{node.errors.messages}"
               end
-            elsif re[2] != 'down'
-              $stderr.puts "dispatch: sinfo reports '#{re[1]}' is not down, but no node has that name"
+            elsif node_state != 'down'
+              $stderr.puts "dispatch: sinfo reports '#{node_name}' is not down, but no node has that name"
             end
           end
         end
@@ -316,12 +342,16 @@ class Dispatcher
             $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
             $stderr.puts line
             pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
-            j[:stderr_buf_to_flush] << pub_msg
+            if not j[:log_truncated]
+              j[:stderr_buf_to_flush] << pub_msg
+            end
           end
 
-          if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
-              (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
-            write_log j
+          if not j[:log_truncated]
+            if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
+                (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
+              write_log j
+            end
           end
         end
       end
@@ -486,36 +516,32 @@ class Dispatcher
 
   # send message to log table. we want these records to be transient
   def write_log running_job
+    return if running_job[:log_truncated]
+    return if running_job[:stderr_buf_to_flush] == ''
     begin
-      if (running_job && running_job[:stderr_buf_to_flush] != '')
-        # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
-        # or crunch_limit_log_events_per_job.
-        if (too_many_bytes_logged_for_job(running_job))
-          return if running_job[:log_truncated]
-          running_job[:log_truncated] = true
-          running_job[:stderr_buf_to_flush] =
-              "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
-        elsif (too_many_events_logged_for_job(running_job))
-          return if running_job[:log_truncated]
-          running_job[:log_truncated] = true
-          running_job[:stderr_buf_to_flush] =
-              "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
-        end
-        log = Log.new(object_uuid: running_job[:job].uuid,
-                      event_type: 'stderr',
-                      owner_uuid: running_job[:job].owner_uuid,
-                      properties: {"text" => running_job[:stderr_buf_to_flush]})
-        log.save!
-        running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
-        running_job[:events_logged] += 1
-        running_job[:stderr_buf_to_flush] = ''
-        running_job[:stderr_flushed_at] = Time.now.to_i
+      # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
+      # or crunch_limit_log_events_per_job.
+      if (too_many_bytes_logged_for_job(running_job))
+        running_job[:log_truncated] = true
+        running_job[:stderr_buf_to_flush] =
+          "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
+      elsif (too_many_events_logged_for_job(running_job))
+        running_job[:log_truncated] = true
+        running_job[:stderr_buf_to_flush] =
+          "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
       end
+      log = Log.new(object_uuid: running_job[:job].uuid,
+                    event_type: 'stderr',
+                    owner_uuid: running_job[:job].owner_uuid,
+                    properties: {"text" => running_job[:stderr_buf_to_flush]})
+      log.save!
+      running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
+      running_job[:events_logged] += 1
     rescue
-      running_job[:stderr_buf] = "Failed to write logs \n"
-      running_job[:stderr_buf_to_flush] = ''
-      running_job[:stderr_flushed_at] = Time.now.to_i
+      running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf]
     end
+    running_job[:stderr_buf_to_flush] = ''
+    running_job[:stderr_flushed_at] = Time.now.to_i
   end
 
 end