X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a3c7d9e03062e3246b0857fbae05f45d22e39169..2e727c5d2d000faa6f1d9a566dc59568f1b276fe:/lib/crunchrun/logging.go diff --git a/lib/crunchrun/logging.go b/lib/crunchrun/logging.go index d5de184e5c..76a55c4992 100644 --- a/lib/crunchrun/logging.go +++ b/lib/crunchrun/logging.go @@ -7,6 +7,7 @@ package crunchrun import ( "bufio" "bytes" + "encoding/json" "fmt" "io" "log" @@ -335,7 +336,7 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) arvlog.bytesLogged += lineSize arvlog.logThrottleBytesSoFar += lineSize - arvlog.logThrottleLinesSoFar += 1 + arvlog.logThrottleLinesSoFar++ if arvlog.bytesLogged > crunchLimitLogBytesPerJob { message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", @@ -368,9 +369,8 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) // instead of the log message that exceeded the limit. message += " A complete log is still being written to Keep, and will be available when the job finishes." return true, []byte(message) - } else { - return arvlog.logThrottleIsOpen, line } + return arvlog.logThrottleIsOpen, line } // load the rate limit discovery config parameters @@ -405,3 +405,53 @@ func loadLogThrottleParams(clnt IArvadosClient) { loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod") } + +type filterKeepstoreErrorsOnly struct { + io.WriteCloser + buf []byte +} + +func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) { + log.Printf("filterKeepstoreErrorsOnly: write %q", p) + f.buf = append(f.buf, p...) + start := 0 + for i := len(f.buf) - len(p); i < len(f.buf); i++ { + if f.buf[i] == '\n' { + if f.check(f.buf[start:i]) { + _, err := f.WriteCloser.Write(f.buf[start : i+1]) + if err != nil { + return 0, err + } + } + start = i + 1 + } + } + if start > 0 { + copy(f.buf, f.buf[start:]) + f.buf = f.buf[:len(f.buf)-start] + } + return len(p), nil +} + +func (f *filterKeepstoreErrorsOnly) check(line []byte) bool { + if len(line) == 0 { + return false + } + if line[0] != '{' { + return true + } + var m map[string]interface{} + err := json.Unmarshal(line, &m) + if err != nil { + return true + } + if m["msg"] == "request" { + return false + } + if m["msg"] == "response" { + if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 { + return false + } + } + return true +}