15370: Re-enable docker tests.
[arvados.git] / lib / crunchrun / logging.go
index 050894383d757a1e90e79999ef9fa8f2f08b9f01..76a55c4992bbd933085e83282391b3e3b241fb04 100644 (file)
@@ -7,6 +7,7 @@ package crunchrun
 import (
        "bufio"
        "bytes"
+       "encoding/json"
        "fmt"
        "io"
        "log"
@@ -404,3 +405,53 @@ func loadLogThrottleParams(clnt IArvadosClient) {
        loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
 
 }
+
// filterKeepstoreErrorsOnly is an io.WriteCloser that splits the data
// written to it into newline-terminated lines and forwards to the
// embedded WriteCloser only those lines for which check returns true.
// Judging by the name it is meant to sit in front of a keepstore log
// stream so that only noteworthy entries are kept (NOTE(review):
// confirm against the call site).
//
// Close is promoted from the embedded WriteCloser unchanged, so any
// unterminated data still held in buf when Close is called is not
// forwarded.
type filterKeepstoreErrorsOnly struct {
	io.WriteCloser
	// buf holds the trailing, not yet newline-terminated line left
	// over from earlier Write calls. Between calls it never contains
	// a newline; Write relies on that to scan only newly added bytes.
	buf []byte
}

// Write buffers p, then passes every line completed by p (including
// its trailing '\n') to the embedded writer if check accepts it, and
// silently drops it otherwise. An incomplete final line stays buffered
// until a later Write supplies its newline.
//
// On success it returns len(p), nil. If the embedded writer returns an
// error, Write stops and returns that error along with the number of
// bytes of p that belong to lines processed before the failing one.
// The internal buffer is left consistent with that count, so a caller
// may retry with p[n:] without lines being duplicated or merged.
func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
	prev := len(f.buf)
	f.buf = append(f.buf, p...)

	start := 0 // offset in f.buf of the first byte of the current line
	scan := prev
	for {
		// Bytes before prev are known to be newline-free, so only
		// the newly appended region needs to be searched.
		nl := bytes.IndexByte(f.buf[scan:], '\n')
		if nl < 0 {
			break
		}
		end := scan + nl + 1 // one past the newline
		if f.check(f.buf[start : end-1]) {
			if _, err := f.WriteCloser.Write(f.buf[start:end]); err != nil {
				if start == 0 {
					// The failing line includes whatever was
					// buffered before this call: restore the
					// buffer to its previous content and report
					// that none of p was consumed.
					f.buf = f.buf[:prev]
					return 0, err
				}
				// Earlier lines (and with them the previously
				// buffered fragment) were fully processed. Forget
				// the rest of p and report how much of it was
				// consumed.
				f.buf = f.buf[:0]
				return start - prev, err
			}
		}
		start, scan = end, end
	}

	if start > 0 {
		// Move the unterminated tail to the front of the buffer.
		f.buf = f.buf[:copy(f.buf, f.buf[start:])]
	}
	return len(p), nil
}

// check reports whether line (given without its trailing newline)
// should be forwarded.
//
// Empty lines are dropped. Lines that do not start with '{', or that
// do not parse as a JSON object, are kept. A JSON object is dropped if
// its "msg" field is "request", or if "msg" is "response" and
// "respStatusCode" is a number in [200, 300). Every other object is
// kept, including responses with a missing or non-numeric status code.
func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
	if len(line) == 0 {
		return false
	}
	if line[0] != '{' {
		return true
	}
	var m map[string]interface{}
	if err := json.Unmarshal(line, &m); err != nil {
		return true
	}
	switch m["msg"] {
	case "request":
		return false
	case "response":
		// A missing or non-numeric status yields code == 0, which
		// falls outside the 2xx range and so keeps the line.
		if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
			return false
		}
	}
	return true
}