X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/28e50cc9480fdad416404542511a172cdc7253c7..b1ffc878a5ea6ba083b8fbc8f20c15c7a6f1e1ec:/lib/crunchrun/logging.go diff --git a/lib/crunchrun/logging.go b/lib/crunchrun/logging.go index d5de184e5c..91a1b77cf4 100644 --- a/lib/crunchrun/logging.go +++ b/lib/crunchrun/logging.go @@ -7,6 +7,7 @@ package crunchrun import ( "bufio" "bytes" + "encoding/json" "fmt" "io" "log" @@ -174,9 +175,9 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) { } // NewThrottledLogger creates a new thottled logger that -// (a) prepends timestamps to each line -// (b) batches log messages and only calls the underlying Writer -// at most once per "crunchLogSecondsBetweenEvents" seconds. +// - prepends timestamps to each line, and +// - batches log messages and only calls the underlying Writer +// at most once per "crunchLogSecondsBetweenEvents" seconds. func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { tl := &ThrottledLogger{} tl.flush = make(chan struct{}, 1) @@ -335,7 +336,7 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) arvlog.bytesLogged += lineSize arvlog.logThrottleBytesSoFar += lineSize - arvlog.logThrottleLinesSoFar += 1 + arvlog.logThrottleLinesSoFar++ if arvlog.bytesLogged > crunchLimitLogBytesPerJob { message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", @@ -368,9 +369,8 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) // instead of the log message that exceeded the limit. message += " A complete log is still being written to Keep, and will be available when the job finishes." 
// filterKeepstoreErrorsOnly wraps an io.WriteCloser and forwards to it
// only those complete, newline-terminated lines that check accepts.
// Everything else written to it is silently dropped.
//
// The type defines no Close method of its own, so Close is the
// embedded WriteCloser's; a trailing partial line still held in buf is
// not forwarded by it.
type filterKeepstoreErrorsOnly struct {
	io.WriteCloser
	// buf holds the bytes of the current, not yet terminated line.
	// Between calls to Write it never contains a newline.
	buf []byte
}

// Write appends p to the pending data and hands every line completed
// by p (including its trailing newline) to the embedded WriteCloser,
// provided check accepts the line. Bytes following the last newline
// remain buffered until a later call completes the line.
//
// On success it returns len(p), nil. If the embedded writer fails, its
// error is returned together with the number of leading bytes of p
// that belong to lines already handled (forwarded or filtered out)
// before the failing line; the rest of p is discarded from the buffer,
// so a caller may retry with p[n:] without lines being duplicated or
// glued together.
func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
	// Bytes carried over from earlier calls; they contain no newline,
	// so only the newly appended bytes need to be scanned.
	pending := len(f.buf)
	f.buf = append(f.buf, p...)
	start := 0
	for i := pending; i < len(f.buf); i++ {
		if f.buf[i] != '\n' {
			continue
		}
		if f.check(f.buf[start:i]) {
			if _, err := f.WriteCloser.Write(f.buf[start : i+1]); err != nil {
				if start == 0 {
					// The very first line failed: nothing of p
					// was consumed. Keep only the carried-over
					// prefix so a retry can rebuild the line.
					f.buf = f.buf[:pending]
					return 0, err
				}
				// Earlier lines (and with them the carried-over
				// prefix) were consumed. Drop the remainder so no
				// unscanned newline is left behind in buf.
				f.buf = f.buf[:0]
				return start - pending, err
			}
		}
		start = i + 1
	}
	if start > 0 {
		// Move the unterminated tail to the front, reusing the
		// backing array.
		rest := copy(f.buf, f.buf[start:])
		f.buf = f.buf[:rest]
	}
	return len(p), nil
}

// check reports whether line (given without its trailing newline)
// should be forwarded.
//
// It returns false for empty lines, for JSON objects whose "msg" field
// is "request", and for JSON objects whose "msg" field is "response"
// with a numeric "respStatusCode" in the range [200, 300). Every other
// line is accepted, including lines that do not start with '{' and
// lines that fail to parse as a JSON object.
func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
	if len(line) == 0 {
		return false
	}
	if line[0] != '{' {
		// Not a JSON object: pass through unfiltered.
		return true
	}
	var m map[string]interface{}
	if err := json.Unmarshal(line, &m); err != nil {
		// Looked like JSON but is not; do not hide it.
		return true
	}
	switch m["msg"] {
	case "request":
		return false
	case "response":
		// JSON numbers decode to float64; a missing or non-numeric
		// status yields 0 and therefore falls outside the 2xx range.
		if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
			return false
		}
	}
	return true
}