15467: Replace KeepServices with SbatchEnvironmentVariables
[arvados.git] / services / crunch-run / logging.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bufio"
9         "bytes"
10         "fmt"
11         "io"
12         "log"
13         "regexp"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
19 )
20
21 // Timestamper is the signature for a function that takes a timestamp and
22 // return a formated string value.
23 type Timestamper func(t time.Time) string
24
25 // Logging plumbing:
26 //
27 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
28 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
29 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
30 //
31 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
32 // data from the stdout/stderr Reader and send to the Logger.
33
34 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
35 // write, and periodically flushes to a downstream writer.  It supports the
36 // "Logger" and "WriteCloser" interfaces.
37 type ThrottledLogger struct {
38         *log.Logger
39         buf *bytes.Buffer
40         sync.Mutex
41         writer   io.WriteCloser
42         flush    chan struct{}
43         stopped  chan struct{}
44         stopping chan struct{}
45         Timestamper
46         Immediate    *log.Logger
47         pendingFlush bool
48 }
49
50 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
51 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
52
53 // RFC3339Timestamp formats t as RFC3339NanoFixed.
54 func RFC3339Timestamp(t time.Time) string {
55         return t.Format(RFC3339NanoFixed)
56 }
57
58 // Write prepends a timestamp to each line of the input data and
59 // appends to the internal buffer. Each line is also logged to
60 // tl.Immediate, if tl.Immediate is not nil.
61 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
62         tl.Mutex.Lock()
63         defer tl.Mutex.Unlock()
64
65         if tl.buf == nil {
66                 tl.buf = &bytes.Buffer{}
67         }
68
69         now := tl.Timestamper(time.Now().UTC())
70         sc := bufio.NewScanner(bytes.NewBuffer(p))
71         for err == nil && sc.Scan() {
72                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
73                 if tl.Immediate != nil {
74                         tl.Immediate.Print(out[:len(out)-1])
75                 }
76                 _, err = io.WriteString(tl.buf, out)
77         }
78         if err == nil {
79                 err = sc.Err()
80                 if err == nil {
81                         n = len(p)
82                 }
83         }
84
85         if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
86                 // Non-blocking send.  Try send a flush if it is ready to
87                 // accept it.  Otherwise do nothing because a flush is already
88                 // pending.
89                 select {
90                 case tl.flush <- struct{}{}:
91                 default:
92                 }
93         }
94
95         return
96 }
97
98 // Periodically check the current buffer; if not empty, send it on the
99 // channel to the goWriter goroutine.
100 func (tl *ThrottledLogger) flusher() {
101         ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
102         defer ticker.Stop()
103         for stopping := false; !stopping; {
104                 select {
105                 case <-tl.stopping:
106                         // flush tl.buf and exit the loop
107                         stopping = true
108                 case <-tl.flush:
109                 case <-ticker.C:
110                 }
111
112                 var ready *bytes.Buffer
113
114                 tl.Mutex.Lock()
115                 ready, tl.buf = tl.buf, &bytes.Buffer{}
116                 tl.Mutex.Unlock()
117
118                 if ready != nil && ready.Len() > 0 {
119                         tl.writer.Write(ready.Bytes())
120                 }
121         }
122         close(tl.stopped)
123 }
124
125 // Close the flusher goroutine and wait for it to complete, then close the
126 // underlying Writer.
127 func (tl *ThrottledLogger) Close() error {
128         select {
129         case <-tl.stopping:
130                 // already stopped
131         default:
132                 close(tl.stopping)
133         }
134         <-tl.stopped
135         return tl.writer.Close()
136 }
137
138 const (
139         // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
140         MaxLogLine = 1 << 12
141 )
142
143 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
144 // line splitting.
145 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
146         reader := bufio.NewReaderSize(in, MaxLogLine)
147         var prefix string
148         for {
149                 line, isPrefix, err := reader.ReadLine()
150                 if err == io.EOF {
151                         break
152                 } else if err != nil {
153                         writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
154                 }
155                 var suffix string
156                 if isPrefix {
157                         suffix = "[...]\n"
158                 }
159
160                 if prefix == "" && suffix == "" {
161                         writer.Write(line)
162                 } else {
163                         writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
164                 }
165
166                 // Set up prefix for following line
167                 if isPrefix {
168                         prefix = "[...]"
169                 } else {
170                         prefix = ""
171                 }
172         }
173         done <- true
174 }
175
176 // NewThrottledLogger creates a new thottled logger that
177 // (a) prepends timestamps to each line
178 // (b) batches log messages and only calls the underlying Writer
179 //  at most once per "crunchLogSecondsBetweenEvents" seconds.
180 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
181         tl := &ThrottledLogger{}
182         tl.flush = make(chan struct{}, 1)
183         tl.stopped = make(chan struct{})
184         tl.stopping = make(chan struct{})
185         tl.writer = writer
186         tl.Logger = log.New(tl, "", 0)
187         tl.Timestamper = RFC3339Timestamp
188         go tl.flusher()
189         return tl
190 }
191
192 // Log throttling rate limiting config parameters
193 var crunchLimitLogBytesPerJob int64 = 67108864
194 var crunchLogThrottleBytes int64 = 65536
195 var crunchLogThrottlePeriod time.Duration = time.Second * 60
196 var crunchLogThrottleLines int64 = 1024
197 var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
198 var crunchLogBytesPerEvent int64 = 4096
199 var crunchLogSecondsBetweenEvents = time.Second
200 var crunchLogUpdatePeriod = time.Hour / 2
201 var crunchLogUpdateSize = int64(1 << 25)
202
203 // ArvLogWriter is an io.WriteCloser that processes each write by
204 // writing it through to another io.WriteCloser (typically a
205 // CollectionFileWriter) and creating an Arvados log entry.
206 type ArvLogWriter struct {
207         ArvClient     IArvadosClient
208         UUID          string
209         loggingStream string
210         writeCloser   io.WriteCloser
211
212         // for rate limiting
213         bytesLogged                  int64
214         logThrottleResetTime         time.Time
215         logThrottleLinesSoFar        int64
216         logThrottleBytesSoFar        int64
217         logThrottleBytesSkipped      int64
218         logThrottleIsOpen            bool
219         logThrottlePartialLineNextAt time.Time
220         logThrottleFirstPartialLine  bool
221         bufToFlush                   bytes.Buffer
222         bufFlushedAt                 time.Time
223         closing                      bool
224 }
225
226 func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
227         // Write to the next writer in the chain (a file in Keep)
228         var err1 error
229         if arvlog.writeCloser != nil {
230                 _, err1 = arvlog.writeCloser.Write(p)
231         }
232
233         // write to API after checking rate limit
234         now := time.Now()
235
236         if now.After(arvlog.logThrottleResetTime) {
237                 // It has been more than throttle_period seconds since the last
238                 // checkpoint; so reset the throttle
239                 if arvlog.logThrottleBytesSkipped > 0 {
240                         arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
241                 }
242
243                 arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
244                 arvlog.logThrottleBytesSoFar = 0
245                 arvlog.logThrottleLinesSoFar = 0
246                 arvlog.logThrottleBytesSkipped = 0
247                 arvlog.logThrottleIsOpen = true
248         }
249
250         lines := bytes.Split(p, []byte("\n"))
251
252         for _, line := range lines {
253                 // Short circuit the counting code if we're just going to throw
254                 // away the data anyway.
255                 if !arvlog.logThrottleIsOpen {
256                         arvlog.logThrottleBytesSkipped += int64(len(line))
257                         continue
258                 } else if len(line) == 0 {
259                         continue
260                 }
261
262                 // check rateLimit
263                 logOpen, msg := arvlog.rateLimit(line, now)
264                 if logOpen {
265                         arvlog.bufToFlush.WriteString(string(msg) + "\n")
266                 }
267         }
268
269         if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
270                 (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
271                 arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
272                 // write to API
273                 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
274                         "object_uuid": arvlog.UUID,
275                         "event_type":  arvlog.loggingStream,
276                         "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
277                 err2 := arvlog.ArvClient.Create("logs", lr, nil)
278
279                 arvlog.bufToFlush = bytes.Buffer{}
280                 arvlog.bufFlushedAt = now
281
282                 if err1 != nil || err2 != nil {
283                         return 0, fmt.Errorf("%s ; %s", err1, err2)
284                 }
285         }
286
287         return len(p), nil
288 }
289
290 // Close the underlying writer
291 func (arvlog *ArvLogWriter) Close() (err error) {
292         arvlog.closing = true
293         arvlog.Write([]byte{})
294         if arvlog.writeCloser != nil {
295                 err = arvlog.writeCloser.Close()
296                 arvlog.writeCloser = nil
297         }
298         return err
299 }
300
301 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
302
303 // Test for hard cap on total output and for log throttling. Returns whether
304 // the log line should go to output or not. Returns message if limit exceeded.
305 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
306         message := ""
307         lineSize := int64(len(line))
308
309         if arvlog.logThrottleIsOpen {
310                 matches := lineRegexp.FindStringSubmatch(string(line))
311
312                 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
313                         // This is a partial line.
314
315                         if arvlog.logThrottleFirstPartialLine {
316                                 // Partial should be suppressed.  First time this is happening for this line so provide a message instead.
317                                 arvlog.logThrottleFirstPartialLine = false
318                                 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
319                                 arvlog.logThrottleBytesSkipped += lineSize
320                                 return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
321                                         RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
322                         } else if now.After(arvlog.logThrottlePartialLineNextAt) {
323                                 // The throttle period has passed.  Update timestamp and let it through.
324                                 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
325                         } else {
326                                 // Suppress line.
327                                 arvlog.logThrottleBytesSkipped += lineSize
328                                 return false, line
329                         }
330                 } else {
331                         // Not a partial line so reset.
332                         arvlog.logThrottlePartialLineNextAt = time.Time{}
333                         arvlog.logThrottleFirstPartialLine = true
334                 }
335
336                 arvlog.bytesLogged += lineSize
337                 arvlog.logThrottleBytesSoFar += lineSize
338                 arvlog.logThrottleLinesSoFar += 1
339
340                 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
341                         message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
342                                 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
343                         arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
344                         arvlog.logThrottleIsOpen = false
345
346                 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
347                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
348                         message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
349                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
350                         arvlog.logThrottleIsOpen = false
351
352                 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
353                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
354                         message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
355                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
356                         arvlog.logThrottleIsOpen = false
357
358                 }
359         }
360
361         if !arvlog.logThrottleIsOpen {
362                 // Don't log anything if any limit has been exceeded. Just count lossage.
363                 arvlog.logThrottleBytesSkipped += lineSize
364         }
365
366         if message != "" {
367                 // Yes, write to logs, but use our "rate exceeded" message
368                 // instead of the log message that exceeded the limit.
369                 message += " A complete log is still being written to Keep, and will be available when the job finishes."
370                 return true, []byte(message)
371         } else {
372                 return arvlog.logThrottleIsOpen, line
373         }
374 }
375
376 // load the rate limit discovery config parameters
377 func loadLogThrottleParams(clnt IArvadosClient) {
378         loadDuration := func(dst *time.Duration, key string) {
379                 if param, err := clnt.Discovery(key); err != nil {
380                         return
381                 } else if d, ok := param.(float64); !ok {
382                         return
383                 } else {
384                         *dst = time.Duration(d) * time.Second
385                 }
386         }
387         loadInt64 := func(dst *int64, key string) {
388                 if param, err := clnt.Discovery(key); err != nil {
389                         return
390                 } else if val, ok := param.(float64); !ok {
391                         return
392                 } else {
393                         *dst = int64(val)
394                 }
395         }
396
397         loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
398         loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
399         loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
400         loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
401         loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
402         loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
403         loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
404         loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
405         loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
406
407 }