21640: Stop accidentally reducing NOFILE limit.
[arvados.git] / lib / crunchrun / logging.go
index 91a1b77cf4fa6ab2d22bf9b476be8a756ea6b0b1..04c37249697bf5cbb0b3fbe81dfa164fb835894d 100644 (file)
 package crunchrun
 
 import (
-       "bufio"
        "bytes"
        "encoding/json"
-       "fmt"
        "io"
        "log"
-       "regexp"
-       "strings"
-       "sync"
        "time"
-
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
 )
 
-// Timestamper is the signature for a function that takes a timestamp and
-// return a formated string value.
-type Timestamper func(t time.Time) string
-
-// Logging plumbing:
-//
-// ThrottledLogger.Logger -> ThrottledLogger.Write ->
-// ThrottledLogger.buf -> ThrottledLogger.flusher ->
-// ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
-//
-// For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
-// data from the stdout/stderr Reader and send to the Logger.
+// rfc3339NanoFixed is a fixed-width version of time.RFC3339Nano.
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" // ".000000000" always prints nine fractional digits; time.RFC3339Nano uses ".999999999", which drops trailing zeros
 
-// ThrottledLogger accepts writes, prepends a timestamp to each line of the
-// write, and periodically flushes to a downstream writer.  It supports the
-// "Logger" and "WriteCloser" interfaces.
-type ThrottledLogger struct {
-       *log.Logger
-       buf *bytes.Buffer
-       sync.Mutex
-       writer   io.WriteCloser
-       flush    chan struct{}
-       stopped  chan struct{}
-       stopping chan struct{}
-       Timestamper
-       Immediate    *log.Logger
-       pendingFlush bool
+// prefixer wraps an io.Writer, inserting a string returned by
+// prefixFunc at the beginning of each line.
+type prefixer struct {
+       writer     io.Writer     // destination for both prefixes and data
+       prefixFunc func() string // called by Write each time a new line starts
+       unfinished bool          // true if the most recent write ended with a non-newline char
 }
 
-// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
-const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-
-// RFC3339Timestamp formats t as RFC3339NanoFixed.
-func RFC3339Timestamp(t time.Time) string {
-       return t.Format(RFC3339NanoFixed)
-}
-
-// Write prepends a timestamp to each line of the input data and
-// appends to the internal buffer. Each line is also logged to
-// tl.Immediate, if tl.Immediate is not nil.
-func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
-       tl.Mutex.Lock()
-       defer tl.Mutex.Unlock()
-
-       if tl.buf == nil {
-               tl.buf = &bytes.Buffer{}
-       }
-
-       now := tl.Timestamper(time.Now().UTC())
-       sc := bufio.NewScanner(bytes.NewBuffer(p))
-       for err == nil && sc.Scan() {
-               out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
-               if tl.Immediate != nil {
-                       tl.Immediate.Print(out[:len(out)-1])
-               }
-               _, err = io.WriteString(tl.buf, out)
-       }
-       if err == nil {
-               err = sc.Err()
-               if err == nil {
-                       n = len(p)
-               }
-       }
-
-       if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
-               // Non-blocking send.  Try send a flush if it is ready to
-               // accept it.  Otherwise do nothing because a flush is already
-               // pending.
-               select {
-               case tl.flush <- struct{}{}:
-               default:
-               }
+// newTimestamper wraps an io.Writer, inserting the current UTC time in
+// rfc3339NanoFixed format, plus a space, at the beginning of each line.
+func newTimestamper(w io.Writer) *prefixer {
+       return &prefixer{
+               writer:     w,
+               prefixFunc: func() string { return time.Now().UTC().Format(rfc3339NanoFixed + " ") }, // the trailing space is part of the layout string
        }
-
-       return
 }
 
-// Periodically check the current buffer; if not empty, send it on the
-// channel to the goWriter goroutine.
-func (tl *ThrottledLogger) flusher() {
-       ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
-       defer ticker.Stop()
-       for stopping := false; !stopping; {
-               select {
-               case <-tl.stopping:
-                       // flush tl.buf and exit the loop
-                       stopping = true
-               case <-tl.flush:
-               case <-ticker.C:
-               }
-
-               var ready *bytes.Buffer
-
-               tl.Mutex.Lock()
-               ready, tl.buf = tl.buf, &bytes.Buffer{}
-               tl.Mutex.Unlock()
-
-               if ready != nil && ready.Len() > 0 {
-                       tl.writer.Write(ready.Bytes())
-               }
-       }
-       close(tl.stopped)
-}
-
-// Close the flusher goroutine and wait for it to complete, then close the
-// underlying Writer.
-func (tl *ThrottledLogger) Close() error {
-       select {
-       case <-tl.stopping:
-               // already stopped
-       default:
-               close(tl.stopping)
+// newStringPrefixer wraps an io.Writer, inserting s verbatim at the
+// beginning of each line. No separator is added, so s should
+// include a trailing space for readability.
+func newStringPrefixer(w io.Writer, s string) *prefixer {
+       return &prefixer{
+               writer:     w,
+               prefixFunc: func() string { return s }, // same prefix for every line
        }
-       <-tl.stopped
-       return tl.writer.Close()
 }
 
-const (
-       // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
-       MaxLogLine = 1 << 12
-)
-
-// ReadWriteLines reads lines from a reader and writes to a Writer, with long
-// line splitting.
-func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
-       reader := bufio.NewReaderSize(in, MaxLogLine)
-       var prefix string
-       for {
-               line, isPrefix, err := reader.ReadLine()
-               if err == io.EOF {
-                       break
-               } else if err != nil {
-                       writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
-               }
-               var suffix string
-               if isPrefix {
-                       suffix = "[...]\n"
-               }
-
-               if prefix == "" && suffix == "" {
-                       writer.Write(line)
-               } else {
-                       writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
+func (tp *prefixer) Write(p []byte) (n int, err error) { // implements io.Writer; n counts bytes of p only, never prefix bytes
+       for len(p) > 0 && err == nil {
+               if !tp.unfinished { // at the start of a line: emit the prefix before any data
+                       _, err = io.WriteString(tp.writer, tp.prefixFunc())
+                       if err != nil {
+                               return // prefix could not be written; n reflects the data written so far
+                       }
                }
-
-               // Set up prefix for following line
-               if isPrefix {
-                       prefix = "[...]"
+               newline := bytes.IndexRune(p, '\n') // index of the first newline in p, or -1 if there is none
+               var nn int
+               if newline < 0 { // no newline left: write the rest of p as a partial line
+                       tp.unfinished = true // the next Write continues this line without a new prefix
+                       nn, err = tp.writer.Write(p)
+                       p = nil
                } else { // write up to and including the newline, then loop for the next line
-                       prefix = ""
+                       tp.unfinished = false // NOTE(review): set before the write; after a short write the remainder of this line would be prefixed again
+                       nn, err = tp.writer.Write(p[:newline+1])
+                       p = p[nn:] // advance by the number of bytes actually written
                }
+               n += nn
        }
-       done <- true
-}
-
-// NewThrottledLogger creates a new thottled logger that
-//   - prepends timestamps to each line, and
-//   - batches log messages and only calls the underlying Writer
-//     at most once per "crunchLogSecondsBetweenEvents" seconds.
-func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
-       tl := &ThrottledLogger{}
-       tl.flush = make(chan struct{}, 1)
-       tl.stopped = make(chan struct{})
-       tl.stopping = make(chan struct{})
-       tl.writer = writer
-       tl.Logger = log.New(tl, "", 0)
-       tl.Timestamper = RFC3339Timestamp
-       go tl.flusher()
-       return tl
-}
-
-// Log throttling rate limiting config parameters
-var crunchLimitLogBytesPerJob int64 = 67108864
-var crunchLogThrottleBytes int64 = 65536
-var crunchLogThrottlePeriod time.Duration = time.Second * 60
-var crunchLogThrottleLines int64 = 1024
-var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
-var crunchLogBytesPerEvent int64 = 4096
-var crunchLogSecondsBetweenEvents = time.Second
-var crunchLogUpdatePeriod = time.Hour / 2
-var crunchLogUpdateSize = int64(1 << 25)
-
-// ArvLogWriter is an io.WriteCloser that processes each write by
-// writing it through to another io.WriteCloser (typically a
-// CollectionFileWriter) and creating an Arvados log entry.
-type ArvLogWriter struct {
-       ArvClient     IArvadosClient
-       UUID          string
-       loggingStream string
-       writeCloser   io.WriteCloser
-
-       // for rate limiting
-       bytesLogged                  int64
-       logThrottleResetTime         time.Time
-       logThrottleLinesSoFar        int64
-       logThrottleBytesSoFar        int64
-       logThrottleBytesSkipped      int64
-       logThrottleIsOpen            bool
-       logThrottlePartialLineNextAt time.Time
-       logThrottleFirstPartialLine  bool
-       bufToFlush                   bytes.Buffer
-       bufFlushedAt                 time.Time
-       closing                      bool
+       return
 }
 
-func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
-       // Write to the next writer in the chain (a file in Keep)
-       var err1 error
-       if arvlog.writeCloser != nil {
-               _, err1 = arvlog.writeCloser.Write(p)
-       }
-
-       // write to API after checking rate limit
-       now := time.Now()
-
-       if now.After(arvlog.logThrottleResetTime) {
-               // It has been more than throttle_period seconds since the last
-               // checkpoint; so reset the throttle
-               if arvlog.logThrottleBytesSkipped > 0 {
-                       arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
-               }
-
-               arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
-               arvlog.logThrottleBytesSoFar = 0
-               arvlog.logThrottleLinesSoFar = 0
-               arvlog.logThrottleBytesSkipped = 0
-               arvlog.logThrottleIsOpen = true
-       }
-
-       lines := bytes.Split(p, []byte("\n"))
-
-       for _, line := range lines {
-               // Short circuit the counting code if we're just going to throw
-               // away the data anyway.
-               if !arvlog.logThrottleIsOpen {
-                       arvlog.logThrottleBytesSkipped += int64(len(line))
-                       continue
-               } else if len(line) == 0 {
-                       continue
-               }
-
-               // check rateLimit
-               logOpen, msg := arvlog.rateLimit(line, now)
-               if logOpen {
-                       arvlog.bufToFlush.WriteString(string(msg) + "\n")
-               }
-       }
-
-       if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
-               (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
-               arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
-               // write to API
-               lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-                       "object_uuid": arvlog.UUID,
-                       "event_type":  arvlog.loggingStream,
-                       "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
-               err2 := arvlog.ArvClient.Create("logs", lr, nil)
-
-               arvlog.bufToFlush = bytes.Buffer{}
-               arvlog.bufFlushedAt = now
-
-               if err1 != nil || err2 != nil {
-                       return 0, fmt.Errorf("%s ; %s", err1, err2)
-               }
-       }
-
-       return len(p), nil
+// logWriter adds log.Logger methods to an io.Writer.
+type logWriter struct {
+       io.Writer   // supplies Write
+       *log.Logger // supplies Print, Printf, etc.; newLogWriter points it at the same writer
 }
 
-// Close the underlying writer
-func (arvlog *ArvLogWriter) Close() (err error) {
-       arvlog.closing = true
-       arvlog.Write([]byte{})
-       if arvlog.writeCloser != nil {
-               err = arvlog.writeCloser.Close()
-               arvlog.writeCloser = nil
+func newLogWriter(w io.Writer) *logWriter { // returns a logWriter whose Write and Logger output both go to w
+       return &logWriter{
+               Writer: w,
+               Logger: log.New(w, "", 0), // empty prefix and zero flags: the logger adds no header of its own
        }
-       return err
 }
 
-var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
-
-// Test for hard cap on total output and for log throttling. Returns whether
-// the log line should go to output or not. Returns message if limit exceeded.
-func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
-       message := ""
-       lineSize := int64(len(line))
-
-       if arvlog.logThrottleIsOpen {
-               matches := lineRegexp.FindStringSubmatch(string(line))
-
-               if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
-                       // This is a partial line.
-
-                       if arvlog.logThrottleFirstPartialLine {
-                               // Partial should be suppressed.  First time this is happening for this line so provide a message instead.
-                               arvlog.logThrottleFirstPartialLine = false
-                               arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
-                               arvlog.logThrottleBytesSkipped += lineSize
-                               return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
-                                       RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
-                       } else if now.After(arvlog.logThrottlePartialLineNextAt) {
-                               // The throttle period has passed.  Update timestamp and let it through.
-                               arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
-                       } else {
-                               // Suppress line.
-                               arvlog.logThrottleBytesSkipped += lineSize
-                               return false, line
-                       }
-               } else {
-                       // Not a partial line so reset.
-                       arvlog.logThrottlePartialLineNextAt = time.Time{}
-                       arvlog.logThrottleFirstPartialLine = true
-               }
-
-               arvlog.bytesLogged += lineSize
-               arvlog.logThrottleBytesSoFar += lineSize
-               arvlog.logThrottleLinesSoFar++
-
-               if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
-                       message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
-                               RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
-                       arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
-                       arvlog.logThrottleIsOpen = false
-
-               } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
-                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
-                       message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
-                               RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
-                       arvlog.logThrottleIsOpen = false
-
-               } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
-                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
-                       message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
-                               RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
-                       arvlog.logThrottleIsOpen = false
-
-               }
-       }
-
-       if !arvlog.logThrottleIsOpen {
-               // Don't log anything if any limit has been exceeded. Just count lossage.
-               arvlog.logThrottleBytesSkipped += lineSize
-       }
-
-       if message != "" {
-               // Yes, write to logs, but use our "rate exceeded" message
-               // instead of the log message that exceeded the limit.
-               message += " A complete log is still being written to Keep, and will be available when the job finishes."
-               return true, []byte(message)
-       }
-       return arvlog.logThrottleIsOpen, line
-}
+var crunchLogUpdatePeriod = time.Hour / 2 // initial value: 30 minutes; its address is passed to loadDuration in loadLogThrottleParams
+var crunchLogUpdateSize = int64(1 << 25)  // initial value: 33554432 (presumably bytes, i.e. 32 MiB — confirm); its address is passed to loadInt64 in loadLogThrottleParams
 
 // load the rate limit discovery config parameters
 func loadLogThrottleParams(clnt IArvadosClient) {
@@ -394,13 +103,6 @@ func loadLogThrottleParams(clnt IArvadosClient) {
                }
        }
 
-       loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
-       loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
-       loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
-       loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
-       loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
-       loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
-       loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
        loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
        loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")