X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8622b46a4a6c127a1927d9c2e54febec6a5bf503..0eb72b526bf8bbb011551ecf019f604e17a534f1:/services/crunch-run/logging.go diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index 22ba130f24..0083f0999c 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -35,10 +39,12 @@ type ThrottledLogger struct { buf *bytes.Buffer sync.Mutex writer io.WriteCloser - stopping chan struct{} + flush chan struct{} stopped chan struct{} + stopping chan struct{} Timestamper - Immediate *log.Logger + Immediate *log.Logger + pendingFlush bool } // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano. @@ -75,26 +81,38 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { n = len(p) } } + + if int64(tl.buf.Len()) >= crunchLogBytesPerEvent { + // Non-blocking send. Try send a flush if it is ready to + // accept it. Otherwise do nothing because a flush is already + // pending. + select { + case tl.flush <- struct{}{}: + default: + } + } + return } // Periodically check the current buffer; if not empty, send it on the // channel to the goWriter goroutine. func (tl *ThrottledLogger) flusher() { - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents)) defer ticker.Stop() for stopping := false; !stopping; { select { case <-tl.stopping: - // flush tl.buf, then exit the loop + // flush tl.buf and exit the loop stopping = true + case <-tl.flush: case <-ticker.C: } var ready *bytes.Buffer tl.Mutex.Lock() - ready, tl.buf = tl.buf, nil + ready, tl.buf = tl.buf, &bytes.Buffer{} tl.Mutex.Unlock() if ready != nil && ready.Len() > 0 { @@ -157,12 +175,13 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) { // NewThrottledLogger creates a new thottled logger that // (a) prepends timestamps to each line -// (b) batches log messages and only calls the underlying Writer at most once -// per second. +// (b) batches log messages and only calls the underlying Writer +// at most once per "crunchLogSecondsBetweenEvents" seconds. func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { tl := &ThrottledLogger{} - tl.stopping = make(chan struct{}) + tl.flush = make(chan struct{}, 1) tl.stopped = make(chan struct{}) + tl.stopping = make(chan struct{}) tl.writer = writer tl.Logger = log.New(tl, "", 0) tl.Timestamper = RFC3339Timestamp @@ -170,6 +189,15 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { return tl } +// Log throttling rate limiting config parameters +var crunchLimitLogBytesPerJob int64 = 67108864 +var crunchLogThrottleBytes int64 = 65536 +var crunchLogThrottlePeriod time.Duration = time.Second * 60 +var crunchLogThrottleLines int64 = 1024 +var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5 +var crunchLogBytesPerEvent int64 = 4096 +var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1 + // ArvLogWriter is an io.WriteCloser that processes each write by // writing it through to another io.WriteCloser (typically a // CollectionFileWriter) and creating an Arvados log entry. @@ -186,76 +214,11 @@ type ArvLogWriter struct { logThrottleBytesSoFar int64 logThrottleBytesSkipped int64 logThrottleIsOpen bool - logThrottlePartialLineLastAt time.Time + logThrottlePartialLineNextAt time.Time logThrottleFirstPartialLine bool - stderrBufToFlush bytes.Buffer - stderrFlushedAt time.Time - - // rate limiting config parameters - crunchLimitLogBytesPerJob int64 - crunchLogThrottleBytes int64 - crunchLogThrottlePeriod int - crunchLogThrottleLines int64 - crunchLogPartialLineThrottlePeriod int - crunchLogBytesPerEvent int64 - crunchLogSecondsBetweenEvents int -} - -// NewArvLogWriter creates new ArvLogWriter and loads the rate limiting config params -func NewArvLogWriter(clnt IArvadosClient, uuid string, ls string, wc io.WriteCloser) *ArvLogWriter { - w := &ArvLogWriter{ArvClient: clnt, UUID: uuid, loggingStream: ls, writeCloser: wc} - - // load the rate limit discovery config paramters - param, err := clnt.Discovery("crunchLimitLogBytesPerJob") - if err != nil { - w.crunchLimitLogBytesPerJob = 67108864 - } else { - w.crunchLimitLogBytesPerJob = int64(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogThrottleBytes") - if err != nil { - w.crunchLogThrottleBytes = 65536 - } else { - w.crunchLogThrottleBytes = int64(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogThrottlePeriod") - if err != nil { - w.crunchLogThrottlePeriod = 60 - } else { - w.crunchLogThrottlePeriod = int(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogThrottleLines") - if err != nil { - w.crunchLogThrottleLines = 1024 - } else { - w.crunchLogThrottleLines = int64(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod") - if err != nil { - w.crunchLogPartialLineThrottlePeriod = 5 - } else { - w.crunchLogPartialLineThrottlePeriod = int(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogBytesPerEvent") - if err != nil { - w.crunchLogBytesPerEvent = 4096 - } else { - w.crunchLogBytesPerEvent = int64(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogSecondsBetweenEvents") - if err != nil { - w.crunchLogSecondsBetweenEvents = 1 - } else { - w.crunchLogSecondsBetweenEvents = int(param.(float64)) - } - - return w + bufToFlush bytes.Buffer + bufFlushedAt time.Time + closing bool } func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { @@ -273,16 +236,14 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { // It has been more than throttle_period seconds since the last // checkpoint; so reset the throttle if arvlog.logThrottleBytesSkipped > 0 { - arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now().UTC()), arvlog.logThrottleBytesSkipped)) + arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped)) } - arvlog.logThrottleResetTime = time.Now().Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod)) + arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod) arvlog.logThrottleBytesSoFar = 0 arvlog.logThrottleLinesSoFar = 0 arvlog.logThrottleBytesSkipped = 0 arvlog.logThrottleIsOpen = true - arvlog.logThrottlePartialLineLastAt = time.Time{} - arvlog.logThrottleFirstPartialLine = true } lines := bytes.Split(p, []byte("\n")) @@ -298,25 +259,25 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { } // check rateLimit - _, msg, err2 := arvlog.rateLimit(line) - if err2 != nil { - return 0, fmt.Errorf("%s ; %s", err1, err2) + logOpen, msg := arvlog.rateLimit(line, now) + if logOpen { + arvlog.bufToFlush.WriteString(string(msg) + "\n") } - arvlog.stderrBufToFlush.WriteString(string(msg) + "\n") } - if int64(arvlog.stderrBufToFlush.Len()) > arvlog.crunchLogBytesPerEvent || - (time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) { + if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent || + (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) || + arvlog.closing) && (arvlog.bufToFlush.Len() > 0) { // write to API lr := arvadosclient.Dict{"log": arvadosclient.Dict{ "object_uuid": arvlog.UUID, "event_type": arvlog.loggingStream, - "properties": map[string]string{"text": arvlog.stderrBufToFlush.String()}}} + "properties": map[string]string{"text": arvlog.bufToFlush.String()}}} err2 := arvlog.ArvClient.Create("logs", lr, nil) - bytesWritten = arvlog.stderrBufToFlush.Len() - arvlog.stderrBufToFlush = bytes.Buffer{} - arvlog.stderrFlushedAt = time.Now() + bytesWritten = arvlog.bufToFlush.Len() + arvlog.bufToFlush = bytes.Buffer{} + arvlog.bufFlushedAt = now if err1 != nil || err2 != nil { return 0, fmt.Errorf("%s ; %s", err1, err2) @@ -328,6 +289,8 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { // Close the underlying writer func (arvlog *ArvLogWriter) Close() (err error) { + arvlog.closing = true + arvlog.Write([]byte{}) if arvlog.writeCloser != nil { err = arvlog.writeCloser.Close() arvlog.writeCloser = nil @@ -335,54 +298,63 @@ func (arvlog *ArvLogWriter) Close() (err error) { return err } -var lineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`) +var lineRegexp = regexp.MustCompile(`^\S+ (.*)`) // Test for hard cap on total output and for log throttling. Returns whether // the log line should go to output or not. Returns message if limit exceeded. -func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) { +func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) { message := "" lineSize := int64(len(line)) - partialLine := false - skipCounts := false if arvlog.logThrottleIsOpen { matches := lineRegexp.FindStringSubmatch(string(line)) if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") { - partialLine = true - - if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) { - arvlog.logThrottlePartialLineLastAt = time.Now() + // This is a partial line. + + if arvlog.logThrottleFirstPartialLine { + // Partial should be suppressed. First time this is happening for this line so provide a message instead. + arvlog.logThrottleFirstPartialLine = false + arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod) + arvlog.logThrottleBytesSkipped += lineSize + return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", + RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second)) + } else if now.After(arvlog.logThrottlePartialLineNextAt) { + // The throttle period has passed. Update timestamp and let it through. + arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod) } else { - skipCounts = true + // Suppress line. + arvlog.logThrottleBytesSkipped += lineSize + return false, line } + } else { + // Not a partial line so reset. + arvlog.logThrottlePartialLineNextAt = time.Time{} + arvlog.logThrottleFirstPartialLine = true } - if !skipCounts { - arvlog.logThrottleLinesSoFar += 1 - arvlog.logThrottleBytesSoFar += lineSize - arvlog.bytesLogged += lineSize - } + arvlog.bytesLogged += lineSize + arvlog.logThrottleBytesSoFar += lineSize + arvlog.logThrottleLinesSoFar += 1 - if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob { - message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLimitLogBytesPerJob) - arvlog.logThrottleResetTime = time.Now().Add(time.Duration(365 * 24 * time.Hour)) + if arvlog.bytesLogged > crunchLimitLogBytesPerJob { + message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", + RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob) + arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour)) arvlog.logThrottleIsOpen = false - } else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes { - remainingTime := arvlog.logThrottleResetTime.Sub(time.Now()) - message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second) + } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes { + remainingTime := arvlog.logThrottleResetTime.Sub(now) + message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", + RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second) arvlog.logThrottleIsOpen = false - } else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines { - remainingTime := arvlog.logThrottleResetTime.Sub(time.Now()) - message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second) + } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines { + remainingTime := arvlog.logThrottleResetTime.Sub(now) + message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", + RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second) arvlog.logThrottleIsOpen = false - } else if partialLine && arvlog.logThrottleFirstPartialLine { - arvlog.logThrottleFirstPartialLine = false - message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogPartialLineThrottlePeriod) - } } @@ -395,10 +367,46 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) { // Yes, write to logs, but use our "rate exceeded" message // instead of the log message that exceeded the limit. message += " A complete log is still being written to Keep, and will be available when the job finishes." - return true, []byte(message), nil - } else if partialLine { - return false, line, nil + return true, []byte(message) } else { - return arvlog.logThrottleIsOpen, line, nil + return arvlog.logThrottleIsOpen, line + } +} + +// load the rate limit discovery config parameters +func loadLogThrottleParams(clnt IArvadosClient) { + param, err := clnt.Discovery("crunchLimitLogBytesPerJob") + if err == nil { + crunchLimitLogBytesPerJob = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogThrottleBytes") + if err == nil { + crunchLogThrottleBytes = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogThrottlePeriod") + if err == nil { + crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64)) + } + + param, err = clnt.Discovery("crunchLogThrottleLines") + if err == nil { + crunchLogThrottleLines = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod") + if err == nil { + crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64)) + } + + param, err = clnt.Discovery("crunchLogBytesPerEvent") + if err == nil { + crunchLogBytesPerEvent = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogSecondsBetweenEvents") + if err == nil { + crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64)) } }