X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a6010696b246cc43c721a35d56179c629d54e798..4c8ec1b2166a81b05b2b6cf5a6fae5b839876802:/services/api/script/crunch-dispatch.rb diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb index 58e6645763..d316d92d60 100755 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@ -2,6 +2,23 @@ include Process +$options = {} +(ARGV.any? ? ARGV : ['--jobs', '--pipelines']).each do |arg| + case arg + when '--jobs' + $options[:jobs] = true + when '--pipelines' + $options[:pipelines] = true + else + abort "Unrecognized command line option '#{arg}'" + end +end +if not ($options[:jobs] or $options[:pipelines]) + abort "Nothing to do. Please specify at least one of: --jobs, --pipelines." +end + +ARGV.reject! { |a| a =~ /--jobs|--pipelines/ } + $warned = {} $signal = {} %w{TERM INT}.each do |sig| @@ -34,8 +51,14 @@ class Dispatcher end def refresh_todo - @todo = Job.queue.select do |j| j.repository end - @todo_pipelines = PipelineInstance.queue + @todo = [] + if $options[:jobs] + @todo = Job.queue.select(&:repository) + end + @todo_pipelines = [] + if $options[:pipelines] + @todo_pipelines = PipelineInstance.queue + end end def sinfo @@ -68,25 +91,28 @@ class Dispatcher begin sinfo.split("\n"). each do |line| - re = line.match /(\S+?):+(idle|alloc|down)/ + re = line.match /(\S+?):+(idle|alloc|down)?/ next if !re + _, node_name, node_state = *re + node_state = 'down' unless %w(idle alloc down).include? 
node_state + # sinfo tells us about a node N times if it is shared by N partitions - next if node_seen[re[1]] - node_seen[re[1]] = true + next if node_seen[node_name] + node_seen[node_name] = true # update our database (and cache) when a node's state changes - if @node_state[re[1]] != re[2] - @node_state[re[1]] = re[2] - node = Node.where('hostname=?', re[1]).order(:last_ping_at).last + if @node_state[node_name] != node_state + @node_state[node_name] = node_state + node = Node.where('hostname=?', node_name).order(:last_ping_at).last if node - $stderr.puts "dispatch: update #{re[1]} state to #{re[2]}" - node.info['slurm_state'] = re[2] + $stderr.puts "dispatch: update #{node_name} state to #{node_state}" + node.info['slurm_state'] = node_state if not node.save $stderr.puts "dispatch: failed to update #{node.uuid}: #{node.errors.messages}" end - elsif re[2] != 'down' - $stderr.puts "dispatch: sinfo reports '#{re[1]}' is not down, but no node has that name" + elsif node_state != 'down' + $stderr.puts "dispatch: sinfo reports '#{node_name}' is not down, but no node has that name" end end end @@ -316,12 +342,16 @@ class Dispatcher $stderr.print "#{job_uuid} ! 
" unless line.index(job_uuid) $stderr.puts line pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" - j[:stderr_buf_to_flush] << pub_msg + if not j[:log_truncated] + j[:stderr_buf_to_flush] << pub_msg + end end - if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or - (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i)) - write_log j + if not j[:log_truncated] + if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or + (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i)) + write_log j + end end end end @@ -379,7 +409,7 @@ class Dispatcher exit_status = j_done[:wait_thr].value jobrecord = Job.find_by_uuid(job_done.uuid) - if exit_status.to_i != 111 and jobrecord.started_at + if exit_status.to_i != 75 and jobrecord.started_at # Clean up state fields in case crunch-job exited without # putting the job in a suitable "finished" state. jobrecord.running = false @@ -392,7 +422,18 @@ class Dispatcher # Don't fail the job if crunch-job didn't even get as far as # starting it. If the job failed to run due to an infrastructure # issue with crunch-job or slurm, we want the job to stay in the - # queue. + # queue. If crunch-job exited after losing a race to another + # crunch-job process, it exits 75 and we should leave the job + # record alone so the winner of the race can do its thing. + # + # There is still an unhandled race condition: If our crunch-job + # process is about to lose a race with another crunch-job + # process, but crashes before getting to its "exit 75" (for + # example, "cannot fork" or "cannot reach API server") then we + # will assume incorrectly that it's our process's fault that + # jobrecord.started_at is non-nil, and mark the job as failed + # even though the winner of the race is probably still doing + # fine. end # Invalidate the per-job auth token @@ -475,36 +516,32 @@ class Dispatcher # send message to log table. 
we want these records to be transient def write_log running_job + return if running_job[:log_truncated] + return if running_job[:stderr_buf_to_flush] == '' begin - if (running_job && running_job[:stderr_buf_to_flush] != '') - # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job - # or crunch_limit_log_events_per_job. - if (too_many_bytes_logged_for_job(running_job)) - return if running_job[:log_truncated] - running_job[:log_truncated] = true - running_job[:stderr_buf_to_flush] = - "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated" - elsif (too_many_events_logged_for_job(running_job)) - return if running_job[:log_truncated] - running_job[:log_truncated] = true - running_job[:stderr_buf_to_flush] = - "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated" - end - log = Log.new(object_uuid: running_job[:job].uuid, - event_type: 'stderr', - owner_uuid: running_job[:job].owner_uuid, - properties: {"text" => running_job[:stderr_buf_to_flush]}) - log.save! - running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size - running_job[:events_logged] += 1 - running_job[:stderr_buf_to_flush] = '' - running_job[:stderr_flushed_at] = Time.now.to_i + # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job + # or crunch_limit_log_events_per_job. + if (too_many_bytes_logged_for_job(running_job)) + running_job[:log_truncated] = true + running_job[:stderr_buf_to_flush] = + "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). 
Subsequent logs truncated" + elsif (too_many_events_logged_for_job(running_job)) + running_job[:log_truncated] = true + running_job[:stderr_buf_to_flush] = + "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated" end + log = Log.new(object_uuid: running_job[:job].uuid, + event_type: 'stderr', + owner_uuid: running_job[:job].owner_uuid, + properties: {"text" => running_job[:stderr_buf_to_flush]}) + log.save! + running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size + running_job[:events_logged] += 1 rescue - running_job[:stderr_buf] = "Failed to write logs \n" - running_job[:stderr_buf_to_flush] = '' - running_job[:stderr_flushed_at] = Time.now.to_i + running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf] end + running_job[:stderr_buf_to_flush] = '' + running_job[:stderr_flushed_at] = Time.now.to_i end end