+
+var lineRegexp = regexp.MustCompile(`^\S+ (.*)`) // leading run of non-whitespace, one space, then submatch 1 = the rest of the line
+
+// Test for hard cap on total output and for log throttling. Returns whether
+// the log line should go to output or not. Returns message if limit exceeded.
+func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
+ message := ""
+ lineSize := int64(len(line))
+
+ if arvlog.logThrottleIsOpen {
+ matches := lineRegexp.FindStringSubmatch(string(line))
+
+ if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
+ // This is a partial line.
+
+ if arvlog.logThrottleFirstPartialLine {
+ // Partial should be suppressed. First time this is happening for this line so provide a message instead.
+ arvlog.logThrottleFirstPartialLine = false
+ arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+ arvlog.logThrottleBytesSkipped += lineSize
+ return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
+ RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
+ } else if now.After(arvlog.logThrottlePartialLineNextAt) {
+ // The throttle period has passed. Update timestamp and let it through.
+ arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+ } else {
+ // Suppress line.
+ arvlog.logThrottleBytesSkipped += lineSize
+ return false, line
+ }
+ } else {
+ // Not a partial line so reset.
+ arvlog.logThrottlePartialLineNextAt = time.Time{}
+ arvlog.logThrottleFirstPartialLine = true
+ }
+
+ arvlog.bytesLogged += lineSize
+ arvlog.logThrottleBytesSoFar += lineSize
+ arvlog.logThrottleLinesSoFar += 1
+
+ if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
+ message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
+ RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
+ arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
+ arvlog.logThrottleIsOpen = false
+
+ } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
+ remainingTime := arvlog.logThrottleResetTime.Sub(now)
+ message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
+ RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
+ arvlog.logThrottleIsOpen = false
+
+ } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
+ remainingTime := arvlog.logThrottleResetTime.Sub(now)
+ message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
+ RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
+ arvlog.logThrottleIsOpen = false
+
+ }
+ }
+
+ if !arvlog.logThrottleIsOpen {
+ // Don't log anything if any limit has been exceeded. Just count lossage.
+ arvlog.logThrottleBytesSkipped += lineSize
+ }
+
+ if message != "" {
+ // Yes, write to logs, but use our "rate exceeded" message
+ // instead of the log message that exceeded the limit.
+ message += " A complete log is still being written to Keep, and will be available when the job finishes."
+ return true, []byte(message)
+ } else {
+ return arvlog.logThrottleIsOpen, line
+ }
+}
+
+// loadLogThrottleParams overrides the package-level log throttle settings
+// with values read through clnt.Discovery.
+//
+// Each setting is looked up by its key. A setting keeps its current value when
+// the lookup returns an error or the value is not a float64, so loading is
+// best-effort and never fails. Period settings are interpreted as a number of
+// seconds; fractional seconds are preserved.
+func loadLogThrottleParams(clnt IArvadosClient) {
+	// lookup fetches a numeric discovery value. ok is false when the key
+	// cannot be read or does not hold a float64.
+	lookup := func(key string) (val float64, ok bool) {
+		param, err := clnt.Discovery(key)
+		if err != nil {
+			// Deliberately ignored: the existing setting stays in effect.
+			return 0, false
+		}
+		val, ok = param.(float64)
+		return val, ok
+	}
+	loadDuration := func(dst *time.Duration, key string) {
+		if secs, ok := lookup(key); ok {
+			// Scale in floating point before converting, so that a
+			// sub-second value such as 0.5 is not truncated to zero.
+			*dst = time.Duration(secs * float64(time.Second))
+		}
+	}
+	loadInt64 := func(dst *int64, key string) {
+		if val, ok := lookup(key); ok {
+			*dst = int64(val)
+		}
+	}
+
+	loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
+	loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
+	loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
+	loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
+	loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
+	loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
+	loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
+	loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
+	loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
+}