15370: Re-enable docker tests.
[arvados.git] / lib / crunchrun / logging.go
index febfb1404d32b4f59776a97c45692d7b5c5ab0a9..76a55c4992bbd933085e83282391b3e3b241fb04 100644 (file)
@@ -7,6 +7,7 @@ package crunchrun
 import (
        "bufio"
        "bytes"
+       "encoding/json"
        "fmt"
        "io"
        "log"
@@ -335,7 +336,7 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
 
                arvlog.bytesLogged += lineSize
                arvlog.logThrottleBytesSoFar += lineSize
-               arvlog.logThrottleLinesSoFar += 1
+               arvlog.logThrottleLinesSoFar++
 
                if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
                        message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
@@ -404,3 +405,53 @@ func loadLogThrottleParams(clnt IArvadosClient) {
        loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
 
 }
+
// filterKeepstoreErrorsOnly is an io.WriteCloser that splits the bytes
// written to it into newline-terminated lines and passes on to the
// embedded WriteCloser only those lines for which check returns true.
//
// Bytes following the last newline are held in buf until a later Write
// supplies the terminating newline.
type filterKeepstoreErrorsOnly struct {
	io.WriteCloser
	// buf holds the unterminated tail of the input received so far.
	// Between calls to Write it never contains a newline.
	buf []byte
}

// Write appends p to the pending input, forwards each completed line
// accepted by check (including its trailing newline) to the embedded
// WriteCloser, and keeps any unterminated remainder for the next call.
//
// On success it returns len(p), nil. If the embedded writer fails, Write
// stops at that line and returns the number of bytes of p that belong to
// lines already fully handled, together with the error. Bytes of p that
// are not counted in the return value are dropped from the internal
// buffer, so a caller may retry with p[n:] without duplicating or
// mis-framing lines.
func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
	// base is the offset in f.buf at which p begins. Everything before
	// it is the newline-free tail left by earlier calls, so only the
	// newly appended bytes need to be scanned.
	base := len(f.buf)
	f.buf = append(f.buf, p...)
	start := 0 // offset of the first byte of the line being assembled
	for i := base; i < len(f.buf); i++ {
		if f.buf[i] != '\n' {
			continue
		}
		if f.check(f.buf[start:i]) {
			if _, err := f.WriteCloser.Write(f.buf[start : i+1]); err != nil {
				// n counts the bytes of p in lines completed
				// before the failing one. If the failing line
				// began in an earlier call (start < base), none
				// of p has been consumed.
				n := start - base
				if n < 0 {
					n = 0
				}
				// Keep only bytes that predate p and are still
				// pending; the unconsumed part of p is dropped
				// because it is reported as not written.
				f.buf = f.buf[:copy(f.buf, f.buf[start:base+n])]
				return n, err
			}
		}
		start = i + 1
	}
	if start > 0 {
		// Slide the unterminated remainder to the front of buf.
		f.buf = f.buf[:copy(f.buf, f.buf[start:])]
	}
	return len(p), nil
}

// check reports whether line (given without its trailing newline) should
// be forwarded.
//
// It returns false for empty lines, for JSON objects whose "msg" field
// is "request", and for JSON objects whose "msg" field is "response"
// with a numeric "respStatusCode" in the range [200, 300). Everything
// else is forwarded, including lines that do not start with '{' and
// lines that start with '{' but fail to parse as JSON.
func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
	if len(line) == 0 {
		return false
	}
	if line[0] != '{' {
		return true
	}
	var m map[string]interface{}
	if err := json.Unmarshal(line, &m); err != nil {
		return true
	}
	switch m["msg"] {
	case "request":
		return false
	case "response":
		// JSON numbers decode as float64; a missing or non-numeric
		// status yields 0, which is outside the suppressed range.
		if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
			return false
		}
	}
	return true
}