end
def start_jobs
- if Server::Application.config.whjobmanager_wrapper.to_s.match /^slurm/
+ if Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
@idle_slurm_nodes = 0
begin
`sinfo`.
next if !take(job)
cmd_args = nil
- case Server::Application.config.whjobmanager_wrapper
+ case Server::Application.config.crunch_job_wrapper
when :none
cmd_args = []
when :slurm_immediate
"--job-name=#{job.uuid}",
"--nodes=#{min_nodes}"]
else
- raise "Unknown whjobmanager_wrapper: #{Server::Application.config.whjobmanager_wrapper}"
+ raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}"
end
- cmd_args << 'whjobmanager'
- cmd_args << "id=#{job.uuid}"
- cmd_args << "mrfunction=#{job.script}"
- job.script_parameters.each do |k,v|
- k = k.to_s
- if k == 'input'
- k = 'inputkey'
- else
- k = k.upcase
- end
- cmd_args << "#{k}=#{v}"
- end
- cmd_args << "revision=#{job.script_version}"
-
- begin
- cmd_args << "stepspernode=#{job.resource_limits['max_tasks_per_node'].to_i}"
- rescue
- # OK if limit is not specified. OK to ignore if not integer.
+ job_auth = ApiClientAuthorization.
+ new(user: User.where('uuid=?', job.modified_by_user).first,
+ api_client_id: 0)
+ job_auth.save
+
+ cmd_args << 'crunch-job'
+ cmd_args << '--job-api-token'
+ cmd_args << job_auth.api_token
+ cmd_args << '--job'
+ cmd_args << job.uuid
+
+ commit = Commit.where(sha1: job.script_version).first
+ if commit
+ cmd_args << '--git-dir'
+ cmd_args << File.
+ join(Rails.configuration.git_repositories_dir,
+ commit.repository_name,
+ '.git')
end
$stderr.puts "dispatch: #{cmd_args.join ' '}"
job: job,
stderr_buf: '',
started: false,
- sent_int: 0
+ sent_int: 0,
+ job_auth: job_auth
}
i.close
end
end
def take(job)
- lock_ok = false
- ActiveRecord::Base.transaction do
- job.reload
- if job.is_locked_by.nil? and
- job.update_attributes(is_locked_by: sysuser.uuid)
- lock_ok = true
- end
- end
- lock_ok
+ # no-op -- let crunch-job take care of locking.
+ true
end
def untake(job)
- job.reload
- job.update_attributes is_locked_by: nil
+ # no-op -- let crunch-job take care of locking.
+ true
end
def read_pipes
if stderr_buf
j[:stderr_buf] << stderr_buf
if j[:stderr_buf].index "\n"
- lines = j[:stderr_buf].lines "\n"
+ lines = j[:stderr_buf].lines("\n").to_a
if j[:stderr_buf][-1] == "\n"
j[:stderr_buf] = ''
else
lines.each do |line|
$stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
$stderr.puts line
- line.chomp!
- if (re = line.match(/#{job_uuid} (\d+) (\S*) (.*)/))
- ignorethis, whjmpid, taskid, message = re.to_a
- if taskid == '' and message == 'start'
- $stderr.puts "dispatch: noticed #{job_uuid} started"
- j[:started] = true
- ActiveRecord::Base.transaction do
- j[:job].reload
- j[:job].update_attributes running: true, started_at: Time.now
- end
- elsif taskid == '' and (re = message.match /^revision (\S+)$/)
- $stderr.puts "dispatch: noticed #{job_uuid} version #{re[1]}"
- ActiveRecord::Base.transaction do
- j[:job].reload
- j[:job].script_version = re[1]
- j[:job].save
- end
- elsif taskid == '' and (re = message.match /^outputkey (\S+)$/)
- $stderr.puts "dispatch: noticed #{job_uuid} output #{re[1]}"
- j[:output] = re[1]
- elsif taskid == '' and (re = message.match /^meta key is (\S+)$/)
- $stderr.puts "dispatch: noticed #{job_uuid} log #{re[1]}"
- j[:log] = re[1]
- ActiveRecord::Base.transaction do
- j[:job].reload
- j[:job].update_attributes log: j[:log]
- end
- elsif taskid.match(/^\d+/) and (re = message.match /^failure /)
- $stderr.puts "dispatch: noticed #{job_uuid} task fail"
- ActiveRecord::Base.transaction do
- j[:job].reload
- j[:job].tasks_summary ||= {}
- j[:job].tasks_summary[:failed] ||= 0
- j[:job].tasks_summary[:failed] += 1
- j[:job].save
- end
- elsif (re = message.match(/^status: (\d+) done, (\d+) running, (\d+) todo/))
- $stderr.puts "dispatch: noticed #{job_uuid} #{message}"
- ActiveRecord::Base.transaction do
- j[:job].reload
- j[:job].tasks_summary ||= {}
- j[:job].tasks_summary[:done] = re[1].to_i
- j[:job].tasks_summary[:running] = re[2].to_i
- j[:job].tasks_summary[:todo] = re[3].to_i
- j[:job].save
- end
- if re[2].to_i == 0 and re[3].to_i == 0
- $stderr.puts "dispatch: noticed #{job_uuid} succeeded"
- j[:success] = true
- end
- end
- end
end
end
end
$stderr.puts j_done[:stderr_buf] + "\n"
end
- j_done[:wait_thr].value # wait the thread
+ # Wait the thread
+ j_done[:wait_thr].value
+
+ # Invalidate the per-job auth token
+ j_done[:job_auth].update_attributes expires_at: Time.now
- if !j_done[:started]
- # If the job never really started (due to a scheduling
- # failure), just put it back in the queue
- untake(job_done)
- $stderr.puts "dispatch: job #{job_done.uuid} requeued"
- else
- # Otherwise, mark the job as finished
- ActiveRecord::Base.transaction do
- job_done.reload
- job_done.log = j_done[:log]
- job_done.output = j_done[:output]
- job_done.success = j_done[:success]
- job_done.assert_finished
- end
- end
@running.delete job_done.uuid
end