X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/19ae770973482257117fe8ded5619c3018c4b60f..1e31815d4a0d094633d4acb4f6265d6b8b6e3246:/services/crunch-run/logging.go diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index 20928dbef7..0083f0999c 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -1,14 +1,21 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( "bufio" "bytes" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "io" "log" + "regexp" + "strings" "sync" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" ) // Timestamper is the signature for a function that takes a timestamp and @@ -18,7 +25,7 @@ type Timestamper func(t time.Time) string // Logging plumbing: // // ThrottledLogger.Logger -> ThrottledLogger.Write -> -// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter -> +// ThrottledLogger.buf -> ThrottledLogger.flusher -> // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create // // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull @@ -31,87 +38,100 @@ type ThrottledLogger struct { *log.Logger buf *bytes.Buffer sync.Mutex - writer io.WriteCloser - stop bool - flusherDone chan bool + writer io.WriteCloser + flush chan struct{} + stopped chan struct{} + stopping chan struct{} Timestamper - Immediate *log.Logger + Immediate *log.Logger + pendingFlush bool } -// RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision, -// because the RFC3339Nano format isn't fixed width. -const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00" +// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano. +const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" -// RFC3339Timestamp return a RFC3339 formatted timestamp using RFC3339Fixed -func RFC3339Timestamp(now time.Time) string { - return now.Format(RFC3339Fixed) +// RFC3339Timestamp formats t as RFC3339NanoFixed. +func RFC3339Timestamp(t time.Time) string { + return t.Format(RFC3339NanoFixed) } -// Write to the internal buffer. Prepend a timestamp to each line of the input -// data. +// Write prepends a timestamp to each line of the input data and +// appends to the internal buffer. Each line is also logged to +// tl.Immediate, if tl.Immediate is not nil. func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { tl.Mutex.Lock() + defer tl.Mutex.Unlock() + if tl.buf == nil { tl.buf = &bytes.Buffer{} } - defer tl.Mutex.Unlock() now := tl.Timestamper(time.Now().UTC()) sc := bufio.NewScanner(bytes.NewBuffer(p)) - for sc.Scan() { - _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text()) + for err == nil && sc.Scan() { + out := fmt.Sprintf("%s %s\n", now, sc.Bytes()) if tl.Immediate != nil { - tl.Immediate.Printf("%s %s\n", now, sc.Text()) + tl.Immediate.Print(out[:len(out)-1]) + } + _, err = io.WriteString(tl.buf, out) + } + if err == nil { + err = sc.Err() + if err == nil { + n = len(p) } } - return len(p), err + + if int64(tl.buf.Len()) >= crunchLogBytesPerEvent { + // Non-blocking send. Try send a flush if it is ready to + // accept it. Otherwise do nothing because a flush is already + // pending. + select { + case tl.flush <- struct{}{}: + default: + } + } + + return } // Periodically check the current buffer; if not empty, send it on the // channel to the goWriter goroutine. func (tl *ThrottledLogger) flusher() { - bufchan := make(chan *bytes.Buffer) - bufterm := make(chan bool) - - // Use a separate goroutine for the actual write so that the writes are - // actually initiated closer every 1s instead of every - // 1s + (time to it takes to write). - go goWriter(tl.writer, bufchan, bufterm) - for { - if !tl.stop { - time.Sleep(1 * time.Second) + ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents)) + defer ticker.Stop() + for stopping := false; !stopping; { + select { + case <-tl.stopping: + // flush tl.buf and exit the loop + stopping = true + case <-tl.flush: + case <-ticker.C: } + + var ready *bytes.Buffer + tl.Mutex.Lock() - if tl.buf != nil && tl.buf.Len() > 0 { - oldbuf := tl.buf - tl.buf = nil - tl.Mutex.Unlock() - bufchan <- oldbuf - } else if tl.stop { - tl.Mutex.Unlock() - break - } else { - tl.Mutex.Unlock() - } - } - close(bufchan) - <-bufterm - tl.flusherDone <- true -} + ready, tl.buf = tl.buf, &bytes.Buffer{} + tl.Mutex.Unlock() -// Receive buffers from a channel and send to the underlying Writer -func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) { - for b := range c { - writer.Write(b.Bytes()) + if ready != nil && ready.Len() > 0 { + tl.writer.Write(ready.Bytes()) + } } - t <- true + close(tl.stopped) } // Close the flusher goroutine and wait for it to complete, then close the // underlying Writer. func (tl *ThrottledLogger) Close() error { - tl.stop = true - <-tl.flusherDone + select { + case <-tl.stopping: + // already stopped + default: + close(tl.stopping) + } + <-tl.stopped return tl.writer.Close() } @@ -155,25 +175,50 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) { // NewThrottledLogger creates a new thottled logger that // (a) prepends timestamps to each line -// (b) batches log messages and only calls the underlying Writer at most once -// per second. +// (b) batches log messages and only calls the underlying Writer +// at most once per "crunchLogSecondsBetweenEvents" seconds. func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { - alw := &ThrottledLogger{} - alw.flusherDone = make(chan bool) - alw.writer = writer - alw.Logger = log.New(alw, "", 0) - alw.Timestamper = RFC3339Timestamp - go alw.flusher() - return alw + tl := &ThrottledLogger{} + tl.flush = make(chan struct{}, 1) + tl.stopped = make(chan struct{}) + tl.stopping = make(chan struct{}) + tl.writer = writer + tl.Logger = log.New(tl, "", 0) + tl.Timestamper = RFC3339Timestamp + go tl.flusher() + return tl } -// ArvLogWriter implements a writer that writes to each of a WriteCloser -// (typically CollectionFileWriter) and creates an API server log entry. +// Log throttling rate limiting config parameters +var crunchLimitLogBytesPerJob int64 = 67108864 +var crunchLogThrottleBytes int64 = 65536 +var crunchLogThrottlePeriod time.Duration = time.Second * 60 +var crunchLogThrottleLines int64 = 1024 +var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5 +var crunchLogBytesPerEvent int64 = 4096 +var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1 + +// ArvLogWriter is an io.WriteCloser that processes each write by +// writing it through to another io.WriteCloser (typically a +// CollectionFileWriter) and creating an Arvados log entry. type ArvLogWriter struct { ArvClient IArvadosClient UUID string loggingStream string writeCloser io.WriteCloser + + // for rate limiting + bytesLogged int64 + logThrottleResetTime time.Time + logThrottleLinesSoFar int64 + logThrottleBytesSoFar int64 + logThrottleBytesSkipped int64 + logThrottleIsOpen bool + logThrottlePartialLineNextAt time.Time + logThrottleFirstPartialLine bool + bufToFlush bytes.Buffer + bufFlushedAt time.Time + closing bool } func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { @@ -183,24 +228,185 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { _, err1 = arvlog.writeCloser.Write(p) } - // write to API - lr := arvadosclient.Dict{"log": arvadosclient.Dict{ - "object_uuid": arvlog.UUID, - "event_type": arvlog.loggingStream, - "properties": map[string]string{"text": string(p)}}} - err2 := arvlog.ArvClient.Create("logs", lr, nil) + // write to API after checking rate limit + now := time.Now() + bytesWritten := 0 + + if now.After(arvlog.logThrottleResetTime) { + // It has been more than throttle_period seconds since the last + // checkpoint; so reset the throttle + if arvlog.logThrottleBytesSkipped > 0 { + arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped)) + } - if err1 != nil || err2 != nil { - return 0, fmt.Errorf("%s ; %s", err1, err2) + arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod) + arvlog.logThrottleBytesSoFar = 0 + arvlog.logThrottleLinesSoFar = 0 + arvlog.logThrottleBytesSkipped = 0 + arvlog.logThrottleIsOpen = true } - return len(p), nil + + lines := bytes.Split(p, []byte("\n")) + + for _, line := range lines { + // Short circuit the counting code if we're just going to throw + // away the data anyway. + if !arvlog.logThrottleIsOpen { + arvlog.logThrottleBytesSkipped += int64(len(line)) + continue + } else if len(line) == 0 { + continue + } + + // check rateLimit + logOpen, msg := arvlog.rateLimit(line, now) + if logOpen { + arvlog.bufToFlush.WriteString(string(msg) + "\n") + } + } + + if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent || + (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) || + arvlog.closing) && (arvlog.bufToFlush.Len() > 0) { + // write to API + lr := arvadosclient.Dict{"log": arvadosclient.Dict{ + "object_uuid": arvlog.UUID, + "event_type": arvlog.loggingStream, + "properties": map[string]string{"text": arvlog.bufToFlush.String()}}} + err2 := arvlog.ArvClient.Create("logs", lr, nil) + + bytesWritten = arvlog.bufToFlush.Len() + arvlog.bufToFlush = bytes.Buffer{} + arvlog.bufFlushedAt = now + + if err1 != nil || err2 != nil { + return 0, fmt.Errorf("%s ; %s", err1, err2) + } + } + + return bytesWritten, nil } // Close the underlying writer func (arvlog *ArvLogWriter) Close() (err error) { + arvlog.closing = true + arvlog.Write([]byte{}) if arvlog.writeCloser != nil { err = arvlog.writeCloser.Close() arvlog.writeCloser = nil } return err } + +var lineRegexp = regexp.MustCompile(`^\S+ (.*)`) + +// Test for hard cap on total output and for log throttling. Returns whether +// the log line should go to output or not. Returns message if limit exceeded. +func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) { + message := "" + lineSize := int64(len(line)) + + if arvlog.logThrottleIsOpen { + matches := lineRegexp.FindStringSubmatch(string(line)) + + if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") { + // This is a partial line. + + if arvlog.logThrottleFirstPartialLine { + // Partial should be suppressed. First time this is happening for this line so provide a message instead. + arvlog.logThrottleFirstPartialLine = false + arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod) + arvlog.logThrottleBytesSkipped += lineSize + return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", + RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second)) + } else if now.After(arvlog.logThrottlePartialLineNextAt) { + // The throttle period has passed. Update timestamp and let it through. + arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod) + } else { + // Suppress line. + arvlog.logThrottleBytesSkipped += lineSize + return false, line + } + } else { + // Not a partial line so reset. + arvlog.logThrottlePartialLineNextAt = time.Time{} + arvlog.logThrottleFirstPartialLine = true + } + + arvlog.bytesLogged += lineSize + arvlog.logThrottleBytesSoFar += lineSize + arvlog.logThrottleLinesSoFar += 1 + + if arvlog.bytesLogged > crunchLimitLogBytesPerJob { + message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", + RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob) + arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour)) + arvlog.logThrottleIsOpen = false + + } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes { + remainingTime := arvlog.logThrottleResetTime.Sub(now) + message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", + RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second) + arvlog.logThrottleIsOpen = false + + } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines { + remainingTime := arvlog.logThrottleResetTime.Sub(now) + message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", + RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second) + arvlog.logThrottleIsOpen = false + + } + } + + if !arvlog.logThrottleIsOpen { + // Don't log anything if any limit has been exceeded. Just count lossage. + arvlog.logThrottleBytesSkipped += lineSize + } + + if message != "" { + // Yes, write to logs, but use our "rate exceeded" message + // instead of the log message that exceeded the limit. + message += " A complete log is still being written to Keep, and will be available when the job finishes." + return true, []byte(message) + } else { + return arvlog.logThrottleIsOpen, line + } +} + +// load the rate limit discovery config parameters +func loadLogThrottleParams(clnt IArvadosClient) { + param, err := clnt.Discovery("crunchLimitLogBytesPerJob") + if err == nil { + crunchLimitLogBytesPerJob = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogThrottleBytes") + if err == nil { + crunchLogThrottleBytes = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogThrottlePeriod") + if err == nil { + crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64)) + } + + param, err = clnt.Discovery("crunchLogThrottleLines") + if err == nil { + crunchLogThrottleLines = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod") + if err == nil { + crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64)) + } + + param, err = clnt.Discovery("crunchLogBytesPerEvent") + if err == nil { + crunchLogBytesPerEvent = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogSecondsBetweenEvents") + if err == nil { + crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64)) + } +}