8019: rateLimit method signature
[arvados.git] / services / crunch-run / logging.go
index 5254ff671e6d3826356722259069515d450c1066..6e32d723e87d1e031dcaf94241358fc9c6e3a8e4 100644 (file)
@@ -6,6 +6,8 @@ import (
        "fmt"
        "io"
        "log"
+       "regexp"
+       "strings"
        "sync"
        "time"
 
@@ -176,6 +178,84 @@ type ArvLogWriter struct {
        UUID          string
        loggingStream string
        writeCloser   io.WriteCloser
+
+       // for rate limiting
+       bytesLogged                  int64
+       logThrottleResetTime         time.Time
+       logThrottleLinesSoFar        int64
+       logThrottleBytesSoFar        int64
+       logThrottleBytesSkipped      int64
+       logThrottleIsOpen            bool
+       logThrottlePartialLineLastAt time.Time
+       logThrottleFirstPartialLine  bool
+       bufToFlush                   bytes.Buffer
+       bufFlushedAt                 time.Time
+
+       // rate limiting config parameters
+       crunchLimitLogBytesPerJob          int64
+       crunchLogThrottleBytes             int64
+       crunchLogThrottlePeriod            int
+       crunchLogThrottleLines             int64
+       crunchLogPartialLineThrottlePeriod int
+       crunchLogBytesPerEvent             int64
+       crunchLogSecondsBetweenEvents      int
+}
+
+// NewArvLogWriter creates a new ArvLogWriter and loads the rate limiting config params
+func NewArvLogWriter(clnt IArvadosClient, uuid string, ls string, wc io.WriteCloser) *ArvLogWriter {
+       w := &ArvLogWriter{ArvClient: clnt, UUID: uuid, loggingStream: ls, writeCloser: wc}
+
+       // load the rate limit discovery config parameters
+       param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
+       if err != nil {
+               w.crunchLimitLogBytesPerJob = 67108864
+       } else {
+               w.crunchLimitLogBytesPerJob = int64(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottleBytes")
+       if err != nil {
+               w.crunchLogThrottleBytes = 65536
+       } else {
+               w.crunchLogThrottleBytes = int64(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottlePeriod")
+       if err != nil {
+               w.crunchLogThrottlePeriod = 60
+       } else {
+               w.crunchLogThrottlePeriod = int(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottleLines")
+       if err != nil {
+               w.crunchLogThrottleLines = 1024
+       } else {
+               w.crunchLogThrottleLines = int64(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
+       if err != nil {
+               w.crunchLogPartialLineThrottlePeriod = 5
+       } else {
+               w.crunchLogPartialLineThrottlePeriod = int(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogBytesPerEvent")
+       if err != nil {
+               w.crunchLogBytesPerEvent = 4096
+       } else {
+               w.crunchLogBytesPerEvent = int64(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
+       if err != nil {
+               w.crunchLogSecondsBetweenEvents = 1
+       } else {
+               w.crunchLogSecondsBetweenEvents = int(param.(float64))
+       }
+
+       return w
 }
 
 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
@@ -185,17 +265,63 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                _, err1 = arvlog.writeCloser.Write(p)
        }
 
-       // write to API
-       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-               "object_uuid": arvlog.UUID,
-               "event_type":  arvlog.loggingStream,
-               "properties":  map[string]string{"text": string(p)}}}
-       err2 := arvlog.ArvClient.Create("logs", lr, nil)
+       // write to API after checking rate limit
+       now := time.Now()
+       bytesWritten := 0
+
+       if now.After(arvlog.logThrottleResetTime) {
+               // It has been more than throttle_period seconds since the last
+               // checkpoint; so reset the throttle
+               if arvlog.logThrottleBytesSkipped > 0 {
+                       arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
+               }
+
+               arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
+               arvlog.logThrottleBytesSoFar = 0
+               arvlog.logThrottleLinesSoFar = 0
+               arvlog.logThrottleBytesSkipped = 0
+               arvlog.logThrottleIsOpen = true
+               arvlog.logThrottlePartialLineLastAt = time.Time{}
+               arvlog.logThrottleFirstPartialLine = true
+       }
+
+       lines := bytes.Split(p, []byte("\n"))
+
+       for _, line := range lines {
+               // Short circuit the counting code if we're just going to throw
+               // away the data anyway.
+               if !arvlog.logThrottleIsOpen {
+                       arvlog.logThrottleBytesSkipped += int64(len(line))
+                       continue
+               } else if len(line) == 0 {
+                       continue
+               }
+
+               // check rateLimit
+               logOpen, msg := arvlog.rateLimit(line, now)
+               arvlog.bufToFlush.WriteString(string(msg) + "\n")
+               arvlog.logThrottleIsOpen = logOpen
+       }
+
+       if int64(arvlog.bufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
+               (now.Sub(arvlog.bufFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
+               // write to API
+               lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+                       "object_uuid": arvlog.UUID,
+                       "event_type":  arvlog.loggingStream,
+                       "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
+               err2 := arvlog.ArvClient.Create("logs", lr, nil)
+
+               bytesWritten = arvlog.bufToFlush.Len()
+               arvlog.bufToFlush = bytes.Buffer{}
+               arvlog.bufFlushedAt = now
 
-       if err1 != nil || err2 != nil {
-               return 0, fmt.Errorf("%s ; %s", err1, err2)
+               if err1 != nil || err2 != nil {
+                       return 0, fmt.Errorf("%s ; %s", err1, err2)
+               }
        }
-       return len(p), nil
+
+       return bytesWritten, nil
 }
 
 // Close the underlying writer
@@ -206,3 +332,71 @@ func (arvlog *ArvLogWriter) Close() (err error) {
        }
        return err
 }
+
+var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
+
+// rateLimit tests for the hard cap on total output and for log throttling. It returns whether the
+// line should go to output, and the text to write: a throttling notice if one was generated, else the line.
+func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
+       message := ""
+       lineSize := int64(len(line))
+       partialLine := false
+       skipCounts := false
+
+       if arvlog.logThrottleIsOpen {
+               matches := lineRegexp.FindStringSubmatch(string(line))
+
+               if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
+                       partialLine = true
+
+                       if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
+                               arvlog.logThrottlePartialLineLastAt = now
+                       } else {
+                               skipCounts = true
+                       }
+               }
+
+               if !skipCounts {
+                       arvlog.logThrottleLinesSoFar += 1
+                       arvlog.logThrottleBytesSoFar += lineSize
+                       arvlog.bytesLogged += lineSize
+               }
+
+               if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
+                       message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(now.UTC()), arvlog.crunchLimitLogBytesPerJob)
+                       arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
+                       arvlog.logThrottleIsOpen = false
+
+               } else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
+                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
+                       message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+                       arvlog.logThrottleIsOpen = false
+
+               } else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
+                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
+                       message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+                       arvlog.logThrottleIsOpen = false
+
+               } else if partialLine && arvlog.logThrottleFirstPartialLine {
+                       arvlog.logThrottleFirstPartialLine = false
+                       message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
+
+               }
+       }
+
+       if !arvlog.logThrottleIsOpen {
+               // Don't log anything if any limit has been exceeded. Just count lossage.
+               arvlog.logThrottleBytesSkipped += lineSize
+       }
+
+       if message != "" {
+               // Yes, write to logs, but use our "rate exceeded" message
+               // instead of the log message that exceeded the limit.
+               message += " A complete log is still being written to Keep, and will be available when the job finishes."
+               return true, []byte(message)
+       } else if partialLine {
+               return false, line
+       } else {
+               return arvlog.logThrottleIsOpen, line
+       }
+}