Merge branch 'master' into origin-8019-crunchrun-log-throttle
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "io"
8         "log"
9         "regexp"
10         "strings"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15 )
16
// Timestamper is the signature of a function that takes a timestamp and
// returns it as a formatted string.
type Timestamper func(t time.Time) string
20
21 // Logging plumbing:
22 //
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
26 //
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
29
30 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
31 // write, and periodically flushes to a downstream writer.  It supports the
32 // "Logger" and "WriteCloser" interfaces.
33 type ThrottledLogger struct {
34         *log.Logger
35         buf *bytes.Buffer
36         sync.Mutex
37         writer  io.WriteCloser
38         flush   chan struct{}
39         stopped chan struct{}
40         Timestamper
41         Immediate    *log.Logger
42         pendingFlush bool
43 }
44
// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano: the
// fractional seconds always occupy nine digits, trailing zeros included.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

// RFC3339Timestamp returns t rendered with the RFC3339NanoFixed layout.
func RFC3339Timestamp(t time.Time) string {
	formatted := t.Format(RFC3339NanoFixed)
	return formatted
}
52
53 // Write prepends a timestamp to each line of the input data and
54 // appends to the internal buffer. Each line is also logged to
55 // tl.Immediate, if tl.Immediate is not nil.
56 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
57         tl.Mutex.Lock()
58         defer tl.Mutex.Unlock()
59
60         if tl.buf == nil {
61                 tl.buf = &bytes.Buffer{}
62         }
63
64         now := tl.Timestamper(time.Now().UTC())
65         sc := bufio.NewScanner(bytes.NewBuffer(p))
66         for err == nil && sc.Scan() {
67                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
68                 if tl.Immediate != nil {
69                         tl.Immediate.Print(out[:len(out)-1])
70                 }
71                 _, err = io.WriteString(tl.buf, out)
72         }
73         if err == nil {
74                 err = sc.Err()
75                 if err == nil {
76                         n = len(p)
77                 }
78         }
79
80         if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
81                 // Non-blocking send.  Try send a flush if it is ready to
82                 // accept it.  Otherwise do nothing because a flush is already
83                 // pending.
84                 select {
85                 case tl.flush <- struct{}{}:
86                 default:
87                 }
88         }
89
90         return
91 }
92
93 // Periodically check the current buffer; if not empty, send it on the
94 // channel to the goWriter goroutine.
95 func (tl *ThrottledLogger) flusher() {
96         ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
97         defer ticker.Stop()
98         for stopping := false; !stopping; {
99                 select {
100                 case _, open := <-tl.flush:
101                         // if !open, will flush tl.buf and exit the loop
102                         stopping = !open
103                 case <-ticker.C:
104                 }
105
106                 var ready *bytes.Buffer
107
108                 tl.Mutex.Lock()
109                 ready, tl.buf = tl.buf, &bytes.Buffer{}
110                 tl.Mutex.Unlock()
111
112                 if ready != nil && ready.Len() > 0 {
113                         tl.writer.Write(ready.Bytes())
114                 }
115         }
116         close(tl.stopped)
117 }
118
119 // Close the flusher goroutine and wait for it to complete, then close the
120 // underlying Writer.
121 func (tl *ThrottledLogger) Close() error {
122         select {
123         case <-tl.flush:
124                 // already stopped
125         default:
126                 close(tl.flush)
127         }
128         <-tl.stopped
129         return tl.writer.Close()
130 }
131
const (
	// MaxLogLine is the maximum length of stdout/stderr lines before they are split.
	MaxLogLine = 1 << 12
)

// ReadWriteLines reads lines from in and passes each one to writer in a
// separate Write call, without the trailing line terminator.  Lines longer
// than MaxLogLine are split into segments: every segment except the last
// gets a "[...]\n" suffix, and every segment except the first gets a
// "[...]" prefix.
//
// Reading stops at EOF or at the first read error; a read error is
// reported to writer as an "error reading container log:" line.  In both
// cases true is sent on done before returning, so done must be buffered or
// have a receiver.
func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
	reader := bufio.NewReaderSize(in, MaxLogLine)
	var prefix string
	for {
		line, isPrefix, err := reader.ReadLine()
		if err == io.EOF {
			break
		}
		if err != nil {
			writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
			// Stop here: retrying a reader that keeps failing would
			// spin forever, flooding the log with this message and
			// never signalling done.
			break
		}

		var suffix string
		if isPrefix {
			suffix = "[...]\n"
		}

		if prefix == "" && suffix == "" {
			writer.Write(line)
		} else {
			writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
		}

		// Set up prefix for the following segment.
		if isPrefix {
			prefix = "[...]"
		} else {
			prefix = ""
		}
	}
	done <- true
}
169
170 // NewThrottledLogger creates a new thottled logger that
171 // (a) prepends timestamps to each line
172 // (b) batches log messages and only calls the underlying Writer
173 //  at most once per "crunchLogSecondsBetweenEvents" seconds.
174 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
175         tl := &ThrottledLogger{}
176         tl.flush = make(chan struct{}, 1)
177         tl.stopped = make(chan struct{})
178         tl.writer = writer
179         tl.Logger = log.New(tl, "", 0)
180         tl.Timestamper = RFC3339Timestamp
181         go tl.flusher()
182         return tl
183 }
184
// Log throttling / rate limiting configuration.  The values below are the
// defaults; loadLogThrottleParams replaces them with the values published
// by the API server, where available.
var (
	// Hard cap on the number of bytes rateLimit lets through in total.
	crunchLimitLogBytesPerJob int64 = 67108864
	// Maximum number of bytes let through per throttle period.
	crunchLogThrottleBytes int64 = 65536
	// Length of one throttle period.
	crunchLogThrottlePeriod = time.Second * 60
	// Maximum number of lines let through per throttle period.
	crunchLogThrottleLines int64 = 1024
	// Minimum interval between segments of a split long line that are
	// let through.
	crunchLogPartialLineThrottlePeriod = time.Second * 5
	// Buffered size that triggers a flush without waiting for the timer.
	crunchLogBytesPerEvent int64 = 4096
	// Interval between timer-driven flushes.
	crunchLogSecondsBetweenEvents = time.Second * 1
)
193
194 // ArvLogWriter is an io.WriteCloser that processes each write by
195 // writing it through to another io.WriteCloser (typically a
196 // CollectionFileWriter) and creating an Arvados log entry.
197 type ArvLogWriter struct {
198         ArvClient     IArvadosClient
199         UUID          string
200         loggingStream string
201         writeCloser   io.WriteCloser
202
203         // for rate limiting
204         bytesLogged                  int64
205         logThrottleResetTime         time.Time
206         logThrottleLinesSoFar        int64
207         logThrottleBytesSoFar        int64
208         logThrottleBytesSkipped      int64
209         logThrottleIsOpen            bool
210         logThrottlePartialLineNextAt time.Time
211         logThrottleFirstPartialLine  bool
212         bufToFlush                   bytes.Buffer
213         bufFlushedAt                 time.Time
214 }
215
216 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
217         // Write to the next writer in the chain (a file in Keep)
218         var err1 error
219         if arvlog.writeCloser != nil {
220                 _, err1 = arvlog.writeCloser.Write(p)
221         }
222
223         // write to API after checking rate limit
224         now := time.Now()
225         bytesWritten := 0
226
227         if now.After(arvlog.logThrottleResetTime) {
228                 // It has been more than throttle_period seconds since the last
229                 // checkpoint; so reset the throttle
230                 if arvlog.logThrottleBytesSkipped > 0 {
231                         arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
232                 }
233
234                 arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
235                 arvlog.logThrottleBytesSoFar = 0
236                 arvlog.logThrottleLinesSoFar = 0
237                 arvlog.logThrottleBytesSkipped = 0
238                 arvlog.logThrottleIsOpen = true
239         }
240
241         lines := bytes.Split(p, []byte("\n"))
242
243         for _, line := range lines {
244                 // Short circuit the counting code if we're just going to throw
245                 // away the data anyway.
246                 if !arvlog.logThrottleIsOpen {
247                         arvlog.logThrottleBytesSkipped += int64(len(line))
248                         continue
249                 } else if len(line) == 0 {
250                         continue
251                 }
252
253                 // check rateLimit
254                 logOpen, msg := arvlog.rateLimit(line, now)
255                 if logOpen {
256                         arvlog.bufToFlush.WriteString(string(msg) + "\n")
257                 }
258         }
259
260         if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
261                 (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) {
262                 // write to API
263                 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
264                         "object_uuid": arvlog.UUID,
265                         "event_type":  arvlog.loggingStream,
266                         "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
267                 err2 := arvlog.ArvClient.Create("logs", lr, nil)
268
269                 bytesWritten = arvlog.bufToFlush.Len()
270                 arvlog.bufToFlush = bytes.Buffer{}
271                 arvlog.bufFlushedAt = now
272
273                 if err1 != nil || err2 != nil {
274                         return 0, fmt.Errorf("%s ; %s", err1, err2)
275                 }
276         }
277
278         return bytesWritten, nil
279 }
280
281 // Close the underlying writer
282 func (arvlog *ArvLogWriter) Close() (err error) {
283         if arvlog.writeCloser != nil {
284                 err = arvlog.writeCloser.Close()
285                 arvlog.writeCloser = nil
286         }
287         return err
288 }
289
290 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
291
292 // Test for hard cap on total output and for log throttling. Returns whether
293 // the log line should go to output or not. Returns message if limit exceeded.
294 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
295         message := ""
296         lineSize := int64(len(line))
297
298         if arvlog.logThrottleIsOpen {
299                 matches := lineRegexp.FindStringSubmatch(string(line))
300
301                 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
302                         // This is a partial line.
303
304                         if arvlog.logThrottleFirstPartialLine {
305                                 // Partial should be suppressed.  First time this is happening for this line so provide a message instead.
306                                 arvlog.logThrottleFirstPartialLine = false
307                                 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
308                                 arvlog.logThrottleBytesSkipped += lineSize
309                                 return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
310                                         RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
311                         } else if now.After(arvlog.logThrottlePartialLineNextAt) {
312                                 // The throttle period has passed.  Update timestamp and let it through.
313                                 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
314                         } else {
315                                 // Suppress line.
316                                 arvlog.logThrottleBytesSkipped += lineSize
317                                 return false, line
318                         }
319                 } else {
320                         // Not a partial line so reset.
321                         arvlog.logThrottlePartialLineNextAt = time.Time{}
322                         arvlog.logThrottleFirstPartialLine = true
323                 }
324
325                 arvlog.bytesLogged += lineSize
326                 arvlog.logThrottleBytesSoFar += lineSize
327                 arvlog.logThrottleLinesSoFar += 1
328
329                 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
330                         message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
331                                 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
332                         arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
333                         arvlog.logThrottleIsOpen = false
334
335                 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
336                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
337                         message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
338                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
339                         arvlog.logThrottleIsOpen = false
340
341                 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
342                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
343                         message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
344                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
345                         arvlog.logThrottleIsOpen = false
346
347                 }
348         }
349
350         if !arvlog.logThrottleIsOpen {
351                 // Don't log anything if any limit has been exceeded. Just count lossage.
352                 arvlog.logThrottleBytesSkipped += lineSize
353         }
354
355         if message != "" {
356                 // Yes, write to logs, but use our "rate exceeded" message
357                 // instead of the log message that exceeded the limit.
358                 message += " A complete log is still being written to Keep, and will be available when the job finishes."
359                 return true, []byte(message)
360         } else {
361                 return arvlog.logThrottleIsOpen, line
362         }
363 }
364
365 // load the rate limit discovery config paramters
366 func loadLogThrottleParams(clnt IArvadosClient) {
367         param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
368         if err == nil {
369                 crunchLimitLogBytesPerJob = int64(param.(float64))
370         }
371
372         param, err = clnt.Discovery("crunchLogThrottleBytes")
373         if err == nil {
374                 crunchLogThrottleBytes = int64(param.(float64))
375         }
376
377         param, err = clnt.Discovery("crunchLogThrottlePeriod")
378         if err == nil {
379                 crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
380         }
381
382         param, err = clnt.Discovery("crunchLogThrottleLines")
383         if err == nil {
384                 crunchLogThrottleLines = int64(param.(float64))
385         }
386
387         param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
388         if err == nil {
389                 crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
390         }
391
392         param, err = clnt.Discovery("crunchLogBytesPerEvent")
393         if err == nil {
394                 crunchLogBytesPerEvent = int64(param.(float64))
395         }
396
397         param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
398         if err == nil {
399                 crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
400         }
401 }