X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fb35b803418d6f732af4273dff8d0485f02f8290..319cc7b3862ff831798f92bcb2f0c921d208147d:/services/api/script/crunch-dispatch.rb diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb index eeabc16271..a9b7598203 100755 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@ -26,8 +26,7 @@ require File.dirname(__FILE__) + '/../config/boot' require File.dirname(__FILE__) + '/../config/environment' require 'open3' -$redis ||= Redis.new -LOG_BUFFER_SIZE = 2**20 +LOG_BUFFER_SIZE = 4096 class Dispatcher include ApplicationHelper @@ -37,7 +36,7 @@ class Dispatcher end def refresh_todo - @todo = Job.queue + @todo = Job.queue.select do |j| j.repository end @todo_pipelines = PipelineInstance.queue end @@ -145,32 +144,38 @@ class Dispatcher api_client_id: 0) job_auth.save - cmd_args << (ENV['CRUNCH_JOB_BIN'] || `which crunch-job`.strip) - cmd_args << '--job-api-token' - cmd_args << job_auth.api_token - cmd_args << '--job' - cmd_args << job.uuid - - if cmd_args[0] == '' + crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip) + if crunch_job_bin == '' raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path." end - commit = Commit.where(sha1: job.script_version).first - if commit - cmd_args << '--git-dir' - if File.exists?(File. - join(Rails.configuration.git_repositories_dir, - commit.repository_name + '.git')) - cmd_args << File. - join(Rails.configuration.git_repositories_dir, - commit.repository_name + '.git') - else - cmd_args << File. - join(Rails.configuration.git_repositories_dir, - commit.repository_name, '.git') - end + require 'shellwords' + + arvados_internal = Rails.configuration.git_internal_dir + if not File.exists? arvados_internal + $stderr.puts `mkdir -p #{arvados_internal.shellescape} && cd #{arvados_internal.shellescape} && git init --bare` end + src_repo = File.join(Rails.configuration.git_repositories_dir, job.repository + '.git') + src_repo = File.join(Rails.configuration.git_repositories_dir, job.repository, '.git') unless File.exists? src_repo + + unless src_repo + $stderr.puts "dispatch: #{File.join Rails.configuration.git_repositories_dir, job.repository} doesn't exist" + sleep 1 + untake(job) + next + end + + $stderr.puts `cd #{arvados_internal.shellescape} && git fetch --no-tags #{src_repo.shellescape} && git tag #{job.uuid.shellescape} #{job.script_version.shellescape}` + + cmd_args << crunch_job_bin + cmd_args << '--job-api-token' + cmd_args << job_auth.api_token + cmd_args << '--job' + cmd_args << job.uuid + cmd_args << '--git-dir' + cmd_args << arvados_internal + $stderr.puts "dispatch: #{cmd_args.join ' '}" begin @@ -185,9 +190,6 @@ class Dispatcher $stderr.puts "dispatch: job #{job.uuid}" start_banner = "dispatch: child #{t.pid} start #{Time.now.ctime.to_s}" $stderr.puts start_banner - $redis.set job.uuid, start_banner + "\n" - $redis.publish job.uuid, start_banner - $redis.publish job.owner_uuid, start_banner @running[job.uuid] = { stdin: i, @@ -198,7 +200,9 @@ class Dispatcher stderr_buf: '', started: false, sent_int: 0, - job_auth: job_auth + job_auth: job_auth, + stderr_buf_to_flush: '', + stderr_flushed_at: 0 } i.close end @@ -243,16 +247,12 @@ class Dispatcher lines.each do |line| $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) $stderr.puts line - pub_msg = "#{Time.now.ctime.to_s} #{line.strip}" - $redis.publish job.owner_uuid, pub_msg - $redis.publish job_uuid, pub_msg - $redis.append job_uuid, pub_msg + "\n" - if LOG_BUFFER_SIZE < $redis.strlen(job_uuid) - $redis.set(job_uuid, - $redis - .getrange(job_uuid, (LOG_BUFFER_SIZE >> 1), -1) - .sub(/^.*?\n/, '')) - end + pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" + j[:stderr_buf_to_flush] << pub_msg + end + + if (LOG_BUFFER_SIZE < j[:stderr_buf_to_flush].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i) + write_log j end end end @@ -297,10 +297,11 @@ class Dispatcher job_done = j_done[:job] $stderr.puts "dispatch: child #{pid_done} exit" $stderr.puts "dispatch: job #{job_done.uuid} end" - $redis.publish job_done.uuid, "end" # Ensure every last drop of stdout and stderr is consumed read_pipes + write_log j_done # write any remaining logs + if j_done[:stderr_buf] and j_done[:stderr_buf] != '' $stderr.puts j_done[:stderr_buf] + "\n" end @@ -308,6 +309,23 @@ class Dispatcher # Wait the thread j_done[:wait_thr].value + jobrecord = Job.find_by_uuid(job_done.uuid) + if jobrecord.started_at + # Clean up state fields in case crunch-job exited without + # putting the job in a suitable "finished" state. + jobrecord.running = false + jobrecord.finished_at ||= Time.now + if jobrecord.success.nil? + jobrecord.success = false + end + jobrecord.save! + else + # Don't fail the job if crunch-job didn't even get as far as + # starting it. If the job failed to run due to an infrastructure + # issue with crunch-job or slurm, we want the job to stay in the + # queue. + end + # Invalidate the per-job auth token j_done[:job_auth].update_attributes expires_at: Time.now @@ -315,20 +333,25 @@ class Dispatcher end def update_pipelines - puts @todo_pipelines + expire_tokens = @pipe_auth_tokens.dup @todo_pipelines.each do |p| - pipe_auth = ApiClientAuthorization. - new(user: User.where('uuid=?', p.modified_by_user_uuid).first, - api_client_id: 0) - pipe_auth.save + pipe_auth = (@pipe_auth_tokens[p.uuid] ||= ApiClientAuthorization. + create(user: User.where('uuid=?', p.modified_by_user_uuid).first, + api_client_id: 0)) + puts `export ARVADOS_API_TOKEN=#{pipe_auth.api_token} && arv-run-pipeline-instance --run-here --no-wait --instance #{p.uuid}` + expire_tokens.delete p.uuid + end - puts `export ARVADOS_API_TOKEN=#{pipe_auth.api_token} && arv-run-pipeline-instance --no-wait --instance #{p.uuid}` + expire_tokens.each do |k, v| + v.update_attributes expires_at: Time.now + @pipe_auth_tokens.delete k end end def run act_as_system_user @running ||= {} + @pipe_auth_tokens ||= { } $stderr.puts "dispatch: ready" while !$signal[:term] or @running.size > 0 read_pipes @@ -350,7 +373,7 @@ class Dispatcher unless @todo.empty? or did_recently(:start_jobs, 1.0) or $signal[:term] start_jobs end - unless @todo_pipelines.empty? or did_recently(:update_pipelines, 5.0) + unless (@todo_pipelines.empty? and @pipe_auth_tokens.empty?) or did_recently(:update_pipelines, 5.0) update_pipelines end end @@ -360,8 +383,6 @@ class Dispatcher end end - - protected def did_recently(thing, min_interval) @@ -373,6 +394,26 @@ class Dispatcher true end end + + # send message to log table. we want these records to be transient + def write_log running_job + begin + if (running_job && running_job[:stderr_buf_to_flush] != '') + log = Log.new(object_uuid: running_job[:job].uuid, + event_type: 'stderr', + owner_uuid: running_job[:job].owner_uuid, + properties: {"text" => running_job[:stderr_buf_to_flush]}) + log.save! + running_job[:stderr_buf_to_flush] = '' + running_job[:stderr_flushed_at] = Time.now.to_i + end + rescue + running_job[:stderr_buf] = "Failed to write logs \n" + running_job[:stderr_buf_to_flush] = '' + running_job[:stderr_flushed_at] = Time.now.to_i + end + end + end # This is how crunch-job child procs know where the "refresh" trigger file is