def pipieline_log_history(job_uuids)
results = []
- log_history = Log.where(event_type: 'stderr', object_uuid: job_uuids).
- order('id DESC').limit(20).all
+ log_history = Log.where(event_type: 'stderr',
+ object_uuid: job_uuids).order('id DESC')
if !log_history.results.empty?
reversed_results = log_history.results.reverse
reversed_results.each do |entry|
- summary = entry.summary
- results = results.concat summary.split("\n")
+ if entry.andand.properties
+ properties = entry.properties
+ text = properties[:text] || properties["text"]
+ if text
+ results = results.concat text.split("\n")
+ end
+ end
end
end
require File.dirname(__FILE__) + '/../config/environment'
require 'open3'
-$redis ||= Redis.new
-LOG_BUFFER_SIZE = 2**20
-
-$tmp_log_buffer = ''
-$previous_tmp_log_at = Time.now
-TMP_LOG_BUFFER_SIZE = 4096
+LOG_BUFFER_SIZE = 4096
class Dispatcher
include ApplicationHelper
$stderr.puts "dispatch: job #{job.uuid}"
start_banner = "dispatch: child #{t.pid} start #{Time.now.ctime.to_s}"
$stderr.puts start_banner
- $redis.set job.uuid, start_banner + "\n"
- $redis.publish job.uuid, start_banner
- $redis.publish job.owner_uuid, start_banner
@running[job.uuid] = {
stdin: i,
stderr_buf: '',
started: false,
sent_int: 0,
- job_auth: job_auth
+ job_auth: job_auth,
+ stderr_flushed_at: 0
}
i.close
end
if stderr_buf
j[:stderr_buf] << stderr_buf
- if j[:stderr_buf].index "\n"
- lines = j[:stderr_buf].lines("\n").to_a
- if j[:stderr_buf][-1] == "\n"
- j[:stderr_buf] = ''
- else
- j[:stderr_buf] = lines.pop
- end
+ if stderr_buf.index("\n") || (j[:stderr_flushed_at] != Time.now.to_i)
+ lines = stderr_buf.lines("\n").to_a
lines.each do |line|
$stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
$stderr.puts line
- pub_msg = "#{Time.now.ctime.to_s} #{line.strip}"
- $redis.publish job.owner_uuid, pub_msg
- $redis.publish job_uuid, pub_msg
- $redis.append job_uuid, pub_msg + "\n"
- if LOG_BUFFER_SIZE < $redis.strlen(job_uuid)
- $redis.set(job_uuid,
- $redis
- .getrange(job_uuid, (LOG_BUFFER_SIZE >> 1), -1)
- .sub(/^.*?\n/, ''))
- end
+ log_msg = "#{Time.now.ctime.to_s} #{line.strip}"
+ j[:stderr_buf] << (log_msg + "\n")
+ end
- if (TMP_LOG_BUFFER_SIZE < $tmp_log_buffer.size) || ($previous_tmp_log_at+1 < Time.now)
- $tmp_log_buffer += (pub_msg + "\n")
- write_log job_uuid
- else
- $tmp_log_buffer += (pub_msg + "\n")
- end
+ if (LOG_BUFFER_SIZE < j[:stderr_buf].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i)
+ write_log j
+ j[:stderr_flushed_at] = Time.now.to_i
end
end
end
# Ensure every last drop of stdout and stderr is consumed
read_pipes
- write_log job_done.uuid # write any remaining logs
+ write_log j_done # write any remaining logs
if j_done[:stderr_buf] and j_done[:stderr_buf] != ''
$stderr.puts j_done[:stderr_buf] + "\n"
# Invalidate the per-job auth token
j_done[:job_auth].update_attributes expires_at: Time.now
- $redis.publish job_done.uuid, "end"
-
@running.delete job_done.uuid
end
end
# send message to log table. we want these records to be transient
- def write_log job_uuid
- if $tmp_log_buffer == ''
- return
+ def write_log running_job
+ begin
+ if (running_job && running_job[:stderr_buf] != '')
+ log = Log.new(object_uuid: running_job[:job].uuid,
+ event_type: 'stderr',
+ properties: {"text" => running_job[:stderr_buf]})
+ log.save!
+ running_job[:stderr_buf] = ''
+ running_job[:stderr_flushed_at] = Time.now.to_i
+ end
+ rescue
+ running_job[:stderr_buf] = "Failed to write logs\n"
+ running_job[:stderr_flushed_at] = Time.now.to_i
end
- log = Log.new(object_uuid: job_uuid,
- event_type:'stderr',
- summary: $tmp_log_buffer)
- log.save!
- $tmp_log_buffer = ''
- $previous_tmp_log_at = Time.now
end
+
end
# This is how crunch-job child procs know where the "refresh" trigger file is