8019: partial line throttling etc
author radhika <radhika@curoverse.com>
Thu, 27 Apr 2017 19:44:37 +0000 (15:44 -0400)
committer radhika <radhika@curoverse.com>
Thu, 27 Apr 2017 19:44:37 +0000 (15:44 -0400)
services/crunch-run/logging.go

index 383d8ad025d71b37135214b3b66e8f0cdd917ae1..45dfc2e3413e03507f94b3ec007aa84037e2c884 100644 (file)
@@ -38,7 +38,8 @@ type ThrottledLogger struct {
        flush   chan struct{}
        stopped chan struct{}
        Timestamper
-       Immediate *log.Logger
+       Immediate    *log.Logger
+       pendingFlush bool
 }
 
 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
@@ -60,6 +61,11 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
                tl.buf = &bytes.Buffer{}
        }
 
+       //if int64(tl.buf.Len()) >= crunchLogBytesPerEvent && !tl.pendingFlush {
+       //      tl.pendingFlush = true
+       //      tl.flush <- struct{}{}
+       //}
+
        now := tl.Timestamper(time.Now().UTC())
        sc := bufio.NewScanner(bytes.NewBuffer(p))
        for err == nil && sc.Scan() {
@@ -94,7 +100,8 @@ func (tl *ThrottledLogger) flusher() {
                var ready *bytes.Buffer
 
                tl.Mutex.Lock()
-               ready, tl.buf = tl.buf, nil
+               ready, tl.buf = tl.buf, &bytes.Buffer{}
+               tl.pendingFlush = false
                tl.Mutex.Unlock()
 
                if ready != nil && ready.Len() > 0 {
@@ -157,8 +164,8 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
 
 // NewThrottledLogger creates a new throttled logger that
 // (a) prepends timestamps to each line
-// (b) batches log messages and only calls the underlying Writer at most once
-// per second.
+// (b) batches log messages and only calls the underlying Writer
+//  at most once per "crunchLogSecondsBetweenEvents" seconds.
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
        tl := &ThrottledLogger{}
        tl.flush = make(chan struct{}, 1)
@@ -242,8 +249,9 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 
                // check rateLimit
                logOpen, msg := arvlog.rateLimit(line, now)
-               arvlog.bufToFlush.WriteString(string(msg) + "\n")
-               arvlog.logThrottleIsOpen = logOpen
+               if logOpen {
+                       arvlog.bufToFlush.WriteString(string(msg) + "\n")
+               }
        }
 
        if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
@@ -284,26 +292,21 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
        message := ""
        lineSize := int64(len(line))
        partialLine := false
-       skipCounts := false
 
        if arvlog.logThrottleIsOpen {
                matches := lineRegexp.FindStringSubmatch(string(line))
 
                if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
                        partialLine = true
-
                        if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
                                arvlog.logThrottlePartialLineLastAt = now
-                       } else {
-                               skipCounts = true
+                               arvlog.logThrottleFirstPartialLine = true
                        }
                }
 
-               if !skipCounts {
-                       arvlog.logThrottleLinesSoFar += 1
-                       arvlog.logThrottleBytesSoFar += lineSize
-                       arvlog.bytesLogged += lineSize
-               }
+               arvlog.logThrottleLinesSoFar += 1
+               arvlog.logThrottleBytesSoFar += lineSize
+               arvlog.bytesLogged += lineSize
 
                if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
                        message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
@@ -341,8 +344,6 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
                // instead of the log message that exceeded the limit.
                message += " A complete log is still being written to Keep, and will be available when the job finishes."
                return true, []byte(message)
-       } else if partialLine {
-               return false, line
        } else {
                return arvlog.logThrottleIsOpen, line
        }