X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b91db14a4dced9d6ea124e86be3c796e6f2c8e8c..f48482bd37d3ae5a5f1aa488fa330f77c5fd640d:/services/api/script/crunch-dispatch.rb diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb index 43a527afac..87acb651a4 100755 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@ -1,5 +1,7 @@ #!/usr/bin/env ruby +require 'trollop' + include Process $warned = {} @@ -20,14 +22,17 @@ if ENV["CRUNCH_DISPATCH_LOCKFILE"] end end +$trollopts = Trollop::options do + opt :use_env, "Pass selected environment variables (PATH, PYTHONPATH, RUBYLIB, GEM_PATH, PERLLIB) to crunch-job" +end + ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development" require File.dirname(__FILE__) + '/../config/boot' require File.dirname(__FILE__) + '/../config/environment' require 'open3' -$redis ||= Redis.new -LOG_BUFFER_SIZE = 2**20 +LOG_BUFFER_SIZE = 4096 class Dispatcher include ApplicationHelper @@ -135,9 +140,23 @@ class Dispatcher end if Server::Application.config.crunch_job_user - cmd_args.unshift("sudo", "-E", "-u", - Server::Application.config.crunch_job_user, - "PERLLIB=#{ENV['PERLLIB']}") + cmd_args.unshift("sudo", "-E", "-u", Server::Application.config.crunch_job_user) + end + + cmd_args << "HOME=/dev/null" + cmd_args << "ARVADOS_API_HOST=#{ENV['ARVADOS_API_HOST']}" + cmd_args << "ARVADOS_API_HOST_INSECURE=#{ENV['ARVADOS_API_HOST_INSECURE']}" if ENV['ARVADOS_API_HOST_INSECURE'] + + ENV.each do |k, v| + cmd_args << "#{k}=#{v}" if k.starts_with? "CRUNCH_" + end + + if $trollopts.use_env + cmd_args << "PATH=#{ENV['PATH']}" + cmd_args << "PYTHONPATH=#{ENV['PYTHONPATH']}" + cmd_args << "PERLLIB=#{ENV['PERLLIB']}" + cmd_args << "RUBYLIB=#{ENV['RUBYLIB']}" + cmd_args << "GEM_PATH=#{ENV['GEM_PATH']}" end job_auth = ApiClientAuthorization. @@ -177,10 +196,10 @@ class Dispatcher cmd_args << '--git-dir' cmd_args << arvados_internal - $stderr.puts "dispatch: #{cmd_args.join ' '}" + $stderr.puts "dispatch: #{cmd_args}" begin - i, o, e, t = Open3.popen3(*cmd_args) + i, o, e, t = Open3.popen3({}, *cmd_args, { :unsetenv_others => true}) rescue $stderr.puts "dispatch: popen3: #{$!}" sleep 1 @@ -191,9 +210,6 @@ class Dispatcher $stderr.puts "dispatch: job #{job.uuid}" start_banner = "dispatch: child #{t.pid} start #{Time.now.ctime.to_s}" $stderr.puts start_banner - $redis.set job.uuid, start_banner + "\n" - $redis.publish job.uuid, start_banner - $redis.publish job.owner_uuid, start_banner @running[job.uuid] = { stdin: i, @@ -204,7 +220,9 @@ class Dispatcher stderr_buf: '', started: false, sent_int: 0, - job_auth: job_auth + job_auth: job_auth, + stderr_buf_to_flush: '', + stderr_flushed_at: 0 } i.close end @@ -249,16 +267,12 @@ class Dispatcher lines.each do |line| $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) $stderr.puts line - pub_msg = "#{Time.now.ctime.to_s} #{line.strip}" - $redis.publish job.owner_uuid, pub_msg - $redis.publish job_uuid, pub_msg - $redis.append job_uuid, pub_msg + "\n" - if LOG_BUFFER_SIZE < $redis.strlen(job_uuid) - $redis.set(job_uuid, - $redis - .getrange(job_uuid, (LOG_BUFFER_SIZE >> 1), -1) - .sub(/^.*?\n/, '')) - end + pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" + j[:stderr_buf_to_flush] << pub_msg + end + + if (LOG_BUFFER_SIZE < j[:stderr_buf_to_flush].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i) + write_log j end end end @@ -306,6 +320,8 @@ class Dispatcher # Ensure every last drop of stdout and stderr is consumed read_pipes + write_log j_done # write any remaining logs + if j_done[:stderr_buf] and j_done[:stderr_buf] != '' $stderr.puts j_done[:stderr_buf] + "\n" end @@ -333,8 +349,6 @@ class Dispatcher # Invalidate the per-job auth token j_done[:job_auth].update_attributes expires_at: Time.now - $redis.publish job_done.uuid, "end" - @running.delete job_done.uuid end @@ -400,6 +414,26 @@ class Dispatcher true end end + + # send message to log table. we want these records to be transient + def write_log running_job + begin + if (running_job && running_job[:stderr_buf_to_flush] != '') + log = Log.new(object_uuid: running_job[:job].uuid, + event_type: 'stderr', + owner_uuid: running_job[:job].owner_uuid, + properties: {"text" => running_job[:stderr_buf_to_flush]}) + log.save! + running_job[:stderr_buf_to_flush] = '' + running_job[:stderr_flushed_at] = Time.now.to_i + end + rescue + running_job[:stderr_buf] = "Failed to write logs \n" + running_job[:stderr_buf_to_flush] = '' + running_job[:stderr_flushed_at] = Time.now.to_i + end + end + end # This is how crunch-job child procs know where the "refresh" trigger file is