Fix typo.
[arvados.git] / services / crunch-run / logging.go
index 22ba130f24bb2edd47239324ae7e1e902059d6b0..0083f0999ce7f27a4c50e94729e3dbd344f89a4e 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -35,10 +39,12 @@ type ThrottledLogger struct {
        buf *bytes.Buffer
        sync.Mutex
        writer   io.WriteCloser
-       stopping chan struct{}
+       flush    chan struct{} // Write sends here, without blocking, to request an early flush
        stopped  chan struct{}
+       stopping chan struct{} // receiving from it makes flusher do one last flush and exit its loop
        Timestamper
-       Immediate *log.Logger
+       Immediate    *log.Logger
+       pendingFlush bool // NOTE(review): not referenced in the code shown here; confirm it is needed
 }
 
 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
@@ -75,26 +81,38 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
                        n = len(p)
                }
        }
+
+       if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
+               // Non-blocking send: request a flush if the flush channel can
+               // accept it.  Otherwise do nothing, because a flush request is
+               // already pending.
+               select {
+               case tl.flush <- struct{}{}:
+               default:
+               }
+       }
+
        return
 }
 
-// Periodically check the current buffer; if not empty, send it on the
+// Periodically, or when Write requests it, check the buffer; if not empty, send it on the
 // channel to the goWriter goroutine.
 func (tl *ThrottledLogger) flusher() {
-       ticker := time.NewTicker(time.Second)
+       ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents)) // time.NewTicker panics unless this interval is > 0
        defer ticker.Stop()
        for stopping := false; !stopping; {
                select {
                case <-tl.stopping:
-                       // flush tl.buf, then exit the loop
+                       // flush tl.buf and exit the loop
                        stopping = true
+               case <-tl.flush: // early flush requested by Write
                case <-ticker.C:
                }
 
                var ready *bytes.Buffer
 
                tl.Mutex.Lock()
-               ready, tl.buf = tl.buf, nil
+               ready, tl.buf = tl.buf, &bytes.Buffer{}
                tl.Mutex.Unlock()
 
                if ready != nil && ready.Len() > 0 {
@@ -157,12 +175,13 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
 
-// NewThrottledLogger creates a new thottled logger that
+// NewThrottledLogger creates a new throttled logger that
 // (a) prepends timestamps to each line
-// (b) batches log messages and only calls the underlying Writer at most once
-// per second.
+// (b) batches log messages and passes them to the underlying Writer every
+// crunchLogSecondsBetweenEvents, or sooner once crunchLogBytesPerEvent bytes are buffered.
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
        tl := &ThrottledLogger{}
-       tl.stopping = make(chan struct{})
+       tl.flush = make(chan struct{}, 1) // room for exactly one pending flush request (see Write)
        tl.stopped = make(chan struct{})
+       tl.stopping = make(chan struct{})
        tl.writer = writer
        tl.Logger = log.New(tl, "", 0)
        tl.Timestamper = RFC3339Timestamp
@@ -170,6 +189,15 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
        return tl
 }
 
+// Log throttling and rate-limiting parameters. These are defaults; loadLogThrottleParams may override them.
+var crunchLimitLogBytesPerJob int64 = 67108864 // 64 MiB cap on total bytes logged (crunch_limit_log_bytes_per_job)
+var crunchLogThrottleBytes int64 = 65536 // 64 KiB allowed per crunchLogThrottlePeriod
+var crunchLogThrottlePeriod time.Duration = time.Second * 60 // length of one throttle period
+var crunchLogThrottleLines int64 = 1024 // lines allowed per crunchLogThrottlePeriod
+var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5 // minimum gap between logged partial-line segments
+var crunchLogBytesPerEvent int64 = 4096 // buffered bytes that trigger a flush before the interval elapses
+var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1 // interval between regular flushes
+
 // ArvLogWriter is an io.WriteCloser that processes each write by
 // writing it through to another io.WriteCloser (typically a
 // CollectionFileWriter) and creating an Arvados log entry.
@@ -186,76 +214,11 @@ type ArvLogWriter struct {
        logThrottleBytesSoFar        int64
        logThrottleBytesSkipped      int64
        logThrottleIsOpen            bool
-       logThrottlePartialLineLastAt time.Time
+       logThrottlePartialLineNextAt time.Time // earliest time another partial-line segment may be logged
        logThrottleFirstPartialLine  bool
-       stderrBufToFlush             bytes.Buffer
-       stderrFlushedAt              time.Time
-
-       // rate limiting config parameters
-       crunchLimitLogBytesPerJob          int64
-       crunchLogThrottleBytes             int64
-       crunchLogThrottlePeriod            int
-       crunchLogThrottleLines             int64
-       crunchLogPartialLineThrottlePeriod int
-       crunchLogBytesPerEvent             int64
-       crunchLogSecondsBetweenEvents      int
-}
-
-// NewArvLogWriter creates new ArvLogWriter and loads the rate limiting config params
-func NewArvLogWriter(clnt IArvadosClient, uuid string, ls string, wc io.WriteCloser) *ArvLogWriter {
-       w := &ArvLogWriter{ArvClient: clnt, UUID: uuid, loggingStream: ls, writeCloser: wc}
-
-       // load the rate limit discovery config paramters
-       param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
-       if err != nil {
-               w.crunchLimitLogBytesPerJob = 67108864
-       } else {
-               w.crunchLimitLogBytesPerJob = int64(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogThrottleBytes")
-       if err != nil {
-               w.crunchLogThrottleBytes = 65536
-       } else {
-               w.crunchLogThrottleBytes = int64(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogThrottlePeriod")
-       if err != nil {
-               w.crunchLogThrottlePeriod = 60
-       } else {
-               w.crunchLogThrottlePeriod = int(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogThrottleLines")
-       if err != nil {
-               w.crunchLogThrottleLines = 1024
-       } else {
-               w.crunchLogThrottleLines = int64(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
-       if err != nil {
-               w.crunchLogPartialLineThrottlePeriod = 5
-       } else {
-               w.crunchLogPartialLineThrottlePeriod = int(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogBytesPerEvent")
-       if err != nil {
-               w.crunchLogBytesPerEvent = 4096
-       } else {
-               w.crunchLogBytesPerEvent = int64(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
-       if err != nil {
-               w.crunchLogSecondsBetweenEvents = 1
-       } else {
-               w.crunchLogSecondsBetweenEvents = int(param.(float64))
-       }
-
-       return w
+       bufToFlush                   bytes.Buffer // text waiting to be sent as the next Arvados log entry
+       bufFlushedAt                 time.Time    // when bufToFlush was last sent
+       closing                      bool         // set by Close so Write sends whatever remains in bufToFlush
 }
 
 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
@@ -273,16 +236,14 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                // It has been more than throttle_period seconds since the last
                // checkpoint; so reset the throttle
                if arvlog.logThrottleBytesSkipped > 0 {
-                       arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now().UTC()), arvlog.logThrottleBytesSkipped))
+                       arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
                }
 
-               arvlog.logThrottleResetTime = time.Now().Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
+               arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod) // end of the throttle period that starts now
                arvlog.logThrottleBytesSoFar = 0
                arvlog.logThrottleLinesSoFar = 0
                arvlog.logThrottleBytesSkipped = 0
                arvlog.logThrottleIsOpen = true
-               arvlog.logThrottlePartialLineLastAt = time.Time{}
-               arvlog.logThrottleFirstPartialLine = true
        }
 
        lines := bytes.Split(p, []byte("\n"))
@@ -298,25 +259,25 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                }
 
                // check rateLimit
-               _, msg, err2 := arvlog.rateLimit(line)
-               if err2 != nil {
-                       return 0, fmt.Errorf("%s ; %s", err1, err2)
+               logOpen, msg := arvlog.rateLimit(line, now) // msg is the line itself or a throttling notice to log in its place
+               if logOpen {
+                       arvlog.bufToFlush.WriteString(string(msg) + "\n")
                }
-               arvlog.stderrBufToFlush.WriteString(string(msg) + "\n")
        }
 
-       if int64(arvlog.stderrBufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
-               (time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
+       if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent || // enough text is buffered,
+               (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) || // or the flush interval has elapsed,
+               arvlog.closing) && (arvlog.bufToFlush.Len() > 0) { // or Close wants a final flush; never send an empty entry
                // write to API
                lr := arvadosclient.Dict{"log": arvadosclient.Dict{
                        "object_uuid": arvlog.UUID,
                        "event_type":  arvlog.loggingStream,
-                       "properties":  map[string]string{"text": arvlog.stderrBufToFlush.String()}}}
+                       "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
                err2 := arvlog.ArvClient.Create("logs", lr, nil)
 
-               bytesWritten = arvlog.stderrBufToFlush.Len()
-               arvlog.stderrBufToFlush = bytes.Buffer{}
-               arvlog.stderrFlushedAt = time.Now()
+               bytesWritten = arvlog.bufToFlush.Len()
+               arvlog.bufToFlush = bytes.Buffer{}
+               arvlog.bufFlushedAt = now
 
                if err1 != nil || err2 != nil {
                        return 0, fmt.Errorf("%s ; %s", err1, err2)
@@ -328,6 +289,8 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 
-// Close the underlying writer
+// Close flushes any buffered log text, then closes the underlying writer.
 func (arvlog *ArvLogWriter) Close() (err error) {
+       arvlog.closing = true  // makes Write send bufToFlush regardless of its size or age
+       arvlog.Write([]byte{}) // NOTE(review): any error from this final flush is discarded
        if arvlog.writeCloser != nil {
                err = arvlog.writeCloser.Close()
                arvlog.writeCloser = nil
@@ -335,54 +298,63 @@ func (arvlog *ArvLogWriter) Close() (err error) {
        return err
 }
 
-var lineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`)
+var lineRegexp = regexp.MustCompile(`^\S+ (.*)`) // group 1: everything after the first space-delimited field (presumably the timestamp)
 
-// Test for hard cap on total output and for log throttling. Returns whether
-// the log line should go to output or not. Returns message if limit exceeded.
+// rateLimit checks line against the hard cap on total output, the log throttle and the
+// partial-line throttle. It returns whether to log, and the line or a notice to log instead.
-func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
+func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
        message := ""
        lineSize := int64(len(line))
-       partialLine := false
-       skipCounts := false
 
        if arvlog.logThrottleIsOpen {
                matches := lineRegexp.FindStringSubmatch(string(line))
 
                if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
-                       partialLine = true
-
-                       if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
-                               arvlog.logThrottlePartialLineLastAt = time.Now()
+                       // This is a partial segment of a long line: its text is wrapped in "[...]" markers.
+
+                       if arvlog.logThrottleFirstPartialLine {
+                               // Suppress this segment and log a one-time notice explaining the partial-line rate limit instead.
+                               arvlog.logThrottleFirstPartialLine = false
+                               arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+                               arvlog.logThrottleBytesSkipped += lineSize
+                               return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
+                                       RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
+                       } else if now.After(arvlog.logThrottlePartialLineNextAt) {
+                               // The partial-line throttle period has passed: let this segment through and schedule the next one.
+                               arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
                        } else {
-                               skipCounts = true
+                               // Still within the partial-line throttle period: drop the segment, counting its bytes as skipped.
+                               arvlog.logThrottleBytesSkipped += lineSize
+                               return false, line
                        }
+               } else {
+                       // A complete line: re-arm the partial-line notice and clear the partial-line deadline.
+                       arvlog.logThrottlePartialLineNextAt = time.Time{}
+                       arvlog.logThrottleFirstPartialLine = true
                }
 
-               if !skipCounts {
-                       arvlog.logThrottleLinesSoFar += 1
-                       arvlog.logThrottleBytesSoFar += lineSize
-                       arvlog.bytesLogged += lineSize
-               }
+               arvlog.bytesLogged += lineSize
+               arvlog.logThrottleBytesSoFar += lineSize
+               arvlog.logThrottleLinesSoFar += 1
 
-               if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
-                       message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLimitLogBytesPerJob)
-                       arvlog.logThrottleResetTime = time.Now().Add(time.Duration(365 * 24 * time.Hour))
+               if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
+                       message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
+                               RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
+                       arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
                        arvlog.logThrottleIsOpen = false
 
-               } else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
-                       remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
-                       message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+               } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
+                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
+                       message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
+                               RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
                        arvlog.logThrottleIsOpen = false
 
-               } else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
-                       remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
-                       message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+               } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
+                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
+                       message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
+                               RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
                        arvlog.logThrottleIsOpen = false
 
-               } else if partialLine && arvlog.logThrottleFirstPartialLine {
-                       arvlog.logThrottleFirstPartialLine = false
-                       message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
-
                }
        }
 
@@ -395,10 +367,55 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
                // Yes, write to logs, but use our "rate exceeded" message
                // instead of the log message that exceeded the limit.
                message += " A complete log is still being written to Keep, and will be available when the job finishes."
-               return true, []byte(message), nil
-       } else if partialLine {
-               return false, line, nil
+               return true, []byte(message)
        } else {
-               return arvlog.logThrottleIsOpen, line, nil
+               return arvlog.logThrottleIsOpen, line
+       }
+}
+
+// loadLogThrottleParams overrides the package-level log throttling
+// defaults (crunchLimitLogBytesPerJob etc.) with values from clnt.Discovery.
+//
+// A parameter keeps its current value when the lookup returns an error or
+// a value that is not a float64, instead of panicking on an unchecked type
+// assertion. Duration parameters are given in seconds and may be fractional.
+func loadLogThrottleParams(clnt IArvadosClient) {
+       param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
+       if val, ok := param.(float64); err == nil && ok {
+               crunchLimitLogBytesPerJob = int64(val)
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottleBytes")
+       if val, ok := param.(float64); err == nil && ok {
+               crunchLogThrottleBytes = int64(val)
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottlePeriod")
+       if val, ok := param.(float64); err == nil && ok {
+               crunchLogThrottlePeriod = time.Duration(float64(time.Second) * val)
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottleLines")
+       if val, ok := param.(float64); err == nil && ok {
+               crunchLogThrottleLines = int64(val)
+       }
+
+       param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
+       if val, ok := param.(float64); err == nil && ok {
+               crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * val)
+       }
+
+       param, err = clnt.Discovery("crunchLogBytesPerEvent")
+       if val, ok := param.(float64); err == nil && ok {
+               crunchLogBytesPerEvent = int64(val)
+       }
+
+       param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
+       if val, ok := param.(float64); err == nil && ok {
+               // Ignore non-positive intervals: ThrottledLogger.flusher passes this
+               // to time.NewTicker, which panics unless its argument is > 0.
+               if d := time.Duration(float64(time.Second) * val); d > 0 {
+                       crunchLogSecondsBetweenEvents = d
+               }
        }
 }