8019: rateLimit method signature
author radhika <radhika@curoverse.com>
Tue, 25 Apr 2017 19:05:31 +0000 (15:05 -0400)
committer radhika <radhika@curoverse.com>
Tue, 25 Apr 2017 19:05:31 +0000 (15:05 -0400)
services/crunch-run/logging.go

index 22ba130f24bb2edd47239324ae7e1e902059d6b0..6e32d723e87d1e031dcaf94241358fc9c6e3a8e4 100644 (file)
@@ -188,8 +188,8 @@ type ArvLogWriter struct {
        logThrottleIsOpen            bool
        logThrottlePartialLineLastAt time.Time
        logThrottleFirstPartialLine  bool
-       stderrBufToFlush             bytes.Buffer
-       stderrFlushedAt              time.Time
+       bufToFlush                   bytes.Buffer
+       bufFlushedAt                 time.Time
 
        // rate limiting config parameters
        crunchLimitLogBytesPerJob          int64
@@ -273,10 +273,10 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                // It has been more than throttle_period seconds since the last
                // checkpoint; so reset the throttle
                if arvlog.logThrottleBytesSkipped > 0 {
-                       arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now().UTC()), arvlog.logThrottleBytesSkipped))
+                       arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
                }
 
-               arvlog.logThrottleResetTime = time.Now().Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
+               arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
                arvlog.logThrottleBytesSoFar = 0
                arvlog.logThrottleLinesSoFar = 0
                arvlog.logThrottleBytesSkipped = 0
@@ -298,25 +298,23 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                }
 
                // check rateLimit
-               _, msg, err2 := arvlog.rateLimit(line)
-               if err2 != nil {
-                       return 0, fmt.Errorf("%s ; %s", err1, err2)
-               }
-               arvlog.stderrBufToFlush.WriteString(string(msg) + "\n")
+               logOpen, msg := arvlog.rateLimit(line, now)
+               arvlog.bufToFlush.WriteString(string(msg) + "\n")
+               arvlog.logThrottleIsOpen = logOpen
        }
 
-       if int64(arvlog.stderrBufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
-               (time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
+       if int64(arvlog.bufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
+               (now.Sub(arvlog.bufFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
                // write to API
                lr := arvadosclient.Dict{"log": arvadosclient.Dict{
                        "object_uuid": arvlog.UUID,
                        "event_type":  arvlog.loggingStream,
-                       "properties":  map[string]string{"text": arvlog.stderrBufToFlush.String()}}}
+                       "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
                err2 := arvlog.ArvClient.Create("logs", lr, nil)
 
-               bytesWritten = arvlog.stderrBufToFlush.Len()
-               arvlog.stderrBufToFlush = bytes.Buffer{}
-               arvlog.stderrFlushedAt = time.Now()
+               bytesWritten = arvlog.bufToFlush.Len()
+               arvlog.bufToFlush = bytes.Buffer{}
+               arvlog.bufFlushedAt = now
 
                if err1 != nil || err2 != nil {
                        return 0, fmt.Errorf("%s ; %s", err1, err2)
@@ -335,11 +333,11 @@ func (arvlog *ArvLogWriter) Close() (err error) {
        return err
 }
 
-var lineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`)
+var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
 
 // Test for hard cap on total output and for log throttling. Returns whether
 // the log line should go to output or not. Returns message if limit exceeded.
-func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
+func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
        message := ""
        lineSize := int64(len(line))
        partialLine := false
@@ -351,8 +349,8 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
                if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
                        partialLine = true
 
-                       if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
-                               arvlog.logThrottlePartialLineLastAt = time.Now()
+                       if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
+                               arvlog.logThrottlePartialLineLastAt = now
                        } else {
                                skipCounts = true
                        }
@@ -365,23 +363,23 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
                }
 
                if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
-                       message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLimitLogBytesPerJob)
-                       arvlog.logThrottleResetTime = time.Now().Add(time.Duration(365 * 24 * time.Hour))
+                       message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(now.UTC()), arvlog.crunchLimitLogBytesPerJob)
+                       arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
                        arvlog.logThrottleIsOpen = false
 
                } else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
-                       remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
-                       message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
+                       message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
                        arvlog.logThrottleIsOpen = false
 
                } else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
-                       remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
-                       message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
+                       message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
                        arvlog.logThrottleIsOpen = false
 
                } else if partialLine && arvlog.logThrottleFirstPartialLine {
                        arvlog.logThrottleFirstPartialLine = false
-                       message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
+                       message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
 
                }
        }
@@ -395,10 +393,10 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
                // Yes, write to logs, but use our "rate exceeded" message
                // instead of the log message that exceeded the limit.
                message += " A complete log is still being written to Keep, and will be available when the job finishes."
-               return true, []byte(message), nil
+               return true, []byte(message)
        } else if partialLine {
-               return false, line, nil
+               return false, line
        } else {
-               return arvlog.logThrottleIsOpen, line, nil
+               return arvlog.logThrottleIsOpen, line
        }
 }