2756: rescue any exceptions. do not write to redis. use log properties, instead of...
authorradhika <radhika@curoverse.com>
Fri, 23 May 2014 02:21:13 +0000 (22:21 -0400)
committerradhika <radhika@curoverse.com>
Fri, 23 May 2014 02:21:13 +0000 (22:21 -0400)
apps/workbench/app/assets/javascripts/pipeline_instances.js
apps/workbench/app/helpers/pipeline_instances_helper.rb
services/api/script/crunch-dispatch.rb

index ef936e3fd725ea17b0d02504851bd3630f5491eb..0d83f3b97e8783c80178443f6430137a04d33034 100644 (file)
 
     $(document).on('arv-log-event', '.arv-log-event-handler-append-logs', function(event, eventData){
       parsedData = JSON.parse(eventData);
-      summary = parsedData.summary;
-      $(this).append(summary + "<br/>");
+
+      propertyText = undefined
+
+      properties = parsedData.properties;
+      if (properties !== null) {
+        propertyText = properties.text;
+      }
+
+      if (propertyText !== undefined) {
+        $(this).append(propertyText + "<br/>");
+      } else {
+        $(this).append(parsedData.summary + "<br/>");
+      }
     });
 
 })();
index b39bfb1e4f4a852e1761cba06868d1c29d0ab7c2..56e85dbd792269362c880f85626d14a50e48c996 100644 (file)
@@ -25,13 +25,18 @@ module PipelineInstancesHelper
   def pipieline_log_history(job_uuids)
     results = []
 
-    log_history = Log.where(event_type: 'stderr', object_uuid: job_uuids).
-                      order('id DESC').limit(20).all
+    log_history = Log.where(event_type: 'stderr',
+                            object_uuid: job_uuids).order('id DESC')
     if !log_history.results.empty?
       reversed_results = log_history.results.reverse
       reversed_results.each do |entry|
-        summary = entry.summary
-        results = results.concat summary.split("\n")
+        if entry.andand.properties
+          properties = entry.properties
+          text = properties[:text]
+          if text
+            results = results.concat text.split("\n")
+          end
+        end
       end
     end
 
index 57f146c6c8a3e296942613323be5c1880d2039fe..abed9ddf65e80e078ae34edf01a07fe63a49adf6 100755 (executable)
@@ -26,12 +26,7 @@ require File.dirname(__FILE__) + '/../config/boot'
 require File.dirname(__FILE__) + '/../config/environment'
 require 'open3'
 
-$redis ||= Redis.new
-LOG_BUFFER_SIZE = 2**20
-
-$tmp_log_buffer = ''
-$previous_tmp_log_at = Time.now
-TMP_LOG_BUFFER_SIZE = 4096
+LOG_BUFFER_SIZE = 4096
 
 class Dispatcher
   include ApplicationHelper
@@ -195,9 +190,6 @@ class Dispatcher
       $stderr.puts "dispatch: job #{job.uuid}"
       start_banner = "dispatch: child #{t.pid} start #{Time.now.ctime.to_s}"
       $stderr.puts start_banner
-      $redis.set job.uuid, start_banner + "\n"
-      $redis.publish job.uuid, start_banner
-      $redis.publish job.owner_uuid, start_banner
 
       @running[job.uuid] = {
         stdin: i,
@@ -208,7 +200,8 @@ class Dispatcher
         stderr_buf: '',
         started: false,
         sent_int: 0,
-        job_auth: job_auth
+        job_auth: job_auth,
+        stderr_flushed_at: 0
       }
       i.close
     end
@@ -243,33 +236,18 @@ class Dispatcher
 
       if stderr_buf
         j[:stderr_buf] << stderr_buf
-        if j[:stderr_buf].index "\n"
-          lines = j[:stderr_buf].lines("\n").to_a
-          if j[:stderr_buf][-1] == "\n"
-            j[:stderr_buf] = ''
-          else
-            j[:stderr_buf] = lines.pop
-          end
+        if stderr_buf.index "\n" || j[:stderr_flushed_at] != Time.now.to_i
+        lines = stderr_buf.lines("\n").to_a
           lines.each do |line|
             $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
             $stderr.puts line
-            pub_msg = "#{Time.now.ctime.to_s} #{line.strip}"
-            $redis.publish job.owner_uuid, pub_msg
-            $redis.publish job_uuid, pub_msg
-            $redis.append job_uuid, pub_msg + "\n"
-            if LOG_BUFFER_SIZE < $redis.strlen(job_uuid)
-              $redis.set(job_uuid,
-                         $redis
-                           .getrange(job_uuid, (LOG_BUFFER_SIZE >> 1), -1)
-                           .sub(/^.*?\n/, ''))
-            end
+            log_msg = "#{Time.now.ctime.to_s} #{line.strip}"
+            j[:stderr_buf] << (log_msg + " \n")
+          end
 
-            if (TMP_LOG_BUFFER_SIZE < $tmp_log_buffer.size) || ($previous_tmp_log_at+1 < Time.now)
-              $tmp_log_buffer += (pub_msg + "\n")
-              write_log job_uuid
-            else 
-              $tmp_log_buffer += (pub_msg + "\n")
-            end
+          if (LOG_BUFFER_SIZE < j[:stderr_buf].size) || ((j[:stderr_flushed_at]+1) < Time.now.to_i)
+            write_log j
+            j[:stderr_flushed_at] = Time.now.to_i
           end
         end
       end
@@ -317,7 +295,7 @@ class Dispatcher
 
     # Ensure every last drop of stdout and stderr is consumed
     read_pipes
-    write_log job_done.uuid  # write any remaining logs
+    write_log j_done # write any remaining logs
 
     if j_done[:stderr_buf] and j_done[:stderr_buf] != ''
       $stderr.puts j_done[:stderr_buf] + "\n"
@@ -346,8 +324,6 @@ class Dispatcher
     # Invalidate the per-job auth token
     j_done[:job_auth].update_attributes expires_at: Time.now
 
-    $redis.publish job_done.uuid, "end"
-
     @running.delete job_done.uuid
   end
 
@@ -415,17 +391,22 @@ class Dispatcher
   end
 
   # send message to log table. we want these records to be transient
-  def write_log job_uuid
-    if $tmp_log_buffer == ''
-      return
+  def write_log running_job
+    begin
+      if (running_job && running_job[:stderr_buf] != '')
+        log = Log.new(object_uuid: running_job[:job].uuid,
+                      event_type: 'stderr',
+                      properties: {"text" => running_job[:stderr_buf]})
+        log.save!
+        running_job[:stderr_buf] = ''
+        running_job[:stderr_flushed_at] = Time.now.to_i
+      end
+    rescue
+      running_job[:stderr_buf] = 'Failed to write logs \n'
+      running_job[:stderr_flushed_at] = Time.now.to_i
     end
-    log = Log.new(object_uuid: job_uuid,
-                  event_type:'stderr',
-                  summary: $tmp_log_buffer)
-    log.save!
-    $tmp_log_buffer = ''
-    $previous_tmp_log_at = Time.now
   end
+
 end
 
 # This is how crunch-job child procs know where the "refresh" trigger file is