#!/usr/bin/env ruby
+require 'shellwords'
include Process
$options = {}
require File.dirname(__FILE__) + '/../config/environment'
require 'open3'
# Time subclass whose #to_s renders a compact, filesystem-safe UTC
# timestamp (YYYY-MM-DD_HH:MM:SS), used to prefix dispatcher log lines.
class LogTime < Time
  def to_s
    # NOTE: Time#utc converts the receiver in place before formatting.
    utc.strftime("%Y-%m-%d_%H:%M:%S")
  end
end
+
class Dispatcher
include ApplicationHelper
  # Set up dispatcher state: locate the crunch-job executable, make sure
  # the internal git repository exists, and initialize the bookkeeping
  # hashes used by the dispatch loop.
  #
  # Raises if no crunch-job binary can be found, or if the internal git
  # repository cannot be created.
  def initialize
    # Prefer an explicit CRUNCH_JOB_BIN; otherwise take the first
    # arv-crunch-job found on PATH.
    @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
    if @crunch_job_bin.empty?
      raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
    end

    # Create the bare internal repository on first run.
    @arvados_internal = Rails.configuration.git_internal_dir
    if not File.exists? @arvados_internal
      $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
      raise "No internal git repository available" unless ($? == 0)
    end

    @repo_root = Rails.configuration.git_repositories_dir
    @authorizations = {}   # job uuid => ApiClientAuthorization for that job
    @did_recently = {}     # rate-limit bookkeeping for did_recently()
    @fetched_commits = {}  # commit hash => true once known to exist internally
    @git_tags = {}         # tag name => commit hash it points to
    @node_state = {}       # hostname => last slurm state/job reported
    @pipe_auth_tokens = {}
    @running = {}          # job uuid => child-process bookkeeping hash
    @todo = []             # queued Jobs to consider dispatching
    @todo_pipelines = []   # queued PipelineInstances
  end
+
def sysuser
return act_as_system_user
end
def refresh_todo
- @todo = []
if $options[:jobs]
@todo = Job.queue.select(&:repository)
end
- @todo_pipelines = []
if $options[:pipelines]
@todo_pipelines = PipelineInstance.queue
end
def slurm_status
slurm_nodes = {}
each_slurm_line("sinfo", "%t") do |hostname, state|
+ # Treat nodes in idle* state as down, because the * means that slurm
+ # hasn't been able to communicate with it recently.
+ state.sub!(/^idle\*/, "down")
state.sub!(/\W+$/, "")
state = "down" unless %w(idle alloc down).include?(state)
slurm_nodes[hostname] = {state: state, job: nil}
def update_node_status
return unless Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
- @node_state ||= {}
slurm_status.each_pair do |hostname, slurmdata|
next if @node_state[hostname] == slurmdata
begin
nodelist
end
+ def fail_job job, message
+ $stderr.puts "dispatch: #{job.uuid}: #{message}"
+ begin
+ Log.new(object_uuid: job.uuid,
+ event_type: 'dispatch',
+ owner_uuid: job.owner_uuid,
+ summary: message,
+ properties: {"text" => message}).save!
+ rescue
+ $stderr.puts "dispatch: log.create failed"
+ end
+
+ begin
+ job.lock @authorizations[job.uuid].user.uuid
+ job.state = "Failed"
+ if not job.save
+ $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed"
+ end
+ rescue ArvadosModel::AlreadyLockedError
+ $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else"
+ end
+ end
+
+ def stdout_s(cmd_a, opts={})
+ IO.popen(cmd_a, "r", opts) do |pipe|
+ return pipe.read.chomp
+ end
+ end
+
+ def git_cmd(*cmd_a)
+ ["git", "--git-dir=#{@arvados_internal}"] + cmd_a
+ end
+
+ def get_authorization(job)
+ if @authorizations[job.uuid] and
+ @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
+ # We already made a token for this job, but we need a new one
+ # because modified_by_user_uuid has changed (the job will run
+ # as a different user).
+ @authorizations[job.uuid].update_attributes expires_at: Time.now
+ @authorizations[job.uuid] = nil
+ end
+ if not @authorizations[job.uuid]
+ auth = ApiClientAuthorization.
+ new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
+ api_client_id: 0)
+ if not auth.save
+ $stderr.puts "dispatch: auth.save failed for #{job.uuid}"
+ else
+ @authorizations[job.uuid] = auth
+ end
+ end
+ @authorizations[job.uuid]
+ end
+
  # Ensure commit_hash (from repository repo_name under @repo_root) exists
  # in the internal git repository, importing it with git fetch-pack if
  # necessary. Returns truthy on success, nil on failure.
  #
  # FIXME(review): the failure paths call "fail_job job, ..." but "job" is
  # not a parameter or local of this method, so hitting either error path
  # raises NameError instead of failing the job cleanly. The job (or at
  # least its uuid) needs to be passed in, or the error reported some
  # other way.
  def get_commit(repo_name, commit_hash)
    # @fetched_commits[V]==true if we know commit V exists in the
    # arvados_internal git repository.
    if !@fetched_commits[commit_hash]
      # Accept either a bare "<name>.git" or a checkout's "<name>/.git".
      src_repo = File.join(@repo_root, "#{repo_name}.git")
      if not File.exists? src_repo
        src_repo = File.join(@repo_root, repo_name, '.git')
        if not File.exists? src_repo
          fail_job job, "No #{repo_name}.git or #{repo_name}/.git at #{@repo_root}"
          return nil
        end
      end

      # check if the commit needs to be fetched or not
      commit_rev = stdout_s(git_cmd("rev-list", "-n1", commit_hash),
                            err: "/dev/null")
      unless $? == 0 and commit_rev == commit_hash
        # commit does not exist in internal repository, so import the source repository using git fetch-pack
        cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
        $stderr.puts "dispatch: #{cmd}"
        $stderr.puts(stdout_s(cmd))
        unless $? == 0
          fail_job job, "git fetch-pack failed"
          return nil
        end
      end
      @fetched_commits[commit_hash] = true
    end
    @fetched_commits[commit_hash]
  end
+
  # Tag commit_hash as tag_name in the internal repository (or verify an
  # existing tag points at the same commit). Returns the tagged commit
  # hash on success, nil on failure.
  #
  # FIXME(review): the failure paths call "fail_job job, ..." but "job" is
  # not in scope in this method, so those paths raise NameError instead of
  # failing the job cleanly (same problem as get_commit).
  def tag_commit(commit_hash, tag_name)
    # @git_tags[T]==V if we know commit V has been tagged T in the
    # arvados_internal repository.
    if not @git_tags[tag_name]
      cmd = git_cmd("tag", tag_name, commit_hash)
      $stderr.puts "dispatch: #{cmd}"
      $stderr.puts(stdout_s(cmd, err: "/dev/null"))
      unless $? == 0
        # git tag failed. This may be because the tag already exists, so check for that.
        tag_rev = stdout_s(git_cmd("rev-list", "-n1", tag_name))
        if $? == 0
          # We got a revision back
          if tag_rev != commit_hash
            # Uh oh, the tag doesn't point to the revision we were expecting.
            # Someone has been monkeying with the job record and/or git.
            fail_job job, "Existing tag #{tag_name} points to commit #{tag_rev} but expected commit #{commit_hash}"
            return nil
          end
          # we're okay (fall through to setting @git_tags below)
        else
          # git rev-list failed for some reason.
          fail_job job, "'git tag' for #{tag_name} failed but did not find any existing tag using 'git rev-list'"
          return nil
        end
      end
      # 'git tag' was successful, or there is an existing tag that points to the same revision.
      @git_tags[tag_name] = commit_hash
    elsif @git_tags[tag_name] != commit_hash
      fail_job job, "Existing tag #{tag_name} points to commit #{@git_tags[tag_name]} but this job uses commit #{commit_hash}"
      return nil
    end
    @git_tags[tag_name]
  end
+
def start_jobs
@todo.each do |job|
next if @running[job.uuid]
"GEM_PATH=#{ENV['GEM_PATH']}")
end
- job_auth = ApiClientAuthorization.
- new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
- api_client_id: 0)
- if not job_auth.save
- $stderr.puts "dispatch: job_auth.save failed"
- next
- end
-
- crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
- if crunch_job_bin == ''
- raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
- end
-
- require 'shellwords'
-
- arvados_internal = Rails.configuration.git_internal_dir
- if not File.exists? arvados_internal
- $stderr.puts `mkdir -p #{arvados_internal.shellescape} && cd #{arvados_internal.shellescape} && git init --bare`
- end
-
- repo_root = Rails.configuration.git_repositories_dir
- src_repo = File.join(repo_root, job.repository + '.git')
- if not File.exists? src_repo
- src_repo = File.join(repo_root, job.repository, '.git')
- if not File.exists? src_repo
- $stderr.puts "dispatch: No #{job.repository}.git or #{job.repository}/.git at #{repo_root}"
- sleep 1
- next
- end
- end
-
- git = "git --git-dir=#{arvados_internal.shellescape}"
-
- # check if the commit needs to be fetched or not
- commit_rev = `#{git} rev-list -n1 #{job.script_version.shellescape} 2>/dev/null`.chomp
- unless $? == 0 and commit_rev == job.script_version
- # commit does not exist in internal repository, so import the source repository using git fetch-pack
- cmd = "#{git} fetch-pack --no-progress --all #{src_repo.shellescape}"
- $stderr.puts cmd
- $stderr.puts `#{cmd}`
- unless $? == 0
- $stderr.puts "dispatch: git fetch-pack failed"
- sleep 1
- next
- end
- end
-
- # check if the commit needs to be tagged with this job uuid
- tag_rev = `#{git} rev-list -n1 #{job.uuid.shellescape} 2>/dev/null`.chomp
- if $? != 0
- # no job tag found, so create one
- cmd = "#{git} tag #{job.uuid.shellescape} #{job.script_version.shellescape}"
- $stderr.puts cmd
- $stderr.puts `#{cmd}`
- unless $? == 0
- $stderr.puts "dispatch: git tag failed"
- sleep 1
- next
- end
- else
- # job tag found, check that it has the expected revision
- unless tag_rev == job.script_version
- # Uh oh, the tag doesn't point to the revision we were expecting.
- # Someone has been monkeying with the job record and/or git.
- $stderr.puts "dispatch: Already a tag #{job.script_version} pointing to commit #{tag_rev} but expected commit #{job.script_version}"
- job.state = "Failed"
- if not job.save
- $stderr.puts "dispatch: job.save failed"
- next
- end
- next
- end
+ ready = (get_authorization(job) and
+ get_commit(job.repository, job.script_version) and
+ tag_commit(job.script_version, job.uuid))
+ if ready and job.arvados_sdk_version
+ ready = (get_commit("arvados", job.arvados_sdk_version) and
+ tag_commit(job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"))
end
+ next unless ready
- cmd_args << crunch_job_bin
- cmd_args << '--job-api-token'
- cmd_args << job_auth.api_token
- cmd_args << '--job'
- cmd_args << job.uuid
- cmd_args << '--git-dir'
- cmd_args << arvados_internal
+ cmd_args += [@crunch_job_bin,
+ '--job-api-token', @authorizations[job.uuid].api_token,
+ '--job', job.uuid,
+ '--git-dir', @arvados_internal]
$stderr.puts "dispatch: #{cmd_args.join ' '}"
end
$stderr.puts "dispatch: job #{job.uuid}"
- start_banner = "dispatch: child #{t.pid} start #{Time.now.ctime.to_s}"
+ start_banner = "dispatch: child #{t.pid} start #{LogTime.now}"
$stderr.puts start_banner
@running[job.uuid] = {
buf: {stderr: '', stdout: ''},
started: false,
sent_int: 0,
- job_auth: job_auth,
+ job_auth: @authorizations[job.uuid],
stderr_buf_to_flush: '',
- stderr_flushed_at: 0,
+ stderr_flushed_at: Time.new(0),
bytes_logged: 0,
events_logged: 0,
- log_truncated: false
+ log_throttle_is_open: true,
+ log_throttle_reset_time: Time.now + Rails.configuration.crunch_log_throttle_period,
+ log_throttle_bytes_so_far: 0,
+ log_throttle_lines_so_far: 0,
+ log_throttle_bytes_skipped: 0,
}
i.close
update_node_status
end
end
+ # Test for hard cap on total output and for log throttling. Returns whether
+ # the log line should go to output or not. Modifies "line" in place to
+ # replace it with an error if a logging limit is tripped.
+ def rate_limit running_job, line
+ message = false
+ linesize = line.size
+ if running_job[:log_throttle_is_open]
+ running_job[:log_throttle_lines_so_far] += 1
+ running_job[:log_throttle_bytes_so_far] += linesize
+ running_job[:bytes_logged] += linesize
+
+ if (running_job[:bytes_logged] >
+ Rails.configuration.crunch_limit_log_bytes_per_job)
+ message = "Exceeded log limit #{Rails.configuration.crunch_limit_log_bytes_per_job} bytes (crunch_limit_log_bytes_per_job). Log will be truncated."
+ running_job[:log_throttle_reset_time] = Time.now + 100.years
+ running_job[:log_throttle_is_open] = false
+
+ elsif (running_job[:log_throttle_bytes_so_far] >
+ Rails.configuration.crunch_log_throttle_bytes)
+ remaining_time = running_job[:log_throttle_reset_time] - Time.now
+ message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds.\n"
+ running_job[:log_throttle_is_open] = false
+
+ elsif (running_job[:log_throttle_lines_so_far] >
+ Rails.configuration.crunch_log_throttle_lines)
+ remaining_time = running_job[:log_throttle_reset_time] - Time.now
+ message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds.\n"
+ running_job[:log_throttle_is_open] = false
+ end
+ end
+
+ if not running_job[:log_throttle_is_open]
+ # Don't log anything if any limit has been exceeded. Just count lossage.
+ running_job[:log_throttle_bytes_skipped] += linesize
+ end
+
+ if message
+ # Yes, write to logs, but use our "rate exceeded" message
+ # instead of the log message that exceeded the limit.
+ line.replace message
+ true
+ else
+ running_job[:log_throttle_is_open]
+ end
+ end
+
def read_pipes
@running.each do |job_uuid, j|
job = j[:job]
+ now = Time.now
+ if now > j[:log_throttle_reset_time]
+ # It has been more than throttle_period seconds since the last
+ # checkpoint so reset the throttle
+ if j[:log_throttle_bytes_skipped] > 0
+ message = "#{job_uuid} ! Skipped #{j[:log_throttle_bytes_skipped]} bytes of log"
+ $stderr.puts message
+ j[:stderr_buf_to_flush] << "#{LogTime.now} #{message}\n"
+ end
+
+ j[:log_throttle_reset_time] = now + Rails.configuration.crunch_log_throttle_period
+ j[:log_throttle_bytes_so_far] = 0
+ j[:log_throttle_lines_so_far] = 0
+ j[:log_throttle_bytes_skipped] = 0
+ j[:log_throttle_is_open] = true
+ end
+
j[:buf].each do |stream, streambuf|
# Read some data from the child stream
- buf = false
+ buf = ''
begin
- buf = j[stream].read_nonblock(2**16)
+ # It's important to use a big enough buffer here. When we're
+ # being flooded with logs, we must read and discard many
+ # bytes at once. Otherwise, we can easily peg a CPU with
+ # time-checking and other loop overhead. (Quick tests show a
+ # 1MiB buffer working 2.5x as fast as a 64 KiB buffer.)
+ #
+ # So don't reduce this buffer size!
+ buf = j[stream].read_nonblock(2**20)
rescue Errno::EAGAIN, EOFError
end
- if buf
- # Add to a the buffer
- streambuf << buf
-
- # Check for at least one complete line
- if streambuf.index "\n"
- lines = streambuf.lines("\n").to_a
-
- # check if the last line is partial or not
- j[:buf][stream] = if streambuf[-1] == "\n"
- # nope
- ''
- else
- # Put the partial line back into the buffer
- lines.pop
- end
-
- # Now spool the lines to the log output buffer
- lines.each do |line|
- $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
- $stderr.puts line
- pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
- if not j[:log_truncated]
- j[:stderr_buf_to_flush] << pub_msg
- end
- end
+ # Short circuit the counting code if we're just going to throw
+ # away the data anyway.
+ if not j[:log_throttle_is_open]
+ j[:log_throttle_bytes_skipped] += streambuf.size + buf.size
+ streambuf.replace ''
+ next
+ elsif buf == ''
+ next
+ end
- # Now actually send the log output to the logs table
- if not j[:log_truncated]
- if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
- (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
- write_log j
- end
+ # Append to incomplete line from previous read, if any
+ streambuf << buf
+
+ bufend = ''
+ streambuf.each_line do |line|
+ if not line.end_with? $/
+ if line.size > Rails.configuration.crunch_log_throttle_bytes
+ # Without a limit here, we'll use 2x an arbitrary amount
+ # of memory, and waste a lot of time copying strings
+ # around, all without providing any feedback to anyone
+ # about what's going on _or_ hitting any of our throttle
+ # limits.
+ #
+ # Here we leave "line" alone, knowing it will never be
+ # sent anywhere: rate_limit() will reach
+ # crunch_log_throttle_bytes immediately. However, we'll
+ # leave [...] in bufend: if the trailing end of the long
+ # line does end up getting sent anywhere, it will have
+ # some indication that it is incomplete.
+ bufend = "[...]"
+ else
+ # If line length is sane, we'll wait for the rest of the
+ # line to appear in the next read_pipes() call.
+ bufend = line
+ break
end
end
+ # rate_limit returns true or false as to whether to actually log
+ # the line or not. It also modifies "line" in place to replace
+ # it with an error if a logging limit is tripped.
+ if rate_limit j, line
+ $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
+ $stderr.puts line
+ pub_msg = "#{LogTime.now} #{line.strip}\n"
+ j[:stderr_buf_to_flush] << pub_msg
+ end
end
+
+ # Leave the trailing incomplete line (if any) in streambuf for
+ # next time.
+ streambuf.replace bufend
end
+ # Flush buffered logs to the logs table, if appropriate. We have
+ # to do this even if we didn't collect any new logs this time:
+ # otherwise, buffered data older than seconds_between_events
+ # won't get flushed until new data arrives.
+ write_log j
end
end
$stderr.puts "dispatch: child #{pid_done} exit"
$stderr.puts "dispatch: job #{job_done.uuid} end"
- # Ensure every last drop of stdout and stderr is consumed
+ # Ensure every last drop of stdout and stderr is consumed.
read_pipes
- write_log j_done # write any remaining logs
+ # Reset flush timestamp to make sure log gets written.
+ j_done[:stderr_flushed_at] = Time.new(0)
+ # Write any remaining logs.
+ write_log j_done
j_done[:buf].each do |stream, streambuf|
if streambuf != ''
end
# Wait the thread (returns a Process::Status)
- exit_status = j_done[:wait_thr].value
+ exit_status = j_done[:wait_thr].value.exitstatus
jobrecord = Job.find_by_uuid(job_done.uuid)
- if exit_status.to_i != 75 and jobrecord.state == "Running"
+ if exit_status != 75 and jobrecord.state == "Running"
# crunch-job did not return exit code 75 (see below) and left the job in
# the "Running" state, which means there was an unhandled error. Fail
# the job.
jobrecord.state = "Failed"
- jobrecord.save!
+ if not jobrecord.save
+ $stderr.puts "dispatch: jobrecord.save failed"
+ end
else
# Don't fail the job if crunch-job didn't even get as far as
# starting it. If the job failed to run due to an infrastructure
# fine.
end
- # Invalidate the per-job auth token
- j_done[:job_auth].update_attributes expires_at: Time.now
+ # Invalidate the per-job auth token, unless the job is still queued and we
+ # might want to try it again.
+ if jobrecord.state != "Queued"
+ j_done[:job_auth].update_attributes expires_at: Time.now
+ end
@running.delete job_done.uuid
end
def run
act_as_system_user
- @running ||= {}
- @pipe_auth_tokens ||= { }
$stderr.puts "dispatch: ready"
while !$signal[:term] or @running.size > 0
read_pipes
end
else
refresh_todo unless did_recently(:refresh_todo, 1.0)
- update_node_status
+ update_node_status unless did_recently(:update_node_status, 1.0)
unless @todo.empty? or did_recently(:start_jobs, 1.0) or $signal[:term]
start_jobs
end
protected
- def too_many_bytes_logged_for_job(j)
- return (j[:bytes_logged] + j[:stderr_buf_to_flush].size >
- Rails.configuration.crunch_limit_log_event_bytes_per_job)
- end
-
- def too_many_events_logged_for_job(j)
- return (j[:events_logged] >= Rails.configuration.crunch_limit_log_events_per_job)
- end
-
def did_recently(thing, min_interval)
- @did_recently ||= {}
if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
@did_recently[thing] = Time.now
false
  # Send buffered messages to the log table. We want these records to be transient.
def write_log running_job
- return if running_job[:log_truncated]
return if running_job[:stderr_buf_to_flush] == ''
- begin
- # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
- # or crunch_limit_log_events_per_job.
- if (too_many_bytes_logged_for_job(running_job))
- running_job[:log_truncated] = true
- running_job[:stderr_buf_to_flush] =
- "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
- elsif (too_many_events_logged_for_job(running_job))
- running_job[:log_truncated] = true
- running_job[:stderr_buf_to_flush] =
- "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
+
+ # Send out to log event if buffer size exceeds the bytes per event or if
+ # it has been at least crunch_log_seconds_between_events seconds since
+ # the last flush.
+ if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or
+ (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
+ begin
+ log = Log.new(object_uuid: running_job[:job].uuid,
+ event_type: 'stderr',
+ owner_uuid: running_job[:job].owner_uuid,
+ properties: {"text" => running_job[:stderr_buf_to_flush]})
+ log.save!
+ running_job[:events_logged] += 1
+ rescue => exception
+ $stderr.puts "Failed to write logs"
+ $stderr.puts exception.backtrace
end
- log = Log.new(object_uuid: running_job[:job].uuid,
- event_type: 'stderr',
- owner_uuid: running_job[:job].owner_uuid,
- properties: {"text" => running_job[:stderr_buf_to_flush]})
- log.save!
- running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
- running_job[:events_logged] += 1
- rescue
- running_job[:buf][:stderr] = "Failed to write logs\n" + running_job[:buf][:stderr]
+ running_job[:stderr_buf_to_flush] = ''
+ running_job[:stderr_flushed_at] = Time.now
end
- running_job[:stderr_buf_to_flush] = ''
- running_job[:stderr_flushed_at] = Time.now.to_i
end
-
end
# This is how crunch-job child procs know where the "refresh" trigger file is