X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3ff644b461b69c38418384e532e3741a07333daf..e910d13fc5fc63a86f20be3b758c08db3d429bc6:/services/api/script/crunch-dispatch.rb diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb index a5afb227a4..ff9d5f6c02 100755 --- a/services/api/script/crunch-dispatch.rb +++ b/services/api/script/crunch-dispatch.rb @@ -334,7 +334,7 @@ class Dispatcher stderr: e, wait_thr: t, job: job, - stderr_buf: '', + buf: {:stdout => '', :stderr => ''}, started: false, sent_int: 0, job_auth: job_auth, @@ -356,41 +356,47 @@ class Dispatcher @running.each do |job_uuid, j| job = j[:job] - # Throw away child stdout - begin - j[:stdout].read_nonblock(2**20) - rescue Errno::EAGAIN, EOFError - end - - # Read whatever is available from child stderr - stderr_buf = false - begin - stderr_buf = j[:stderr].read_nonblock(2**20) - rescue Errno::EAGAIN, EOFError - end + j[:buf].each do |stream, streambuf| + # Read some data from the child stream + buf = false + begin + buf = j[stream].read_nonblock(2**16) + rescue Errno::EAGAIN, EOFError + end - if stderr_buf - j[:stderr_buf] << stderr_buf - if j[:stderr_buf].index "\n" - lines = j[:stderr_buf].lines("\n").to_a - if j[:stderr_buf][-1] == "\n" - j[:stderr_buf] = '' - else - j[:stderr_buf] = lines.pop - end - lines.each do |line| - $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) - $stderr.puts line - pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" - if not j[:log_truncated] - j[:stderr_buf_to_flush] << pub_msg + if buf + # Add to a the buffer + streambuf << buf + + # Check for at least one complete line + if streambuf.index "\n" + lines = streambuf.lines("\n").to_a + + # check if the last line is partial or not + j[:buf][stream] = if streambuf[-1] == "\n" + # nope + '' + else + # Put the partial line back into the buffer + lines.pop + end + + # Now spool the lines to the log output buffer + lines.each do |line| + $stderr.print "#{job_uuid} ! " unless line.index(job_uuid) + $stderr.puts line + pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n" + if not j[:log_truncated] + j[:stderr_buf_to_flush] << pub_msg + end end - end - if not j[:log_truncated] - if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or - (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i)) - write_log j + # Now actually send the log output to the logs table + if not j[:log_truncated] + if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or + (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i)) + write_log j + end end end end @@ -441,8 +447,10 @@ class Dispatcher read_pipes write_log j_done # write any remaining logs - if j_done[:stderr_buf] and j_done[:stderr_buf] != '' - $stderr.puts j_done[:stderr_buf] + "\n" + j_done[:buf].each do |stream, streambuf| + if streambuf != '' + $stderr.puts streambuf + "\n" + end end # Wait the thread (returns a Process::Status) @@ -609,7 +617,7 @@ class Dispatcher running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size running_job[:events_logged] += 1 rescue - running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf] + running_job[:buf][:stderr] = "Failed to write logs\n" + running_job[:buf][:stderr] end running_job[:stderr_buf_to_flush] = '' running_job[:stderr_flushed_at] = Time.now.to_i