X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/facb17a2885e6b6d3e998e15862a3c6e970e8cf1..2a64eae3cf8363c596feda5337ea20ce356ca11f:/services/api/script/crunch-dispatch.rb diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb index ad406ef1a9..f49f21b04c 100755 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@ -1,5 +1,7 @@ #!/usr/bin/env ruby +require 'trollop' + include Process $warned = {} @@ -20,6 +22,10 @@ if ENV["CRUNCH_DISPATCH_LOCKFILE"] end end +$trollopts = Trollop::options do + opt :use_env, "Pass selected environment variables (PATH, PYTHONPATH, RUBYLIB, GEM_PATH, PERLLIB) to crunch-job" +end + ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development" require File.dirname(__FILE__) + '/../config/boot' @@ -35,6 +41,24 @@ class Dispatcher return act_as_system_user end + def refresh_running + Job.running.each do |jobrecord| + if !@running[jobrecord.uuid] + f = Log.filter(["object_uuid", "=", jobrecord.uuid]).limit(1).order("created_at desc").results.first + if (Time.now - f.created_at) > 300 + # job is marked running, but not known to crunch-dispatcher, and + # hasn't produced any log entries for 5 minutes, so mark it as failed. + jobrecord.running = false + jobrecord.finished_at ||= Time.now + if jobrecord.success.nil? + jobrecord.success = false + end + jobrecord.save! + end + end + end + end + def refresh_todo @todo = Job.queue.select do |j| j.repository end @todo_pipelines = PipelineInstance.queue @@ -134,9 +158,23 @@ class Dispatcher end if Server::Application.config.crunch_job_user - cmd_args.unshift("sudo", "-E", "-u", - Server::Application.config.crunch_job_user, - "PERLLIB=#{ENV['PERLLIB']}") + cmd_args.unshift("sudo", "-E", "-u", Server::Application.config.crunch_job_user) + end + + cmd_args << "HOME=/dev/null" + cmd_args << "ARVADOS_API_HOST=#{ENV['ARVADOS_API_HOST']}" + cmd_args << "ARVADOS_API_HOST_INSECURE=#{ENV['ARVADOS_API_HOST_INSECURE']}" if ENV['ARVADOS_API_HOST_INSECURE'] + + ENV.each do |k, v| + cmd_args << "#{k}=#{v}" if k.starts_with? "CRUNCH_" + end + + if $trollopts.use_env + cmd_args << "PATH=#{ENV['PATH']}" + cmd_args << "PYTHONPATH=#{ENV['PYTHONPATH']}" + cmd_args << "PERLLIB=#{ENV['PERLLIB']}" + cmd_args << "RUBYLIB=#{ENV['RUBYLIB']}" + cmd_args << "GEM_PATH=#{ENV['GEM_PATH']}" end job_auth = ApiClientAuthorization. @@ -176,10 +214,10 @@ class Dispatcher cmd_args << '--git-dir' cmd_args << arvados_internal - $stderr.puts "dispatch: #{cmd_args.join ' '}" + $stderr.puts "dispatch: #{cmd_args}" begin - i, o, e, t = Open3.popen3(*cmd_args) + i, o, e, t = Open3.popen3({}, *cmd_args, { :unsetenv_others => true}) rescue $stderr.puts "dispatch: popen3: #{$!}" sleep 1 @@ -201,6 +239,7 @@ class Dispatcher started: false, sent_int: 0, job_auth: job_auth, + stderr_buf_to_flush: '', stderr_flushed_at: 0 } i.close @@ -235,18 +274,23 @@ class Dispatcher end if stderr_buf - if stderr_buf.index "\n" - lines = stderr_buf.lines("\n").to_a + j[:stderr_buf] << stderr_buf + if j[:stderr_buf].index "\n" + lines = j[:stderr_buf].lines("\n").to_a + if j[:stderr_buf][-1] == "\n" + j[:stderr_buf] = '' + else + j[:stderr_buf] = lines.pop + end lines.each do |line| $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) $stderr.puts line - log_msg = "#{Time.now.ctime.to_s} #{line.strip}" - j[:stderr_buf] << (log_msg + " \n") + pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" + j[:stderr_buf_to_flush] << pub_msg end - if (LOG_BUFFER_SIZE < j[:stderr_buf].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i) + if (LOG_BUFFER_SIZE < j[:stderr_buf_to_flush].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i) write_log j - j[:stderr_flushed_at] = Time.now.to_i end end end @@ -362,6 +406,7 @@ class Dispatcher end end else + refresh_running unless did_recently(:refresh_running, 60.0) refresh_todo unless did_recently(:refresh_todo, 1.0) update_node_status unless @todo.empty? or did_recently(:start_jobs, 1.0) or $signal[:term] @@ -392,16 +437,18 @@ class Dispatcher # send message to log table. we want these records to be transient def write_log running_job begin - if (running_job && running_job[:stderr_buf] != '') + if (running_job && running_job[:stderr_buf_to_flush] != '') log = Log.new(object_uuid: running_job[:job].uuid, event_type: 'stderr', - properties: {"text" => running_job[:stderr_buf]}) + owner_uuid: running_job[:job].owner_uuid, + properties: {"text" => running_job[:stderr_buf_to_flush]}) log.save! - running_job[:stderr_buf] = '' + running_job[:stderr_buf_to_flush] = '' running_job[:stderr_flushed_at] = Time.now.to_i end rescue running_job[:stderr_buf] = "Failed to write logs \n" + running_job[:stderr_buf_to_flush] = '' running_job[:stderr_flushed_at] = Time.now.to_i end end