21891: Move string concatenations out of per-segment loop.
[arvados.git] / lib / crunchrun / logging.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "bytes"
9         "encoding/json"
10         "io"
11         "log"
12         "time"
13 )
14
15 // rfc3339NanoFixed is a fixed-width version of time.RFC3339Nano.
16 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
17
18 // prefixer wraps an io.Writer, inserting a string returned by
19 // prefixFunc at the beginning of each line.
20 type prefixer struct {
21         writer     io.Writer
22         prefixFunc func() string
23         unfinished bool // true if the most recent write ended with a non-newline char
24 }
25
26 // newTimestamper wraps an io.Writer, inserting an RFC3339NanoFixed
27 // timestamp at the beginning of each line.
28 func newTimestamper(w io.Writer) *prefixer {
29         return &prefixer{
30                 writer:     w,
31                 prefixFunc: func() string { return time.Now().UTC().Format(rfc3339NanoFixed + " ") },
32         }
33 }
34
35 // newStringPrefixer wraps an io.Writer, inserting the given string at
36 // the beginning of each line. The given string should include a
37 // trailing space for readability.
38 func newStringPrefixer(w io.Writer, s string) *prefixer {
39         return &prefixer{
40                 writer:     w,
41                 prefixFunc: func() string { return s },
42         }
43 }
44
45 func (tp *prefixer) Write(p []byte) (n int, err error) {
46         for len(p) > 0 && err == nil {
47                 if !tp.unfinished {
48                         _, err = io.WriteString(tp.writer, tp.prefixFunc())
49                         if err != nil {
50                                 return
51                         }
52                 }
53                 newline := bytes.IndexRune(p, '\n')
54                 var nn int
55                 if newline < 0 {
56                         tp.unfinished = true
57                         nn, err = tp.writer.Write(p)
58                         p = nil
59                 } else {
60                         tp.unfinished = false
61                         nn, err = tp.writer.Write(p[:newline+1])
62                         p = p[nn:]
63                 }
64                 n += nn
65         }
66         return
67 }
68
69 // logWriter adds log.Logger methods to an io.Writer.
70 type logWriter struct {
71         io.Writer
72         *log.Logger
73 }
74
75 func newLogWriter(w io.Writer) *logWriter {
76         return &logWriter{
77                 Writer: w,
78                 Logger: log.New(w, "", 0),
79         }
80 }
81
82 var crunchLogUpdatePeriod = time.Hour / 2
83 var crunchLogUpdateSize = int64(1 << 25)
84
85 // load the rate limit discovery config parameters
86 func loadLogThrottleParams(clnt IArvadosClient) {
87         loadDuration := func(dst *time.Duration, key string) {
88                 if param, err := clnt.Discovery(key); err != nil {
89                         return
90                 } else if d, ok := param.(float64); !ok {
91                         return
92                 } else {
93                         *dst = time.Duration(d) * time.Second
94                 }
95         }
96         loadInt64 := func(dst *int64, key string) {
97                 if param, err := clnt.Discovery(key); err != nil {
98                         return
99                 } else if val, ok := param.(float64); !ok {
100                         return
101                 } else {
102                         *dst = int64(val)
103                 }
104         }
105
106         loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
107         loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
108
109 }
110
111 type filterKeepstoreErrorsOnly struct {
112         io.WriteCloser
113         buf []byte
114 }
115
116 func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
117         log.Printf("filterKeepstoreErrorsOnly: write %q", p)
118         f.buf = append(f.buf, p...)
119         start := 0
120         for i := len(f.buf) - len(p); i < len(f.buf); i++ {
121                 if f.buf[i] == '\n' {
122                         if f.check(f.buf[start:i]) {
123                                 _, err := f.WriteCloser.Write(f.buf[start : i+1])
124                                 if err != nil {
125                                         return 0, err
126                                 }
127                         }
128                         start = i + 1
129                 }
130         }
131         if start > 0 {
132                 copy(f.buf, f.buf[start:])
133                 f.buf = f.buf[:len(f.buf)-start]
134         }
135         return len(p), nil
136 }
137
138 func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
139         if len(line) == 0 {
140                 return false
141         }
142         if line[0] != '{' {
143                 return true
144         }
145         var m map[string]interface{}
146         err := json.Unmarshal(line, &m)
147         if err != nil {
148                 return true
149         }
150         if m["msg"] == "request" {
151                 return false
152         }
153         if m["msg"] == "response" {
154                 if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
155                         return false
156                 }
157         }
158         return true
159 }