Merge branch '20937-arv-copy-http' refs #20937
[arvados.git] / lib / crunchrun / logging.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "bufio"
9         "bytes"
10         "encoding/json"
11         "fmt"
12         "io"
13         "log"
14         "regexp"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
20 )
21
22 // Timestamper is the signature for a function that takes a timestamp and
23 // return a formated string value.
24 type Timestamper func(t time.Time) string
25
26 // Logging plumbing:
27 //
28 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
29 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
30 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
31 //
32 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
33 // data from the stdout/stderr Reader and send to the Logger.
34
35 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
36 // write, and periodically flushes to a downstream writer.  It supports the
37 // "Logger" and "WriteCloser" interfaces.
38 type ThrottledLogger struct {
39         *log.Logger
40         buf *bytes.Buffer
41         sync.Mutex
42         writer   io.WriteCloser
43         flush    chan struct{}
44         stopped  chan struct{}
45         stopping chan struct{}
46         Timestamper
47         Immediate    *log.Logger
48         pendingFlush bool
49 }
50
51 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
52 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
53
54 // RFC3339Timestamp formats t as RFC3339NanoFixed.
55 func RFC3339Timestamp(t time.Time) string {
56         return t.Format(RFC3339NanoFixed)
57 }
58
59 // Write prepends a timestamp to each line of the input data and
60 // appends to the internal buffer. Each line is also logged to
61 // tl.Immediate, if tl.Immediate is not nil.
62 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
63         tl.Mutex.Lock()
64         defer tl.Mutex.Unlock()
65
66         if tl.buf == nil {
67                 tl.buf = &bytes.Buffer{}
68         }
69
70         now := tl.Timestamper(time.Now().UTC())
71         sc := bufio.NewScanner(bytes.NewBuffer(p))
72         for err == nil && sc.Scan() {
73                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
74                 if tl.Immediate != nil {
75                         tl.Immediate.Print(out[:len(out)-1])
76                 }
77                 _, err = io.WriteString(tl.buf, out)
78         }
79         if err == nil {
80                 err = sc.Err()
81                 if err == nil {
82                         n = len(p)
83                 }
84         }
85
86         if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
87                 // Non-blocking send.  Try send a flush if it is ready to
88                 // accept it.  Otherwise do nothing because a flush is already
89                 // pending.
90                 select {
91                 case tl.flush <- struct{}{}:
92                 default:
93                 }
94         }
95
96         return
97 }
98
99 // Periodically check the current buffer; if not empty, send it on the
100 // channel to the goWriter goroutine.
101 func (tl *ThrottledLogger) flusher() {
102         ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
103         defer ticker.Stop()
104         for stopping := false; !stopping; {
105                 select {
106                 case <-tl.stopping:
107                         // flush tl.buf and exit the loop
108                         stopping = true
109                 case <-tl.flush:
110                 case <-ticker.C:
111                 }
112
113                 var ready *bytes.Buffer
114
115                 tl.Mutex.Lock()
116                 ready, tl.buf = tl.buf, &bytes.Buffer{}
117                 tl.Mutex.Unlock()
118
119                 if ready != nil && ready.Len() > 0 {
120                         tl.writer.Write(ready.Bytes())
121                 }
122         }
123         close(tl.stopped)
124 }
125
126 // Close the flusher goroutine and wait for it to complete, then close the
127 // underlying Writer.
128 func (tl *ThrottledLogger) Close() error {
129         select {
130         case <-tl.stopping:
131                 // already stopped
132         default:
133                 close(tl.stopping)
134         }
135         <-tl.stopped
136         return tl.writer.Close()
137 }
138
139 const (
140         // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
141         MaxLogLine = 1 << 12
142 )
143
144 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
145 // line splitting.
146 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
147         reader := bufio.NewReaderSize(in, MaxLogLine)
148         var prefix string
149         for {
150                 line, isPrefix, err := reader.ReadLine()
151                 if err == io.EOF {
152                         break
153                 } else if err != nil {
154                         writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
155                 }
156                 var suffix string
157                 if isPrefix {
158                         suffix = "[...]\n"
159                 }
160
161                 if prefix == "" && suffix == "" {
162                         writer.Write(line)
163                 } else {
164                         writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
165                 }
166
167                 // Set up prefix for following line
168                 if isPrefix {
169                         prefix = "[...]"
170                 } else {
171                         prefix = ""
172                 }
173         }
174         done <- true
175 }
176
177 // NewThrottledLogger creates a new thottled logger that
178 //   - prepends timestamps to each line, and
179 //   - batches log messages and only calls the underlying Writer
180 //     at most once per "crunchLogSecondsBetweenEvents" seconds.
181 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
182         tl := &ThrottledLogger{}
183         tl.flush = make(chan struct{}, 1)
184         tl.stopped = make(chan struct{})
185         tl.stopping = make(chan struct{})
186         tl.writer = writer
187         tl.Logger = log.New(tl, "", 0)
188         tl.Timestamper = RFC3339Timestamp
189         go tl.flusher()
190         return tl
191 }
192
193 // Log throttling rate limiting config parameters
194 var crunchLimitLogBytesPerJob int64 = 67108864
195 var crunchLogThrottleBytes int64 = 65536
196 var crunchLogThrottlePeriod time.Duration = time.Second * 60
197 var crunchLogThrottleLines int64 = 1024
198 var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
199 var crunchLogBytesPerEvent int64 = 4096
200 var crunchLogSecondsBetweenEvents = time.Second
201 var crunchLogUpdatePeriod = time.Hour / 2
202 var crunchLogUpdateSize = int64(1 << 25)
203
204 // ArvLogWriter is an io.WriteCloser that processes each write by
205 // writing it through to another io.WriteCloser (typically a
206 // CollectionFileWriter) and creating an Arvados log entry.
207 type ArvLogWriter struct {
208         ArvClient     IArvadosClient
209         UUID          string
210         loggingStream string
211         writeCloser   io.WriteCloser
212
213         // for rate limiting
214         bytesLogged                  int64
215         logThrottleResetTime         time.Time
216         logThrottleLinesSoFar        int64
217         logThrottleBytesSoFar        int64
218         logThrottleBytesSkipped      int64
219         logThrottleIsOpen            bool
220         logThrottlePartialLineNextAt time.Time
221         logThrottleFirstPartialLine  bool
222         bufToFlush                   bytes.Buffer
223         bufFlushedAt                 time.Time
224         closing                      bool
225 }
226
227 func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
228         // Write to the next writer in the chain (a file in Keep)
229         var err1 error
230         if arvlog.writeCloser != nil {
231                 _, err1 = arvlog.writeCloser.Write(p)
232         }
233
234         // write to API after checking rate limit
235         now := time.Now()
236
237         if now.After(arvlog.logThrottleResetTime) {
238                 // It has been more than throttle_period seconds since the last
239                 // checkpoint; so reset the throttle
240                 if arvlog.logThrottleBytesSkipped > 0 {
241                         arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
242                 }
243
244                 arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
245                 arvlog.logThrottleBytesSoFar = 0
246                 arvlog.logThrottleLinesSoFar = 0
247                 arvlog.logThrottleBytesSkipped = 0
248                 arvlog.logThrottleIsOpen = true
249         }
250
251         lines := bytes.Split(p, []byte("\n"))
252
253         for _, line := range lines {
254                 // Short circuit the counting code if we're just going to throw
255                 // away the data anyway.
256                 if !arvlog.logThrottleIsOpen {
257                         arvlog.logThrottleBytesSkipped += int64(len(line))
258                         continue
259                 } else if len(line) == 0 {
260                         continue
261                 }
262
263                 // check rateLimit
264                 logOpen, msg := arvlog.rateLimit(line, now)
265                 if logOpen {
266                         arvlog.bufToFlush.WriteString(string(msg) + "\n")
267                 }
268         }
269
270         if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
271                 (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
272                 arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
273                 // write to API
274                 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
275                         "object_uuid": arvlog.UUID,
276                         "event_type":  arvlog.loggingStream,
277                         "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
278                 err2 := arvlog.ArvClient.Create("logs", lr, nil)
279
280                 arvlog.bufToFlush = bytes.Buffer{}
281                 arvlog.bufFlushedAt = now
282
283                 if err1 != nil || err2 != nil {
284                         return 0, fmt.Errorf("%s ; %s", err1, err2)
285                 }
286         }
287
288         return len(p), nil
289 }
290
291 // Close the underlying writer
292 func (arvlog *ArvLogWriter) Close() (err error) {
293         arvlog.closing = true
294         arvlog.Write([]byte{})
295         if arvlog.writeCloser != nil {
296                 err = arvlog.writeCloser.Close()
297                 arvlog.writeCloser = nil
298         }
299         return err
300 }
301
302 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
303
304 // Test for hard cap on total output and for log throttling. Returns whether
305 // the log line should go to output or not. Returns message if limit exceeded.
306 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
307         message := ""
308         lineSize := int64(len(line))
309
310         if arvlog.logThrottleIsOpen {
311                 matches := lineRegexp.FindStringSubmatch(string(line))
312
313                 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
314                         // This is a partial line.
315
316                         if arvlog.logThrottleFirstPartialLine {
317                                 // Partial should be suppressed.  First time this is happening for this line so provide a message instead.
318                                 arvlog.logThrottleFirstPartialLine = false
319                                 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
320                                 arvlog.logThrottleBytesSkipped += lineSize
321                                 return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
322                                         RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
323                         } else if now.After(arvlog.logThrottlePartialLineNextAt) {
324                                 // The throttle period has passed.  Update timestamp and let it through.
325                                 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
326                         } else {
327                                 // Suppress line.
328                                 arvlog.logThrottleBytesSkipped += lineSize
329                                 return false, line
330                         }
331                 } else {
332                         // Not a partial line so reset.
333                         arvlog.logThrottlePartialLineNextAt = time.Time{}
334                         arvlog.logThrottleFirstPartialLine = true
335                 }
336
337                 arvlog.bytesLogged += lineSize
338                 arvlog.logThrottleBytesSoFar += lineSize
339                 arvlog.logThrottleLinesSoFar++
340
341                 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
342                         message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
343                                 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
344                         arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
345                         arvlog.logThrottleIsOpen = false
346
347                 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
348                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
349                         message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
350                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
351                         arvlog.logThrottleIsOpen = false
352
353                 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
354                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
355                         message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
356                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
357                         arvlog.logThrottleIsOpen = false
358
359                 }
360         }
361
362         if !arvlog.logThrottleIsOpen {
363                 // Don't log anything if any limit has been exceeded. Just count lossage.
364                 arvlog.logThrottleBytesSkipped += lineSize
365         }
366
367         if message != "" {
368                 // Yes, write to logs, but use our "rate exceeded" message
369                 // instead of the log message that exceeded the limit.
370                 message += " A complete log is still being written to Keep, and will be available when the job finishes."
371                 return true, []byte(message)
372         }
373         return arvlog.logThrottleIsOpen, line
374 }
375
376 // load the rate limit discovery config parameters
377 func loadLogThrottleParams(clnt IArvadosClient) {
378         loadDuration := func(dst *time.Duration, key string) {
379                 if param, err := clnt.Discovery(key); err != nil {
380                         return
381                 } else if d, ok := param.(float64); !ok {
382                         return
383                 } else {
384                         *dst = time.Duration(d) * time.Second
385                 }
386         }
387         loadInt64 := func(dst *int64, key string) {
388                 if param, err := clnt.Discovery(key); err != nil {
389                         return
390                 } else if val, ok := param.(float64); !ok {
391                         return
392                 } else {
393                         *dst = int64(val)
394                 }
395         }
396
397         loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
398         loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
399         loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
400         loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
401         loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
402         loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
403         loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
404         loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
405         loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
406
407 }
408
409 type filterKeepstoreErrorsOnly struct {
410         io.WriteCloser
411         buf []byte
412 }
413
414 func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
415         log.Printf("filterKeepstoreErrorsOnly: write %q", p)
416         f.buf = append(f.buf, p...)
417         start := 0
418         for i := len(f.buf) - len(p); i < len(f.buf); i++ {
419                 if f.buf[i] == '\n' {
420                         if f.check(f.buf[start:i]) {
421                                 _, err := f.WriteCloser.Write(f.buf[start : i+1])
422                                 if err != nil {
423                                         return 0, err
424                                 }
425                         }
426                         start = i + 1
427                 }
428         }
429         if start > 0 {
430                 copy(f.buf, f.buf[start:])
431                 f.buf = f.buf[:len(f.buf)-start]
432         }
433         return len(p), nil
434 }
435
436 func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
437         if len(line) == 0 {
438                 return false
439         }
440         if line[0] != '{' {
441                 return true
442         }
443         var m map[string]interface{}
444         err := json.Unmarshal(line, &m)
445         if err != nil {
446                 return true
447         }
448         if m["msg"] == "request" {
449                 return false
450         }
451         if m["msg"] == "response" {
452                 if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
453                         return false
454                 }
455         }
456         return true
457 }