X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3acd5d7f73c24a2ea2d686588be44efb9ac056b2..3826a6339ba1c901c054053920ed20547b3ba54d:/services/crunch-run/logging.go?ds=sidebyside diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index 45dfc2e341..ce0a661263 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -34,9 +38,10 @@ type ThrottledLogger struct { *log.Logger buf *bytes.Buffer sync.Mutex - writer io.WriteCloser - flush chan struct{} - stopped chan struct{} + writer io.WriteCloser + flush chan struct{} + stopped chan struct{} + stopping chan struct{} Timestamper Immediate *log.Logger pendingFlush bool @@ -61,11 +66,6 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { tl.buf = &bytes.Buffer{} } - //if int64(tl.buf.Len()) >= crunchLogBytesPerEvent && !tl.pendingFlush { - // tl.pendingFlush = true - // tl.flush <- struct{}{} - //} - now := tl.Timestamper(time.Now().UTC()) sc := bufio.NewScanner(bytes.NewBuffer(p)) for err == nil && sc.Scan() { @@ -81,6 +81,17 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { n = len(p) } } + + if int64(tl.buf.Len()) >= crunchLogBytesPerEvent { + // Non-blocking send. Try send a flush if it is ready to + // accept it. Otherwise do nothing because a flush is already + // pending. + select { + case tl.flush <- struct{}{}: + default: + } + } + return } @@ -91,9 +102,10 @@ func (tl *ThrottledLogger) flusher() { defer ticker.Stop() for stopping := false; !stopping; { select { - case _, open := <-tl.flush: - // if !open, flush tl.buf and exit the loop - stopping = !open + case <-tl.stopping: + // flush tl.buf and exit the loop + stopping = true + case <-tl.flush: case <-ticker.C: } @@ -101,7 +113,6 @@ func (tl *ThrottledLogger) flusher() { tl.Mutex.Lock() ready, tl.buf = tl.buf, &bytes.Buffer{} - tl.pendingFlush = false tl.Mutex.Unlock() if ready != nil && ready.Len() > 0 { @@ -115,10 +126,10 @@ func (tl *ThrottledLogger) flusher() { // underlying Writer. func (tl *ThrottledLogger) Close() error { select { - case <-tl.flush: + case <-tl.stopping: // already stopped default: - close(tl.flush) + close(tl.stopping) } <-tl.stopped return tl.writer.Close() @@ -170,6 +181,7 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { tl := &ThrottledLogger{} tl.flush = make(chan struct{}, 1) tl.stopped = make(chan struct{}) + tl.stopping = make(chan struct{}) tl.writer = writer tl.Logger = log.New(tl, "", 0) tl.Timestamper = RFC3339Timestamp @@ -178,13 +190,13 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { } // Log throttling rate limiting config parameters -var crunchLimitLogBytesPerJob int64 -var crunchLogThrottleBytes int64 -var crunchLogThrottlePeriod int -var crunchLogThrottleLines int64 -var crunchLogPartialLineThrottlePeriod int -var crunchLogBytesPerEvent int64 -var crunchLogSecondsBetweenEvents int +var crunchLimitLogBytesPerJob int64 = 67108864 +var crunchLogThrottleBytes int64 = 65536 +var crunchLogThrottlePeriod time.Duration = time.Second * 60 +var crunchLogThrottleLines int64 = 1024 +var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5 +var crunchLogBytesPerEvent int64 = 4096 +var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1 // ArvLogWriter is an io.WriteCloser that processes each write by // writing it through to another io.WriteCloser (typically a @@ -202,13 +214,14 @@ type ArvLogWriter struct { logThrottleBytesSoFar int64 logThrottleBytesSkipped int64 logThrottleIsOpen bool - logThrottlePartialLineLastAt time.Time + logThrottlePartialLineNextAt time.Time logThrottleFirstPartialLine bool bufToFlush bytes.Buffer bufFlushedAt time.Time + closing bool } -func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { +func (arvlog *ArvLogWriter) Write(p []byte) (int, error) { // Write to the next writer in the chain (a file in Keep) var err1 error if arvlog.writeCloser != nil { @@ -217,7 +230,6 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { // write to API after checking rate limit now := time.Now() - bytesWritten := 0 if now.After(arvlog.logThrottleResetTime) { // It has been more than throttle_period seconds since the last @@ -226,13 +238,11 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped)) } - arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod)) + arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod) arvlog.logThrottleBytesSoFar = 0 arvlog.logThrottleLinesSoFar = 0 arvlog.logThrottleBytesSkipped = 0 arvlog.logThrottleIsOpen = true - arvlog.logThrottlePartialLineLastAt = time.Time{} - arvlog.logThrottleFirstPartialLine = true } lines := bytes.Split(p, []byte("\n")) @@ -254,8 +264,9 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { } } - if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent || - (now.Sub(arvlog.bufFlushedAt) >= time.Duration(crunchLogSecondsBetweenEvents)) { + if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent || + (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) || + arvlog.closing) && (arvlog.bufToFlush.Len() > 0) { // write to API lr := arvadosclient.Dict{"log": arvadosclient.Dict{ "object_uuid": arvlog.UUID, @@ -263,7 +274,6 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { "properties": map[string]string{"text": arvlog.bufToFlush.String()}}} err2 := arvlog.ArvClient.Create("logs", lr, nil) - bytesWritten = arvlog.bufToFlush.Len() arvlog.bufToFlush = bytes.Buffer{} arvlog.bufFlushedAt = now @@ -272,11 +282,13 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { } } - return bytesWritten, nil + return len(p), nil } // Close the underlying writer func (arvlog *ArvLogWriter) Close() (err error) { + arvlog.closing = true + arvlog.Write([]byte{}) if arvlog.writeCloser != nil { err = arvlog.writeCloser.Close() arvlog.writeCloser = nil @@ -291,22 +303,37 @@ var lineRegexp = regexp.MustCompile(`^\S+ (.*)`) func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) { message := "" lineSize := int64(len(line)) - partialLine := false if arvlog.logThrottleIsOpen { matches := lineRegexp.FindStringSubmatch(string(line)) if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") { - partialLine = true - if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) { - arvlog.logThrottlePartialLineLastAt = now - arvlog.logThrottleFirstPartialLine = true + // This is a partial line. + + if arvlog.logThrottleFirstPartialLine { + // Partial should be suppressed. First time this is happening for this line so provide a message instead. + arvlog.logThrottleFirstPartialLine = false + arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod) + arvlog.logThrottleBytesSkipped += lineSize + return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", + RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second)) + } else if now.After(arvlog.logThrottlePartialLineNextAt) { + // The throttle period has passed. Update timestamp and let it through. + arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod) + } else { + // Suppress line. + arvlog.logThrottleBytesSkipped += lineSize + return false, line } + } else { + // Not a partial line so reset. + arvlog.logThrottlePartialLineNextAt = time.Time{} + arvlog.logThrottleFirstPartialLine = true } - arvlog.logThrottleLinesSoFar += 1 - arvlog.logThrottleBytesSoFar += lineSize arvlog.bytesLogged += lineSize + arvlog.logThrottleBytesSoFar += lineSize + arvlog.logThrottleLinesSoFar += 1 if arvlog.bytesLogged > crunchLimitLogBytesPerJob { message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", @@ -317,20 +344,15 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes { remainingTime := arvlog.logThrottleResetTime.Sub(now) message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", - RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second) + RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second) arvlog.logThrottleIsOpen = false } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines { remainingTime := arvlog.logThrottleResetTime.Sub(now) message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", - RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second) + RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second) arvlog.logThrottleIsOpen = false - } else if partialLine && arvlog.logThrottleFirstPartialLine { - arvlog.logThrottleFirstPartialLine = false - message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", - RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod) - } } @@ -349,54 +371,40 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) } } -// load the rate limit discovery config paramters +// load the rate limit discovery config parameters func loadLogThrottleParams(clnt IArvadosClient) { param, err := clnt.Discovery("crunchLimitLogBytesPerJob") - if err != nil { - crunchLimitLogBytesPerJob = 67108864 - } else { + if err == nil { crunchLimitLogBytesPerJob = int64(param.(float64)) } param, err = clnt.Discovery("crunchLogThrottleBytes") - if err != nil { - crunchLogThrottleBytes = 65536 - } else { + if err == nil { crunchLogThrottleBytes = int64(param.(float64)) } param, err = clnt.Discovery("crunchLogThrottlePeriod") - if err != nil { - crunchLogThrottlePeriod = 60 - } else { - crunchLogThrottlePeriod = int(param.(float64)) + if err == nil { + crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64)) } param, err = clnt.Discovery("crunchLogThrottleLines") - if err != nil { - crunchLogThrottleLines = 1024 - } else { + if err == nil { crunchLogThrottleLines = int64(param.(float64)) } param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod") - if err != nil { - crunchLogPartialLineThrottlePeriod = 5 - } else { - crunchLogPartialLineThrottlePeriod = int(param.(float64)) + if err == nil { + crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64)) } param, err = clnt.Discovery("crunchLogBytesPerEvent") - if err != nil { - crunchLogBytesPerEvent = 4096 - } else { + if err == nil { crunchLogBytesPerEvent = int64(param.(float64)) } param, err = clnt.Discovery("crunchLogSecondsBetweenEvents") - if err != nil { - crunchLogSecondsBetweenEvents = 1 - } else { - crunchLogSecondsBetweenEvents = int(param.(float64)) + if err == nil { + crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64)) } }