Merge branch 'master' into 2955-fail-orphan-jobs
[arvados.git] / services / api / script / crunch-dispatch.rb
index abed9ddf65e80e078ae34edf01a07fe63a49adf6..ee8076b305fad984aee6bdde12ef5cfe803e14d4 100755 (executable)
@@ -1,5 +1,7 @@
 #!/usr/bin/env ruby
 
+require 'trollop'
+
 include Process
 
 $warned = {}
@@ -20,6 +22,10 @@ if ENV["CRUNCH_DISPATCH_LOCKFILE"]
   end
 end
 
+$trollopts = Trollop::options do
+    opt :use_env, "Pass selected environment variables (PATH, PYTHONPATH, RUBYLIB, GEM_PATH, PERLLIB) to crunch-job"
+end
+
 ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
 
 require File.dirname(__FILE__) + '/../config/boot'
@@ -136,7 +142,11 @@ class Dispatcher
       if Server::Application.config.crunch_job_user
         cmd_args.unshift("sudo", "-E", "-u",
                          Server::Application.config.crunch_job_user,
-                         "PERLLIB=#{ENV['PERLLIB']}")
+                         "PATH=#{ENV['PATH']}",
+                         "PERLLIB=#{ENV['PERLLIB']}",
+                         "PYTHONPATH=#{ENV['PYTHONPATH']}",
+                         "RUBYLIB=#{ENV['RUBYLIB']}",
+                         "GEM_PATH=#{ENV['GEM_PATH']}")
       end
 
       job_auth = ApiClientAuthorization.
@@ -176,10 +186,10 @@ class Dispatcher
       cmd_args << '--git-dir'
       cmd_args << arvados_internal
 
-      $stderr.puts "dispatch: #{cmd_args.join ' '}"
+      $stderr.puts "dispatch: #{cmd_args}"
 
       begin
-        i, o, e, t = Open3.popen3(*cmd_args)
+        i, o, e, t = Open3.popen3({}, *cmd_args, { :unsetenv_others => true})
       rescue
         $stderr.puts "dispatch: popen3: #{$!}"
         sleep 1
@@ -201,6 +211,7 @@ class Dispatcher
         started: false,
         sent_int: 0,
         job_auth: job_auth,
+        stderr_buf_to_flush: '',
         stderr_flushed_at: 0
       }
       i.close
@@ -236,18 +247,22 @@ class Dispatcher
 
       if stderr_buf
         j[:stderr_buf] << stderr_buf
-        if stderr_buf.index "\n" || j[:stderr_flushed_at] != Time.now.to_i
-        lines = stderr_buf.lines("\n").to_a
+        if j[:stderr_buf].index "\n"
+          lines = j[:stderr_buf].lines("\n").to_a
+          if j[:stderr_buf][-1] == "\n"
+            j[:stderr_buf] = ''
+          else
+            j[:stderr_buf] = lines.pop
+          end
           lines.each do |line|
             $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
             $stderr.puts line
-            log_msg = "#{Time.now.ctime.to_s} #{line.strip}"
-            j[:stderr_buf] << (log_msg + " \n")
+            pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
+            j[:stderr_buf_to_flush] << pub_msg
           end
 
-          if (LOG_BUFFER_SIZE < j[:stderr_buf].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i)
+          if (LOG_BUFFER_SIZE < j[:stderr_buf_to_flush].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i)
             write_log j
-            j[:stderr_flushed_at] = Time.now.to_i
           end
         end
       end
@@ -393,16 +408,18 @@ class Dispatcher
   # send message to log table. we want these records to be transient
   def write_log running_job
     begin
-      if (running_job && running_job[:stderr_buf] != '')
+      if (running_job && running_job[:stderr_buf_to_flush] != '')
         log = Log.new(object_uuid: running_job[:job].uuid,
                       event_type: 'stderr',
-                      properties: {"text" => running_job[:stderr_buf]})
+                      owner_uuid: running_job[:job].owner_uuid,
+                      properties: {"text" => running_job[:stderr_buf_to_flush]})
         log.save!
-        running_job[:stderr_buf] = ''
+        running_job[:stderr_buf_to_flush] = ''
         running_job[:stderr_flushed_at] = Time.now.to_i
       end
     rescue
-      running_job[:stderr_buf] = 'Failed to write logs \n'
+      running_job[:stderr_buf] = "Failed to write logs \n"
+      running_job[:stderr_buf_to_flush] = ''
       running_job[:stderr_flushed_at] = Time.now.to_i
     end
   end