class LogStreamer
Q_UPDATE_INTERVAL = 12
- def initialize(job)
+ def initialize(job, opts={})
@job = job
+ @opts = opts
end
def each
if @job.finished_at
end
@redis = Redis.new(:timeout => 0)
@redis.subscribe(@job.uuid) do |event|
+ if @redis.exists @job.uuid
+ # A log buffer exists. Start by showing the last few KB.
+ @redis.
+ getrange(@job.uuid, 0 - [@opts[:buffer_size], 1].max, -1).
+ sub(/^[^\n]*\n?/, '').
+ split("\n").
+ each do |line|
+ yield "#{line}\n"
+ end
+ end
+ # TODO: avoid duplicating the last few lines of the log
+ # file. Use the fact that timestamps are lexicographically
+ # ordered.
event.message do |channel, msg|
if msg == "end"
@redis.unsubscribe @job.uuid
end
end
+ def self._log_tail_follow_requires_parameters
+ {
+ buffer_size: {type: 'integer', required: false}
+ }
+ end
def log_tail_follow
if !@object.andand.uuid
return render_not_found
end
self.response.headers['Last-Modified'] = Time.now.ctime.to_s
- self.response_body = LogStreamer.new @object
+ self.response_body = LogStreamer.new @object, {
+ buffer_size: (params[:buffer_size] || 2**13)
+ }
end
def queue