8019: Rework partial line throttling. Fix sending flush when buffer is ready
authorPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 28 Apr 2017 18:50:38 +0000 (14:50 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Fri, 28 Apr 2017 18:50:38 +0000 (14:50 -0400)
so it does not block.

services/crunch-run/logging.go

index 45dfc2e3413e03507f94b3ec007aa84037e2c884..cdf2d6e68b2cb4ae95cc08bfcd553d5689de47a9 100644 (file)
@@ -61,11 +61,6 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
                tl.buf = &bytes.Buffer{}
        }
 
-       //if int64(tl.buf.Len()) >= crunchLogBytesPerEvent && !tl.pendingFlush {
-       //      tl.pendingFlush = true
-       //      tl.flush <- struct{}{}
-       //}
-
        now := tl.Timestamper(time.Now().UTC())
        sc := bufio.NewScanner(bytes.NewBuffer(p))
        for err == nil && sc.Scan() {
@@ -81,6 +76,17 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
                        n = len(p)
                }
        }
+
+       if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
+               // Non-blocking send.  Try to send a flush signal if the
+               // channel can accept it right now.  Otherwise do nothing,
+               // because a flush is already pending.
+               select {
+               case tl.flush <- struct{}{}:
+               default:
+               }
+       }
+
        return
 }
 
@@ -92,7 +98,7 @@ func (tl *ThrottledLogger) flusher() {
        for stopping := false; !stopping; {
                select {
                case _, open := <-tl.flush:
-                       // if !open, flush tl.buf and exit the loop
+                       // if !open, will flush tl.buf and exit the loop
                        stopping = !open
                case <-ticker.C:
                }
@@ -101,7 +107,6 @@ func (tl *ThrottledLogger) flusher() {
 
                tl.Mutex.Lock()
                ready, tl.buf = tl.buf, &bytes.Buffer{}
-               tl.pendingFlush = false
                tl.Mutex.Unlock()
 
                if ready != nil && ready.Len() > 0 {
@@ -178,13 +183,13 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
 }
 
 // Log throttling rate limiting config parameters
-var crunchLimitLogBytesPerJob int64
-var crunchLogThrottleBytes int64
-var crunchLogThrottlePeriod int
-var crunchLogThrottleLines int64
-var crunchLogPartialLineThrottlePeriod int
-var crunchLogBytesPerEvent int64
-var crunchLogSecondsBetweenEvents int
+var crunchLimitLogBytesPerJob int64 = 67108864
+var crunchLogThrottleBytes int64 = 65536
+var crunchLogThrottlePeriod time.Duration = time.Second * 60
+var crunchLogThrottleLines int64 = 1024
+var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
+var crunchLogBytesPerEvent int64 = 4096
+var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
 
 // ArvLogWriter is an io.WriteCloser that processes each write by
 // writing it through to another io.WriteCloser (typically a
@@ -202,7 +207,7 @@ type ArvLogWriter struct {
        logThrottleBytesSoFar        int64
        logThrottleBytesSkipped      int64
        logThrottleIsOpen            bool
-       logThrottlePartialLineLastAt time.Time
+       logThrottlePartialLineNextAt time.Time
        logThrottleFirstPartialLine  bool
        bufToFlush                   bytes.Buffer
        bufFlushedAt                 time.Time
@@ -226,13 +231,11 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                        arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
                }
 
-               arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod))
+               arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
                arvlog.logThrottleBytesSoFar = 0
                arvlog.logThrottleLinesSoFar = 0
                arvlog.logThrottleBytesSkipped = 0
                arvlog.logThrottleIsOpen = true
-               arvlog.logThrottlePartialLineLastAt = time.Time{}
-               arvlog.logThrottleFirstPartialLine = true
        }
 
        lines := bytes.Split(p, []byte("\n"))
@@ -255,7 +258,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
        }
 
        if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
-               (now.Sub(arvlog.bufFlushedAt) >= time.Duration(crunchLogSecondsBetweenEvents)) {
+               (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) {
                // write to API
                lr := arvadosclient.Dict{"log": arvadosclient.Dict{
                        "object_uuid": arvlog.UUID,
@@ -291,22 +294,37 @@ var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
        message := ""
        lineSize := int64(len(line))
-       partialLine := false
 
        if arvlog.logThrottleIsOpen {
                matches := lineRegexp.FindStringSubmatch(string(line))
 
                if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
-                       partialLine = true
-                       if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
-                               arvlog.logThrottlePartialLineLastAt = now
-                               arvlog.logThrottleFirstPartialLine = true
+                       // This is a partial line.
+
+                       if arvlog.logThrottleFirstPartialLine {
+                               // Suppress this partial segment.  It is the first one seen for this line, so emit a rate-limiting notice in its place.
+                               arvlog.logThrottleFirstPartialLine = false
+                               arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+                               arvlog.logThrottleBytesSkipped += lineSize
+                               return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
+                                       RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
+                       } else if now.After(arvlog.logThrottlePartialLineNextAt) {
+                               // The throttle period has passed.  Update timestamp and let it through.
+                               arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+                       } else {
+                               // Suppress line.
+                               arvlog.logThrottleBytesSkipped += lineSize
+                               return false, line
                        }
+               } else {
+                       // Not a partial line so reset.
+                       arvlog.logThrottlePartialLineNextAt = time.Time{}
+                       arvlog.logThrottleFirstPartialLine = true
                }
 
-               arvlog.logThrottleLinesSoFar += 1
-               arvlog.logThrottleBytesSoFar += lineSize
                arvlog.bytesLogged += lineSize
+               arvlog.logThrottleBytesSoFar += lineSize
+               arvlog.logThrottleLinesSoFar += 1
 
                if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
                        message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
@@ -317,20 +335,15 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
                } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
                        remainingTime := arvlog.logThrottleResetTime.Sub(now)
                        message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
-                               RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second)
+                               RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
                        arvlog.logThrottleIsOpen = false
 
                } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
                        remainingTime := arvlog.logThrottleResetTime.Sub(now)
                        message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
-                               RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second)
+                               RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
                        arvlog.logThrottleIsOpen = false
 
-               } else if partialLine && arvlog.logThrottleFirstPartialLine {
-                       arvlog.logThrottleFirstPartialLine = false
-                       message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
-                               RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod)
-
                }
        }
 
@@ -352,51 +365,37 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
 // load the rate limit discovery config paramters
 func loadLogThrottleParams(clnt IArvadosClient) {
        param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
-       if err != nil {
-               crunchLimitLogBytesPerJob = 67108864
-       } else {
+       if err == nil {
                crunchLimitLogBytesPerJob = int64(param.(float64))
        }
 
        param, err = clnt.Discovery("crunchLogThrottleBytes")
-       if err != nil {
-               crunchLogThrottleBytes = 65536
-       } else {
+       if err == nil {
                crunchLogThrottleBytes = int64(param.(float64))
        }
 
        param, err = clnt.Discovery("crunchLogThrottlePeriod")
-       if err != nil {
-               crunchLogThrottlePeriod = 60
-       } else {
-               crunchLogThrottlePeriod = int(param.(float64))
+       if err == nil {
+               crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
        }
 
        param, err = clnt.Discovery("crunchLogThrottleLines")
-       if err != nil {
-               crunchLogThrottleLines = 1024
-       } else {
+       if err == nil {
                crunchLogThrottleLines = int64(param.(float64))
        }
 
        param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
-       if err != nil {
-               crunchLogPartialLineThrottlePeriod = 5
-       } else {
-               crunchLogPartialLineThrottlePeriod = int(param.(float64))
+       if err == nil {
+               crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
        }
 
        param, err = clnt.Discovery("crunchLogBytesPerEvent")
-       if err != nil {
-               crunchLogBytesPerEvent = 4096
-       } else {
+       if err == nil {
                crunchLogBytesPerEvent = int64(param.(float64))
        }
 
        param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
-       if err != nil {
-               crunchLogSecondsBetweenEvents = 1
-       } else {
-               crunchLogSecondsBetweenEvents = int(param.(float64))
+       if err == nil {
+               crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
        }
 }