+
+// lineRegexp matches text that begins with a run of non-whitespace characters
+// followed by a single space, and captures everything after that space (up to,
+// but not including, the first newline) as submatch 1.
+// NOTE(review): the leading token is presumably a timestamp prefix — confirm
+// against the code that formats log lines.
+var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
+
+// Test for hard cap on total output and for log throttling. Returns whether
+// the log line should go to output or not. Returns message if limit exceeded.
+func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
+ message := ""
+ lineSize := int64(len(line))
+ partialLine := false
+ skipCounts := false
+
+ if arvlog.logThrottleIsOpen {
+ matches := lineRegexp.FindStringSubmatch(string(line))
+
+ if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
+ partialLine = true
+
+ if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
+ arvlog.logThrottlePartialLineLastAt = now
+ } else {
+ skipCounts = true
+ }
+ }
+
+ if !skipCounts {
+ arvlog.logThrottleLinesSoFar += 1
+ arvlog.logThrottleBytesSoFar += lineSize
+ arvlog.bytesLogged += lineSize
+ }
+
+ if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
+ message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(now.UTC()), arvlog.crunchLimitLogBytesPerJob)
+ arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
+ arvlog.logThrottleIsOpen = false
+
+ } else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
+ remainingTime := arvlog.logThrottleResetTime.Sub(now)
+ message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+ arvlog.logThrottleIsOpen = false
+
+ } else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
+ remainingTime := arvlog.logThrottleResetTime.Sub(now)
+ message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+ arvlog.logThrottleIsOpen = false
+
+ } else if partialLine && arvlog.logThrottleFirstPartialLine {
+ arvlog.logThrottleFirstPartialLine = false
+ message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
+
+ }
+ }
+
+ if !arvlog.logThrottleIsOpen {
+ // Don't log anything if any limit has been exceeded. Just count lossage.
+ arvlog.logThrottleBytesSkipped += lineSize
+ }
+
+ if message != "" {
+ // Yes, write to logs, but use our "rate exceeded" message
+ // instead of the log message that exceeded the limit.
+ message += " A complete log is still being written to Keep, and will be available when the job finishes."
+ return true, []byte(message)
+ } else if partialLine {
+ return false, line
+ } else {
+ return arvlog.logThrottleIsOpen, line
+ }
+}