# Parse dispatcher command-line options. With no arguments we service
# both queues; otherwise only the queue(s) explicitly requested.
include Process

$options = {}
(ARGV.any? ? ARGV : ['--jobs', '--pipelines']).each do |arg|
  case arg
  when '--jobs'
    $options[:jobs] = true
  when '--pipelines'
    $options[:pipelines] = true
  else
    abort "Unrecognized command line option '#{arg}'"
  end
end
unless $options[:jobs] || $options[:pipelines]
  abort "Nothing to do. Please specify at least one of: --jobs, --pipelines."
end

# Remove our own flags from ARGV — presumably so code loaded later that
# re-reads ARGV (e.g. the Rails environment) doesn't see them; confirm
# against the full file.
ARGV.reject! { |a| a =~ /--jobs|--pipelines/ }
+
# Bookkeeping shared across the dispatcher loop:
# $warned caches messages already printed; $signal records which
# termination signals have been received.
$warned = {}
$signal = {}
# NOTE(review): the loop body is elided in this diff hunk — presumably it
# installs Signal.trap handlers that set $signal[sig]; confirm against the
# full file before relying on this.
%w{TERM INT}.each do |sig|
end
# Rebuild the dispatcher's work lists, honoring the command-line
# switches: poll the job queue only when --jobs was given, and the
# pipeline queue only when --pipelines was given.
def refresh_todo
  @todo = []
  @todo_pipelines = []
  # A job without a readable repository can't be dispatched; skip it.
  @todo = Job.queue.select(&:repository) if $options[:jobs]
  @todo_pipelines = PipelineInstance.queue if $options[:pipelines]
end
def sinfo
begin
sinfo.split("\n").
each do |line|
- re = line.match /(\S+?):+(idle|alloc|down)/
+ re = line.match /(\S+?):+(idle|alloc|down)?/
next if !re
+ _, node_name, node_state = *re
+ node_state = 'down' unless %w(idle alloc down).include? node_state
+
# sinfo tells us about a node N times if it is shared by N partitions
- next if node_seen[re[1]]
- node_seen[re[1]] = true
+ next if node_seen[node_name]
+ node_seen[node_name] = true
# update our database (and cache) when a node's state changes
- if @node_state[re[1]] != re[2]
- @node_state[re[1]] = re[2]
- node = Node.where('hostname=?', re[1]).first
+ if @node_state[node_name] != node_state
+ @node_state[node_name] = node_state
+ node = Node.where('hostname=?', node_name).order(:last_ping_at).last
if node
- $stderr.puts "dispatch: update #{re[1]} state to #{re[2]}"
- node.info['slurm_state'] = re[2]
+ $stderr.puts "dispatch: update #{node_name} state to #{node_state}"
+ node.info['slurm_state'] = node_state
if not node.save
$stderr.puts "dispatch: failed to update #{node.uuid}: #{node.errors.messages}"
end
- elsif re[2] != 'down'
- $stderr.puts "dispatch: sinfo reports '#{re[1]}' is not down, but no node has that name"
+ elsif node_state != 'down'
+ $stderr.puts "dispatch: sinfo reports '#{node_name}' is not down, but no node has that name"
end
end
end
cmd_args = nil
case Server::Application.config.crunch_job_wrapper
when :none
+ if @running.size > 0
+ # Don't run more than one at a time.
+ return
+ end
cmd_args = []
when :slurm_immediate
nodelist = nodes_available_for_job(job)
next
end
- $stderr.puts `cd #{arvados_internal.shellescape} && git fetch --no-tags #{src_repo.shellescape} && git tag #{job.uuid.shellescape} #{job.script_version.shellescape}`
+ $stderr.puts `cd #{arvados_internal.shellescape} && git fetch-pack --all #{src_repo.shellescape} && git tag #{job.uuid.shellescape} #{job.script_version.shellescape}`
cmd_args << crunch_job_bin
cmd_args << '--job-api-token'
$stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
$stderr.puts line
pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
- j[:stderr_buf_to_flush] << pub_msg
+ if not j[:log_truncated]
+ j[:stderr_buf_to_flush] << pub_msg
+ end
end
- if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
- (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
- write_log j
+ if not j[:log_truncated]
+ if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
+ (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
+ write_log j
+ end
end
end
end
$stderr.puts j_done[:stderr_buf] + "\n"
end
- # Wait the thread
- j_done[:wait_thr].value
+ # Wait the thread (returns a Process::Status)
+ exit_status = j_done[:wait_thr].value
jobrecord = Job.find_by_uuid(job_done.uuid)
- if jobrecord.started_at
+ if exit_status.to_i != 75 and jobrecord.started_at
# Clean up state fields in case crunch-job exited without
# putting the job in a suitable "finished" state.
jobrecord.running = false
# Don't fail the job if crunch-job didn't even get as far as
# starting it. If the job failed to run due to an infrastructure
# issue with crunch-job or slurm, we want the job to stay in the
- # queue.
+  # queue. If crunch-job exited after losing a race to another
+  # crunch-job process, it exits 75 and we should leave the job
+  # record alone so the winner of the race can do its thing.
+ #
+ # There is still an unhandled race condition: If our crunch-job
+ # process is about to lose a race with another crunch-job
+ # process, but crashes before getting to its "exit 75" (for
+ # example, "cannot fork" or "cannot reach API server") then we
+  # will assume incorrectly that it's our process's fault that
+  # jobrecord.started_at is non-nil, and mark the job as failed
+ # even though the winner of the race is probably still doing
+ # fine.
end
# Invalidate the per-job auth token
# send message to log table. we want these records to be transient
# Flush a job's buffered stderr to the Log table as a single 'stderr'
# event.
#
# running_job is the dispatcher's bookkeeping hash for one child
# process. Reads/updates :stderr_buf_to_flush, :bytes_logged,
# :events_logged, :log_truncated, :stderr_flushed_at, and (on write
# failure) :stderr_buf.
#
# Once the job reaches the server-configured log limits, the pending
# buffer is replaced with a one-line truncation notice and
# :log_truncated is set so subsequent calls become no-ops.
def write_log running_job
  # Nothing to do after truncation, or when there is nothing buffered.
  return if running_job[:log_truncated]
  return if running_job[:stderr_buf_to_flush] == ''
  begin
    # Enforce crunch_limit_log_event_bytes_per_job /
    # crunch_limit_log_events_per_job: swap the payload for a notice and
    # stop logging from here on.
    if too_many_bytes_logged_for_job(running_job)
      running_job[:log_truncated] = true
      running_job[:stderr_buf_to_flush] =
        "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
    elsif too_many_events_logged_for_job(running_job)
      running_job[:log_truncated] = true
      running_job[:stderr_buf_to_flush] =
        "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
    end
    job = running_job[:job]
    Log.new(object_uuid: job.uuid,
            event_type: 'stderr',
            owner_uuid: job.owner_uuid,
            properties: {"text" => running_job[:stderr_buf_to_flush]}).save!
    running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
    running_job[:events_logged] += 1
  rescue
    # Keep the unflushed text around; prepend a marker so the failure is
    # visible in the fallback stderr stream.
    running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf]
  end
  running_job[:stderr_buf_to_flush] = ''
  running_job[:stderr_flushed_at] = Time.now.to_i
end
end