require File.dirname(__FILE__) + '/../config/environment'
require 'open3'
-$redis ||= Redis.new
-LOG_BUFFER_SIZE = 2**20
-
-$tmp_log_buffer = ''
-$previous_tmp_log_at = Time.now
-TMP_LOG_BUFFER_SIZE = 4096
-
class Dispatcher
include ApplicationHelper
def update_node_status
if Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
- @nodes_in_state = {idle: 0, alloc: 0, down: 0}
@node_state ||= {}
node_seen = {}
begin
next if node_seen[re[1]]
node_seen[re[1]] = true
- # count nodes in each state
- @nodes_in_state[re[2].to_sym] += 1
-
# update our database (and cache) when a node's state changes
if @node_state[re[1]] != re[2]
@node_state[re[1]] = re[2]
node = Node.where('hostname=?', re[1]).first
if node
$stderr.puts "dispatch: update #{re[1]} state to #{re[2]}"
- node.info[:slurm_state] = re[2]
- node.save
+ node.info['slurm_state'] = re[2]
+ if not node.save
+ $stderr.puts "dispatch: failed to update #{node.uuid}: #{node.errors.messages}"
+ end
elsif re[2] != 'down'
$stderr.puts "dispatch: sinfo reports '#{re[1]}' is not down, but no node has that name"
end
end
end
- rescue
+ rescue => error
+ $stderr.puts "dispatch: error updating node status: #{error}"
end
end
end
- def start_jobs
- @todo.each do |job|
+ def positive_int(raw_value, default=nil)
+ value = begin raw_value.to_i rescue 0 end
+ if value > 0
+ value
+ else
+ default
+ end
+ end
- min_nodes = 1
- begin
- if job.runtime_constraints['min_nodes']
- min_nodes = begin job.runtime_constraints['min_nodes'].to_i rescue 1 end
+ NODE_CONSTRAINT_MAP = {
+ # Map Job runtime_constraints keys to the corresponding Node info key.
+ 'min_ram_mb_per_node' => 'total_ram_mb',
+ 'min_scratch_mb_per_node' => 'total_scratch_mb',
+ 'min_cores_per_node' => 'total_cpu_cores',
+ }
+
+ def nodes_available_for_job_now(job)
+ # Find Nodes that satisfy a Job's runtime constraints (by building
+ # a list of Procs and using them to test each Node). If there
+ # are enough to run the Job, return an array of their names.
+ # Otherwise, return nil.
+ need_procs = NODE_CONSTRAINT_MAP.each_pair.map do |job_key, node_key|
+ Proc.new do |node|
+ positive_int(node.info[node_key], 0) >=
+ positive_int(job.runtime_constraints[job_key], 0)
+ end
+ end
+ min_node_count = positive_int(job.runtime_constraints['min_nodes'], 1)
+ usable_nodes = []
+ Node.find_each do |node|
+ good_node = (node.info['slurm_state'] == 'idle')
+ need_procs.each { |node_test| good_node &&= node_test.call(node) }
+ if good_node
+ usable_nodes << node
+ if usable_nodes.count >= min_node_count
+ return usable_nodes.map { |node| node.hostname }
end
end
+ end
+ nil
+ end
- begin
- next if @nodes_in_state[:idle] < min_nodes
- rescue
- end
+ def nodes_available_for_job(job)
+ # Check if there are enough idle nodes with the Job's minimum
+ # hardware requirements to run it. If so, return an array of
+ # their names. If not, up to once per hour, signal start_jobs to
+ # hold off launching Jobs. This delay is meant to give the Node
+ # Manager an opportunity to make new resources available for new
+ # Jobs.
+ #
+ # The exact timing parameters here might need to be adjusted for
+ # the best balance between helping the longest-waiting Jobs run,
+ # and making efficient use of immediately available resources.
+ # These are all just first efforts until we have more data to work
+ # with.
+ nodelist = nodes_available_for_job_now(job)
+ if nodelist.nil? and not did_recently(:wait_for_available_nodes, 3600)
+ $stderr.puts "dispatch: waiting for nodes for #{job.uuid}"
+ @node_wait_deadline = Time.now + 5.minutes
+ end
+ nodelist
+ end
+ def start_jobs
+ @todo.each do |job|
next if @running[job.uuid]
- next if !take(job)
cmd_args = nil
case Server::Application.config.crunch_job_wrapper
when :none
cmd_args = []
when :slurm_immediate
+ nodelist = nodes_available_for_job(job)
+ if nodelist.nil?
+ if Time.now < @node_wait_deadline
+ break
+ else
+ next
+ end
+ end
cmd_args = ["salloc",
"--chdir=/",
"--immediate",
"--exclusive",
"--no-kill",
"--job-name=#{job.uuid}",
- "--nodes=#{min_nodes}"]
+ "--nodelist=#{nodelist.join(',')}"]
else
raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}"
end
+ next if !take(job)
+
if Server::Application.config.crunch_job_user
cmd_args.unshift("sudo", "-E", "-u",
Server::Application.config.crunch_job_user,
- "PERLLIB=#{ENV['PERLLIB']}")
+ "PATH=#{ENV['PATH']}",
+ "PERLLIB=#{ENV['PERLLIB']}",
+ "PYTHONPATH=#{ENV['PYTHONPATH']}",
+ "RUBYLIB=#{ENV['RUBYLIB']}",
+ "GEM_PATH=#{ENV['GEM_PATH']}")
end
job_auth = ApiClientAuthorization.
$stderr.puts "dispatch: job #{job.uuid}"
start_banner = "dispatch: child #{t.pid} start #{Time.now.ctime.to_s}"
$stderr.puts start_banner
- $redis.set job.uuid, start_banner + "\n"
- $redis.publish job.uuid, start_banner
- $redis.publish job.owner_uuid, start_banner
@running[job.uuid] = {
stdin: i,
stderr_buf: '',
started: false,
sent_int: 0,
- job_auth: job_auth
+ job_auth: job_auth,
+ stderr_buf_to_flush: '',
+ stderr_flushed_at: 0,
+ bytes_logged: 0,
+ events_logged: 0,
+ log_truncated: false
}
i.close
+ update_node_status
end
end
lines.each do |line|
$stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
$stderr.puts line
- pub_msg = "#{Time.now.ctime.to_s} #{line.strip}"
- $redis.publish job.owner_uuid, pub_msg
- $redis.publish job_uuid, pub_msg
- $redis.append job_uuid, pub_msg + "\n"
- if LOG_BUFFER_SIZE < $redis.strlen(job_uuid)
- $redis.set(job_uuid,
- $redis
- .getrange(job_uuid, (LOG_BUFFER_SIZE >> 1), -1)
- .sub(/^.*?\n/, ''))
- end
+ pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
+ j[:stderr_buf_to_flush] << pub_msg
+ end
- if (TMP_LOG_BUFFER_SIZE < $tmp_log_buffer.size) || ($previous_tmp_log_at+1 < Time.now)
- $tmp_log_buffer += (pub_msg + "\n")
- write_log job_uuid
- else
- $tmp_log_buffer += (pub_msg + "\n")
- end
+ if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
+ (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
+ write_log j
end
end
end
# Ensure every last drop of stdout and stderr is consumed
read_pipes
- write_log job_done.uuid # write any remaining logs
+ write_log j_done # write any remaining logs
if j_done[:stderr_buf] and j_done[:stderr_buf] != ''
$stderr.puts j_done[:stderr_buf] + "\n"
# Invalidate the per-job auth token
j_done[:job_auth].update_attributes expires_at: Time.now
- $redis.publish job_done.uuid, "end"
-
@running.delete job_done.uuid
end
protected
+ def too_many_bytes_logged_for_job(j)
+ return (j[:bytes_logged] + j[:stderr_buf_to_flush].size >
+ Rails.configuration.crunch_limit_log_event_bytes_per_job)
+ end
+
+ def too_many_events_logged_for_job(j)
+ return (j[:events_logged] >= Rails.configuration.crunch_limit_log_events_per_job)
+ end
+
def did_recently(thing, min_interval)
@did_recently ||= {}
if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
end
# send message to log table. we want these records to be transient
- def write_log job_uuid
- if $tmp_log_buffer == ''
- return
+ def write_log running_job
+ begin
+ if (running_job && running_job[:stderr_buf_to_flush] != '')
+ # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
+ # or crunch_limit_log_events_per_job.
+ if (too_many_bytes_logged_for_job(running_job))
+ return if running_job[:log_truncated]
+ running_job[:log_truncated] = true
+ running_job[:stderr_buf_to_flush] =
+ "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
+ elsif (too_many_events_logged_for_job(running_job))
+ return if running_job[:log_truncated]
+ running_job[:log_truncated] = true
+ running_job[:stderr_buf_to_flush] =
+ "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
+ end
+ log = Log.new(object_uuid: running_job[:job].uuid,
+ event_type: 'stderr',
+ owner_uuid: running_job[:job].owner_uuid,
+ properties: {"text" => running_job[:stderr_buf_to_flush]})
+ log.save!
+ running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
+ running_job[:events_logged] += 1
+ running_job[:stderr_buf_to_flush] = ''
+ running_job[:stderr_flushed_at] = Time.now.to_i
+ end
+ rescue
+ running_job[:stderr_buf] = "Failed to write logs \n"
+ running_job[:stderr_buf_to_flush] = ''
+ running_job[:stderr_flushed_at] = Time.now.to_i
end
- log = Log.new(object_uuid: job_uuid,
- event_type:'transient-log-entry',
- summary: $tmp_log_buffer)
- log.save!
- $tmp_log_buffer = ''
- $previous_tmp_log_at = Time.now
end
+
end
# This is how crunch-job child procs know where the "refresh" trigger file is