X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c3b30a4104264f78088133b7d758e1920b1525af..ae09643622ecea00bab110f20029f01c83e1cf30:/services/api/script/crunch-dispatch.rb diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb index 3b29f1089f..c09be81fd2 100755 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@ -1,5 +1,7 @@ #!/usr/bin/env ruby +require 'trollop' + include Process $warned = {} @@ -20,6 +22,10 @@ if ENV["CRUNCH_DISPATCH_LOCKFILE"] end end +$trollopts = Trollop::options do + opt :use_env, "Pass selected environment variables (PATH, PYTHONPATH, RUBYLIB, GEM_PATH, PERLLIB) to crunch-job" +end + ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development" require File.dirname(__FILE__) + '/../config/boot' @@ -35,6 +41,26 @@ class Dispatcher return act_as_system_user end + def refresh_running + Job.running.each do |jobrecord| + if !@running[jobrecord.uuid] + f = Log.where("object_uuid=?", jobrecord.uuid).limit(1).order("created_at desc").first + age = (Time.now - f.created_at) + if age > 300 + $stderr.puts "dispatch: failing orphan job #{jobrecord.uuid}, last log is #{age} seconds old" + # job is marked running, but not known to crunch-dispatcher, and + # hasn't produced any log entries for 5 minutes, so mark it as failed. + jobrecord.running = false + jobrecord.finished_at ||= Time.now + if jobrecord.success.nil? + jobrecord.success = false + end + jobrecord.save! + end + end + end + end + def refresh_todo @todo = Job.queue.select do |j| j.repository end @todo_pipelines = PipelineInstance.queue @@ -134,9 +160,23 @@ class Dispatcher end if Server::Application.config.crunch_job_user - cmd_args.unshift("sudo", "-E", "-u", - Server::Application.config.crunch_job_user, - "PERLLIB=#{ENV['PERLLIB']}") + cmd_args.unshift("sudo", "-E", "-u", Server::Application.config.crunch_job_user) + end + + cmd_args << "HOME=/dev/null" + cmd_args << "ARVADOS_API_HOST=#{ENV['ARVADOS_API_HOST']}" + cmd_args << "ARVADOS_API_HOST_INSECURE=#{ENV['ARVADOS_API_HOST_INSECURE']}" if ENV['ARVADOS_API_HOST_INSECURE'] + + ENV.each do |k, v| + cmd_args << "#{k}=#{v}" if k.starts_with? "CRUNCH_" + end + + if $trollopts.use_env + cmd_args << "PATH=#{ENV['PATH']}" + cmd_args << "PYTHONPATH=#{ENV['PYTHONPATH']}" + cmd_args << "PERLLIB=#{ENV['PERLLIB']}" + cmd_args << "RUBYLIB=#{ENV['RUBYLIB']}" + cmd_args << "GEM_PATH=#{ENV['GEM_PATH']}" end job_auth = ApiClientAuthorization. @@ -176,10 +216,10 @@ class Dispatcher cmd_args << '--git-dir' cmd_args << arvados_internal - $stderr.puts "dispatch: #{cmd_args.join ' '}" + $stderr.puts "dispatch: #{cmd_args}" begin - i, o, e, t = Open3.popen3(*cmd_args) + i, o, e, t = Open3.popen3({}, *cmd_args, { :unsetenv_others => true}) rescue $stderr.puts "dispatch: popen3: #{$!}" sleep 1 @@ -201,6 +241,7 @@ class Dispatcher started: false, sent_int: 0, job_auth: job_auth, + stderr_buf_to_flush: '', stderr_flushed_at: 0 } i.close @@ -235,18 +276,23 @@ class Dispatcher end if stderr_buf - if stderr_buf.index "\n" - lines = stderr_buf.lines("\n").to_a + j[:stderr_buf] << stderr_buf + if j[:stderr_buf].index "\n" + lines = j[:stderr_buf].lines("\n").to_a + if j[:stderr_buf][-1] == "\n" + j[:stderr_buf] = '' + else + j[:stderr_buf] = lines.pop + end lines.each do |line| $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) $stderr.puts line - log_msg = "#{Time.now.ctime.to_s} #{line.strip}" - j[:stderr_buf] << (log_msg + " \n") + pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" + j[:stderr_buf_to_flush] << pub_msg end - if (LOG_BUFFER_SIZE < j[:stderr_buf].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i) + if (LOG_BUFFER_SIZE < j[:stderr_buf_to_flush].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i) write_log j - j[:stderr_flushed_at] = Time.now.to_i end end end @@ -362,6 +408,7 @@ class Dispatcher end end else + refresh_running unless did_recently(:refresh_running, 60.0) refresh_todo unless did_recently(:refresh_todo, 1.0) update_node_status unless @todo.empty? or did_recently(:start_jobs, 1.0) or $signal[:term] @@ -392,24 +439,18 @@ class Dispatcher # send message to log table. we want these records to be transient def write_log running_job begin - owner_uuid = nil - begin - Group.find_by_uuid(running_job[:job].owner_uuid) - owner_uuid = running_job[:job].owner_uuid - rescue - end - - if (running_job && running_job[:stderr_buf] != '') + if (running_job && running_job[:stderr_buf_to_flush] != '') log = Log.new(object_uuid: running_job[:job].uuid, event_type: 'stderr', - owner_uuid: owner_uuid, - properties: {"text" => running_job[:stderr_buf]}) + owner_uuid: running_job[:job].owner_uuid, + properties: {"text" => running_job[:stderr_buf_to_flush]}) log.save! - running_job[:stderr_buf] = '' + running_job[:stderr_buf_to_flush] = '' running_job[:stderr_flushed_at] = Time.now.to_i end rescue running_job[:stderr_buf] = "Failed to write logs \n" + running_job[:stderr_buf_to_flush] = '' running_job[:stderr_flushed_at] = Time.now.to_i end end