logThrottleIsOpen bool
logThrottlePartialLineLastAt time.Time
logThrottleFirstPartialLine bool
- stderrBufToFlush bytes.Buffer
- stderrFlushedAt time.Time
+ bufToFlush bytes.Buffer
+ bufFlushedAt time.Time
// rate limiting config parameters
crunchLimitLogBytesPerJob int64
// More than the throttle period (crunchLogThrottlePeriod seconds) has
// passed since the last checkpoint, so reset the throttle.
if arvlog.logThrottleBytesSkipped > 0 {
- arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now().UTC()), arvlog.logThrottleBytesSkipped))
+ arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
}
- arvlog.logThrottleResetTime = time.Now().Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
+ arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
arvlog.logThrottleBytesSoFar = 0
arvlog.logThrottleLinesSoFar = 0
arvlog.logThrottleBytesSkipped = 0
}
// check rateLimit
- _, msg, err2 := arvlog.rateLimit(line)
- if err2 != nil {
- return 0, fmt.Errorf("%s ; %s", err1, err2)
- }
- arvlog.stderrBufToFlush.WriteString(string(msg) + "\n")
+ logOpen, msg := arvlog.rateLimit(line, now)
+ arvlog.bufToFlush.WriteString(string(msg) + "\n")
+ arvlog.logThrottleIsOpen = logOpen
}
- if int64(arvlog.stderrBufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
- (time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
+ if int64(arvlog.bufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
+ (now.Sub(arvlog.bufFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
// write to API
lr := arvadosclient.Dict{"log": arvadosclient.Dict{
"object_uuid": arvlog.UUID,
"event_type": arvlog.loggingStream,
- "properties": map[string]string{"text": arvlog.stderrBufToFlush.String()}}}
+ "properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
err2 := arvlog.ArvClient.Create("logs", lr, nil)
- bytesWritten = arvlog.stderrBufToFlush.Len()
- arvlog.stderrBufToFlush = bytes.Buffer{}
- arvlog.stderrFlushedAt = time.Now()
+ bytesWritten = arvlog.bufToFlush.Len()
+ arvlog.bufToFlush = bytes.Buffer{}
+ arvlog.bufFlushedAt = now
if err1 != nil || err2 != nil {
return 0, fmt.Errorf("%s ; %s", err1, err2)
return err
}
-var lineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`)
+var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
// rateLimit tests for the hard cap on total output and for log throttling.
// It returns whether the log line should go to the output, together with the
// bytes to write: the line itself, or a limit-exceeded message in its place.
-func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
+func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
message := ""
lineSize := int64(len(line))
partialLine := false
if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
partialLine = true
- if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
- arvlog.logThrottlePartialLineLastAt = time.Now()
+ if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
+ arvlog.logThrottlePartialLineLastAt = now
} else {
skipCounts = true
}
}
if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
- message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLimitLogBytesPerJob)
- arvlog.logThrottleResetTime = time.Now().Add(time.Duration(365 * 24 * time.Hour))
+ message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(now.UTC()), arvlog.crunchLimitLogBytesPerJob)
+ arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
arvlog.logThrottleIsOpen = false
} else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
- remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
- message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+ remainingTime := arvlog.logThrottleResetTime.Sub(now)
+ message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
} else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
- remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
- message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+ remainingTime := arvlog.logThrottleResetTime.Sub(now)
+ message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
arvlog.logThrottleIsOpen = false
} else if partialLine && arvlog.logThrottleFirstPartialLine {
arvlog.logThrottleFirstPartialLine = false
- message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now().UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
+ message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
}
}
// Yes, write to logs, but use our "rate exceeded" message
// instead of the log message that exceeded the limit.
message += " A complete log is still being written to Keep, and will be available when the job finishes."
- return true, []byte(message), nil
+ return true, []byte(message)
} else if partialLine {
- return false, line, nil
+ return false, line
} else {
- return arvlog.logThrottleIsOpen, line, nil
+ return arvlog.logThrottleIsOpen, line
}
}