+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
buf *bytes.Buffer
sync.Mutex
writer io.WriteCloser
- stopping chan struct{}
+ flush chan struct{}
stopped chan struct{}
+ stopping chan struct{}
Timestamper
- Immediate *log.Logger
+ Immediate *log.Logger
+ pendingFlush bool
}
// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
n = len(p)
}
}
+
+ if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
+ // Non-blocking send. Try send a flush if it is ready to
+ // accept it. Otherwise do nothing because a flush is already
+ // pending.
+ select {
+ case tl.flush <- struct{}{}:
+ default:
+ }
+ }
+
return
}
// Periodically (every crunchLogSecondsBetweenEvents), or whenever Write
// requests a flush via tl.flush, check the current buffer; if it is not
// empty, send it on the channel to the goWriter goroutine.
func (tl *ThrottledLogger) flusher() {
- ticker := time.NewTicker(time.Second)
+ ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
defer ticker.Stop()
for stopping := false; !stopping; {
select {
case <-tl.stopping:
- // flush tl.buf, then exit the loop
+ // flush tl.buf and exit the loop
stopping = true
+ case <-tl.flush:
case <-ticker.C:
}
var ready *bytes.Buffer
tl.Mutex.Lock()
- ready, tl.buf = tl.buf, nil
+ ready, tl.buf = tl.buf, &bytes.Buffer{}
tl.Mutex.Unlock()
if ready != nil && ready.Len() > 0 {
// NewThrottledLogger creates a new thottled logger that
// (a) prepends timestamps to each line
-// (b) batches log messages and only calls the underlying Writer at most once
-// per second.
+// (b) batches log messages and only calls the underlying Writer
+// at most once per "crunchLogSecondsBetweenEvents" seconds.
func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
tl := &ThrottledLogger{}
- tl.stopping = make(chan struct{})
+ tl.flush = make(chan struct{}, 1)
tl.stopped = make(chan struct{})
+ tl.stopping = make(chan struct{})
tl.writer = writer
tl.Logger = log.New(tl, "", 0)
tl.Timestamper = RFC3339Timestamp
return tl
}
// Log throttling and rate-limiting parameters. The values below are
// defaults; loadLogThrottleParams overwrites each one for which the
// client's Discovery lookup succeeds.
var (
	// crunchLimitLogBytesPerJob caps the total number of log bytes sent
	// to the API for the whole job (64 MiB).
	crunchLimitLogBytesPerJob int64 = 64 << 20

	// crunchLogThrottleBytes and crunchLogThrottleLines limit how much may
	// be logged within each crunchLogThrottlePeriod.
	crunchLogThrottleBytes  int64         = 64 << 10
	crunchLogThrottleLines  int64         = 1024
	crunchLogThrottlePeriod time.Duration = 60 * time.Second

	// crunchLogPartialLineThrottlePeriod is the minimum interval between
	// partial segments of a long line that are passed through.
	crunchLogPartialLineThrottlePeriod time.Duration = 5 * time.Second

	// Buffered log output is flushed once crunchLogBytesPerEvent bytes have
	// accumulated, or once at least crunchLogSecondsBetweenEvents has
	// passed since the previous flush. Despite its name, the latter is a
	// time.Duration, not a count of seconds.
	crunchLogBytesPerEvent        int64         = 4 << 10
	crunchLogSecondsBetweenEvents time.Duration = time.Second
)
+
// ArvLogWriter is an io.WriteCloser that processes each write by
// writing it through to another io.WriteCloser (typically a
// CollectionFileWriter) and creating an Arvados log entry.
logThrottleBytesSoFar int64
logThrottleBytesSkipped int64
logThrottleIsOpen bool
- logThrottlePartialLineLastAt time.Time
+ logThrottlePartialLineNextAt time.Time
logThrottleFirstPartialLine bool
- stderrBufToFlush bytes.Buffer
- stderrFlushedAt time.Time
+ bufToFlush bytes.Buffer
+ bufFlushedAt time.Time
+ closing bool
}
-func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
+func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
// Write to the next writer in the chain (a file in Keep)
var err1 error
if arvlog.writeCloser != nil {
}
// write to API after checking rate limit
- crunchLogThrottlePeriod, err2 := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
- crunchLogBytesPerEvent, err2 := arvlog.ArvClient.Discovery("crunchLogBytesPerEvent")
- crunchLogSecondsBetweenEvents, err2 := arvlog.ArvClient.Discovery("crunchLogSecondsBetweenEvents")
- if err2 != nil {
- return 0, fmt.Errorf("%s ; %s", err1, err2)
- }
-
now := time.Now()
- bytesWritten := 0
if now.After(arvlog.logThrottleResetTime) {
// It has been more than throttle_period seconds since the last
// checkpoint; so reset the throttle
if arvlog.logThrottleBytesSkipped > 0 {
- arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now()), arvlog.logThrottleBytesSkipped))
+ arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
}
- arvlog.logThrottleResetTime = time.Now().Add(time.Duration(int(crunchLogThrottlePeriod.(float64))))
+ arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
arvlog.logThrottleBytesSoFar = 0
arvlog.logThrottleLinesSoFar = 0
arvlog.logThrottleBytesSkipped = 0
arvlog.logThrottleIsOpen = true
- arvlog.logThrottlePartialLineLastAt = time.Time{}
- arvlog.logThrottleFirstPartialLine = true
}
lines := bytes.Split(p, []byte("\n"))
}
// check rateLimit
- _, msg, err2 := arvlog.rateLimit(line)
- if err2 != nil {
- return 0, fmt.Errorf("%s ; %s", err1, err2)
+ logOpen, msg := arvlog.rateLimit(line, now)
+ if logOpen {
+ arvlog.bufToFlush.WriteString(string(msg) + "\n")
}
- arvlog.stderrBufToFlush.WriteString(string(msg) + "\n")
}
- if arvlog.stderrBufToFlush.Len() > int(crunchLogBytesPerEvent.(float64)) ||
- (time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(int64(crunchLogSecondsBetweenEvents.(float64)))) {
+ if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
+ (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
+ arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
// write to API
lr := arvadosclient.Dict{"log": arvadosclient.Dict{
"object_uuid": arvlog.UUID,
"event_type": arvlog.loggingStream,
- "properties": map[string]string{"text": arvlog.stderrBufToFlush.String()}}}
+ "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
err2 := arvlog.ArvClient.Create("logs", lr, nil)
- bytesWritten = arvlog.stderrBufToFlush.Len()
- arvlog.stderrBufToFlush = bytes.Buffer{}
- arvlog.stderrFlushedAt = time.Now()
+ arvlog.bufToFlush = bytes.Buffer{}
+ arvlog.bufFlushedAt = now
if err1 != nil || err2 != nil {
return 0, fmt.Errorf("%s ; %s", err1, err2)
}
}
- return bytesWritten, nil
+ return len(p), nil
}
// Close the underlying writer
func (arvlog *ArvLogWriter) Close() (err error) {
+ arvlog.closing = true
+ arvlog.Write([]byte{})
if arvlog.writeCloser != nil {
err = arvlog.writeCloser.Close()
arvlog.writeCloser = nil
return err
}
-var lineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`)
+var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
// Test for hard cap on total output and for log throttling. Returns whether
// the log line should go to output or not. Returns message if limit exceeded.
-func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
+func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
message := ""
lineSize := int64(len(line))
- partialLine := false
- skipCounts := false
+
if arvlog.logThrottleIsOpen {
matches := lineRegexp.FindStringSubmatch(string(line))
- crunchLogPartialLineThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogPartialLineThrottlePeriod")
- crunchLimitLogBytesPerJob, err := arvlog.ArvClient.Discovery("crunchLimitLogBytesPerJob")
- crunchLogThrottleBytes, err := arvlog.ArvClient.Discovery("crunchLogThrottleBytes")
- crunchLogThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
- crunchLogThrottleLines, err := arvlog.ArvClient.Discovery("crunchLogThrottleLines")
- if err != nil {
- return false, []byte(""), err
- }
-
if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
- partialLine = true
-
- if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(int(crunchLogPartialLineThrottlePeriod.(float64))))) {
- arvlog.logThrottlePartialLineLastAt = time.Now()
+ // This is a partial line.
+
+ if arvlog.logThrottleFirstPartialLine {
+ // Partial should be suppressed. First time this is happening for this line so provide a message instead.
+ arvlog.logThrottleFirstPartialLine = false
+ arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+ arvlog.logThrottleBytesSkipped += lineSize
+ return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
+ RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
+ } else if now.After(arvlog.logThrottlePartialLineNextAt) {
+ // The throttle period has passed. Update timestamp and let it through.
+ arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
} else {
- skipCounts = true
+ // Suppress line.
+ arvlog.logThrottleBytesSkipped += lineSize
+ return false, line
}
+ } else {
+ // Not a partial line so reset.
+ arvlog.logThrottlePartialLineNextAt = time.Time{}
+ arvlog.logThrottleFirstPartialLine = true
}
- if !skipCounts {
- arvlog.logThrottleLinesSoFar += 1
- arvlog.logThrottleBytesSoFar += lineSize
- arvlog.bytesLogged += lineSize
- }
+ arvlog.bytesLogged += lineSize
+ arvlog.logThrottleBytesSoFar += lineSize
+ arvlog.logThrottleLinesSoFar += 1
- if arvlog.bytesLogged > int64(crunchLimitLogBytesPerJob.(float64)) {
- message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now()), int(crunchLimitLogBytesPerJob.(float64)))
- arvlog.logThrottleResetTime = time.Now().Add(time.Duration(365 * 24 * time.Hour))
+ if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
+ message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
+ RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
+ arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
arvlog.logThrottleIsOpen = false
- } else if arvlog.logThrottleBytesSoFar > int64(crunchLogThrottleBytes.(float64)) {
- remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
- message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now()), crunchLogThrottleBytes, int(crunchLogThrottlePeriod.(float64)), remainingTime)
+
+ } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
+ remainingTime := arvlog.logThrottleResetTime.Sub(now)
+ message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
+ RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
- } else if arvlog.logThrottleLinesSoFar > int64(crunchLogThrottleLines.(float64)) {
- remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
- message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now()), crunchLogThrottleLines, int(crunchLogThrottlePeriod.(float64)), remainingTime)
+
+ } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
+ remainingTime := arvlog.logThrottleResetTime.Sub(now)
+ message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
+ RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
- } else if partialLine && arvlog.logThrottleFirstPartialLine {
- arvlog.logThrottleFirstPartialLine = false
- message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now()), crunchLogPartialLineThrottlePeriod)
+
}
}
if message != "" {
// Yes, write to logs, but use our "rate exceeded" message
// instead of the log message that exceeded the limit.
- message += " A complete log is still being written to Keep, and will be available when the job finishes.\n"
- return true, []byte(message), nil
- } else if partialLine {
- return false, line, nil
+ message += " A complete log is still being written to Keep, and will be available when the job finishes."
+ return true, []byte(message)
} else {
- return arvlog.logThrottleIsOpen, line, nil
+ return arvlog.logThrottleIsOpen, line
+ }
+}
+
+// load the rate limit discovery config parameters
+func loadLogThrottleParams(clnt IArvadosClient) {
+ param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
+ if err == nil {
+ crunchLimitLogBytesPerJob = int64(param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogThrottleBytes")
+ if err == nil {
+ crunchLogThrottleBytes = int64(param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogThrottlePeriod")
+ if err == nil {
+ crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogThrottleLines")
+ if err == nil {
+ crunchLogThrottleLines = int64(param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
+ if err == nil {
+ crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogBytesPerEvent")
+ if err == nil {
+ crunchLogBytesPerEvent = int64(param.(float64))
+ }
+
+ param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
+ if err == nil {
+ crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
}
}