#!/usr/bin/env ruby
+require 'shellwords'
include Process
$options = {}
nodelist
end
+ def fail_job job, message # Report a dispatch failure for job: print message to stderr, record it in a Log entry, then try to set the job's state to "Failed".
+ $stderr.puts "dispatch: #{job.uuid}: #{message}"
+ begin # Best effort: if saving the Log record raises, that is reported on stderr and otherwise ignored.
+ Log.new(object_uuid: job.uuid,
+ event_type: 'dispatch',
+ owner_uuid: job.owner_uuid,
+ summary: message,
+ properties: {"text" => message}).save!
+ rescue
+ $stderr.puts "dispatch: log.create failed"
+ end
+ # Lock the job as the user attached to the authorization stored for this job, then mark it "Failed".
+ begin
+ job.lock @authorizations[job.uuid].user.uuid # NOTE(review): assumes @authorizations already holds an entry for this job -- confirm for every caller
+ job.state = "Failed"
+ if not job.save # a false return from save is only reported on stderr; nothing is raised or retried here
+ $stderr.puts "dispatch: save failed setting job #{job.uuid} to failed"
+ end
+ rescue AlreadyLockedError # only this error is handled; any other exception from lock/save propagates to the caller
+ $stderr.puts "dispatch: tried to mark job #{job.uuid} as failed but it was already locked by someone else"
+ end
+ end
+
def start_jobs
@todo.each do |job|
next if @running[job.uuid]
"GEM_PATH=#{ENV['GEM_PATH']}")
end
- job_auth = ApiClientAuthorization.
- new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
- api_client_id: 0)
- if not job_auth.save
- $stderr.puts "dispatch: job_auth.save failed"
- next
+ @authorizations ||= {}
+ if @authorizations[job.uuid] and
+ @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
+ # We already made a token for this job, but we need a new one
+ # because modified_by_user_uuid has changed (the job will run
+ # as a different user).
+ @authorizations[job.uuid].update_attributes expires_at: Time.now
+ @authorizations[job.uuid] = nil
+ end
+ if not @authorizations[job.uuid]
+ auth = ApiClientAuthorization.
+ new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
+ api_client_id: 0)
+ if not auth.save
+ $stderr.puts "dispatch: auth.save failed"
+ next
+ end
+ @authorizations[job.uuid] = auth
end
crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
end
- require 'shellwords'
-
arvados_internal = Rails.configuration.git_internal_dir
if not File.exists? arvados_internal
$stderr.puts `mkdir -p #{arvados_internal.shellescape} && cd #{arvados_internal.shellescape} && git init --bare`
end
- repo_root = Rails.configuration.git_repositories_dir
- src_repo = File.join(repo_root, job.repository + '.git')
- if not File.exists? src_repo
- src_repo = File.join(repo_root, job.repository, '.git')
+ git = "git --git-dir=#{arvados_internal.shellescape}"
+
+ # @fetched_commits[V]==true if we know commit V exists in the
+ # arvados_internal git repository.
+ @fetched_commits ||= {}
+ if !@fetched_commits[job.script_version]
+
+ repo_root = Rails.configuration.git_repositories_dir
+ src_repo = File.join(repo_root, job.repository + '.git')
if not File.exists? src_repo
- $stderr.puts "dispatch: No #{job.repository}.git or #{job.repository}/.git at #{repo_root}"
- sleep 1
- next
+ src_repo = File.join(repo_root, job.repository, '.git')
+ if not File.exists? src_repo
+ fail_job job, "No #{job.repository}.git or #{job.repository}/.git at #{repo_root}"
+ next
+ end
end
- end
- git = "git --git-dir=#{arvados_internal.shellescape}"
-
- # check if the commit needs to be fetched or not
- commit_rev = `#{git} rev-list -n1 #{job.script_version.shellescape} 2>/dev/null`.chomp
- unless $? == 0 and commit_rev == job.script_version
- # commit does not exist in internal repository, so import the source repository using git fetch-pack
- cmd = "#{git} fetch-pack --no-progress --all #{src_repo.shellescape}"
- $stderr.puts cmd
- $stderr.puts `#{cmd}`
- unless $? == 0
- $stderr.puts "dispatch: git fetch-pack failed"
- sleep 1
- next
+ # check if the commit needs to be fetched or not
+ commit_rev = `#{git} rev-list -n1 #{job.script_version.shellescape} 2>/dev/null`.chomp
+ unless $? == 0 and commit_rev == job.script_version
+ # commit does not exist in internal repository, so import the source repository using git fetch-pack
+ cmd = "#{git} fetch-pack --no-progress --all #{src_repo.shellescape}"
+ $stderr.puts "dispatch: #{cmd}"
+ $stderr.puts `#{cmd}`
+ unless $? == 0
+ fail_job job, "git fetch-pack failed"
+ next
+ end
end
+ @fetched_commits[job.script_version] = true
end
- # check if the commit needs to be tagged with this job uuid
- tag_rev = `#{git} rev-list -n1 #{job.uuid.shellescape} 2>/dev/null`.chomp
- if $? != 0
- # no job tag found, so create one
- cmd = "#{git} tag #{job.uuid.shellescape} #{job.script_version.shellescape}"
- $stderr.puts cmd
+ # @job_tags[J]==V if we know commit V has been tagged J in the
+ # arvados_internal repository. (J is a job UUID, V is a commit
+ # sha1.)
+ @job_tags ||= {}
+ if not @job_tags[job.uuid]
+ cmd = "#{git} tag #{job.uuid.shellescape} #{job.script_version.shellescape} 2>/dev/null"
+ $stderr.puts "dispatch: #{cmd}"
$stderr.puts `#{cmd}`
unless $? == 0
- $stderr.puts "dispatch: git tag failed"
- sleep 1
- next
- end
- else
- # job tag found, check that it has the expected revision
- unless tag_rev == job.script_version
- # Uh oh, the tag doesn't point to the revision we were expecting.
- # Someone has been monkeying with the job record and/or git.
- $stderr.puts "dispatch: Already a tag #{job.script_version} pointing to commit #{tag_rev} but expected commit #{job.script_version}"
- job.state = "Failed"
- if not job.save
- $stderr.puts "dispatch: job.save failed"
+ # git tag failed. This may be because the tag already exists, so check for that.
+ tag_rev = `#{git} rev-list -n1 #{job.uuid.shellescape}`.chomp
+ if $? == 0
+ # We got a revision back
+ if tag_rev != job.script_version
+ # Uh oh, the tag doesn't point to the revision we were expecting.
+ # Someone has been monkeying with the job record and/or git.
+ fail_job job, "Existing tag #{job.uuid} points to commit #{tag_rev} but expected commit #{job.script_version}"
+ next
+ end
+ # we're okay (fall through to setting @job_tags below)
+ else
+ # git rev-list failed for some reason.
+ fail_job job, "'git tag' for #{job.uuid} failed but did not find any existing tag using 'git rev-list'"
next
end
- next
end
+ # 'git tag' was successful, or there is an existing tag that points to the same revision.
+ @job_tags[job.uuid] = job.script_version
+ elsif @job_tags[job.uuid] != job.script_version
+ fail_job job, "Existing tag #{job.uuid} points to commit #{@job_tags[job.uuid]} but this job uses commit #{job.script_version}"
+ next
end
cmd_args << crunch_job_bin
cmd_args << '--job-api-token'
- cmd_args << job_auth.api_token
+ cmd_args << @authorizations[job.uuid].api_token
cmd_args << '--job'
cmd_args << job.uuid
cmd_args << '--git-dir'
buf: {stderr: '', stdout: ''},
started: false,
sent_int: 0,
- job_auth: job_auth,
+ job_auth: @authorizations[job.uuid],
stderr_buf_to_flush: '',
stderr_flushed_at: Time.new(0),
bytes_logged: 0,
if not j[:log_throttle_is_open]
j[:log_throttle_bytes_skipped] += streambuf.size + buf.size
streambuf.replace ''
+ next
elsif buf == ''
next
end
streambuf << buf
bufend = ''
- streambuf.lines("\n").each do |line|
- if not line.end_with? "\n"
- bufend = line
- break
+ streambuf.each_line do |line|
+ if not line.end_with? $/
+ if line.size > Rails.configuration.crunch_log_throttle_bytes
+ # Without a limit here, we'll use 2x an arbitrary amount
+ # of memory, and waste a lot of time copying strings
+ # around, all without providing any feedback to anyone
+ # about what's going on _or_ hitting any of our throttle
+ # limits.
+ #
+ # Here we leave "line" alone, knowing it will never be
+ # sent anywhere: rate_limit() will reach
+ # crunch_log_throttle_bytes immediately. However, we'll
+ # leave [...] in bufend: if the trailing end of the long
+ # line does end up getting sent anywhere, it will have
+ # some indication that it is incomplete.
+ bufend = "[...]"
+ else
+ # If line length is sane, we'll wait for the rest of the
+ # line to appear in the next read_pipes() call.
+ bufend = line
+ break
+ end
end
# rate_limit returns true or false as to whether to actually log
# the line or not. It also modifies "line" in place to replace
j[:stderr_buf_to_flush] << pub_msg
end
end
+
+ # Leave the trailing incomplete line (if any) in streambuf for
+ # next time.
streambuf.replace bufend
end
# Flush buffered logs to the logs table, if appropriate. We have
# fine.
end
- # Invalidate the per-job auth token
- j_done[:job_auth].update_attributes expires_at: Time.now
+ # Invalidate the per-job auth token, unless the job is still queued and we
+ # might want to try it again.
+ if jobrecord.state != "Queued"
+ j_done[:job_auth].update_attributes expires_at: Time.now
+ end
@running.delete job_done.uuid
end
end
else
refresh_todo unless did_recently(:refresh_todo, 1.0)
- update_node_status
+ update_node_status unless did_recently(:update_node_status, 1.0)
unless @todo.empty? or did_recently(:start_jobs, 1.0) or $signal[:term]
start_jobs
end