X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/045e3127cb48845c7d988d01488c055f02ae2ec3..3433f306caf560017377b32adf7a23842ba9ab31:/lib/crunchrun/logging.go diff --git a/lib/crunchrun/logging.go b/lib/crunchrun/logging.go index 050894383d..91a1b77cf4 100644 --- a/lib/crunchrun/logging.go +++ b/lib/crunchrun/logging.go @@ -7,6 +7,7 @@ package crunchrun import ( "bufio" "bytes" + "encoding/json" "fmt" "io" "log" @@ -174,9 +175,9 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) { } // NewThrottledLogger creates a new thottled logger that -// (a) prepends timestamps to each line -// (b) batches log messages and only calls the underlying Writer -// at most once per "crunchLogSecondsBetweenEvents" seconds. +// - prepends timestamps to each line, and +// - batches log messages and only calls the underlying Writer +// at most once per "crunchLogSecondsBetweenEvents" seconds. func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { tl := &ThrottledLogger{} tl.flush = make(chan struct{}, 1) @@ -404,3 +405,53 @@ func loadLogThrottleParams(clnt IArvadosClient) { loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod") } + +type filterKeepstoreErrorsOnly struct { + io.WriteCloser + buf []byte +} + +func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) { + log.Printf("filterKeepstoreErrorsOnly: write %q", p) + f.buf = append(f.buf, p...) + start := 0 + for i := len(f.buf) - len(p); i < len(f.buf); i++ { + if f.buf[i] == '\n' { + if f.check(f.buf[start:i]) { + _, err := f.WriteCloser.Write(f.buf[start : i+1]) + if err != nil { + return 0, err + } + } + start = i + 1 + } + } + if start > 0 { + copy(f.buf, f.buf[start:]) + f.buf = f.buf[:len(f.buf)-start] + } + return len(p), nil +} + +func (f *filterKeepstoreErrorsOnly) check(line []byte) bool { + if len(line) == 0 { + return false + } + if line[0] != '{' { + return true + } + var m map[string]interface{} + err := json.Unmarshal(line, &m) + if err != nil { + return true + } + if m["msg"] == "request" { + return false + } + if m["msg"] == "response" { + if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 { + return false + } + } + return true +}