Merge branch 'master' into 8019-crunchrun-log-throttle
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "io"
8         "log"
9         "regexp"
10         "strings"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15 )
16
// Timestamper formats a point in time as the string that is placed in
// front of each log line.
type Timestamper func(time.Time) string
20
21 // Logging plumbing:
22 //
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
26 //
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
29
30 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
31 // write, and periodically flushes to a downstream writer.  It supports the
32 // "Logger" and "WriteCloser" interfaces.
33 type ThrottledLogger struct {
34         *log.Logger
35         buf *bytes.Buffer
36         sync.Mutex
37         writer   io.WriteCloser
38         stopping chan struct{}
39         stopped  chan struct{}
40         Timestamper
41         Immediate *log.Logger
42 }
43
// RFC3339NanoFixed is a variant of time.RFC3339Nano that keeps trailing
// zeros, so the fractional seconds are always printed with nine digits.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

// RFC3339Timestamp renders t using the RFC3339NanoFixed layout. Its
// signature matches Timestamper.
func RFC3339Timestamp(t time.Time) string {
	layout := RFC3339NanoFixed
	return t.Format(layout)
}
51
52 // Write prepends a timestamp to each line of the input data and
53 // appends to the internal buffer. Each line is also logged to
54 // tl.Immediate, if tl.Immediate is not nil.
55 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
56         tl.Mutex.Lock()
57         defer tl.Mutex.Unlock()
58
59         if tl.buf == nil {
60                 tl.buf = &bytes.Buffer{}
61         }
62
63         now := tl.Timestamper(time.Now().UTC())
64         sc := bufio.NewScanner(bytes.NewBuffer(p))
65         for err == nil && sc.Scan() {
66                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
67                 if tl.Immediate != nil {
68                         tl.Immediate.Print(out[:len(out)-1])
69                 }
70                 _, err = io.WriteString(tl.buf, out)
71         }
72         if err == nil {
73                 err = sc.Err()
74                 if err == nil {
75                         n = len(p)
76                 }
77         }
78         return
79 }
80
81 // Periodically check the current buffer; if not empty, send it on the
82 // channel to the goWriter goroutine.
83 func (tl *ThrottledLogger) flusher() {
84         ticker := time.NewTicker(time.Second)
85         defer ticker.Stop()
86         for stopping := false; !stopping; {
87                 select {
88                 case <-tl.stopping:
89                         // flush tl.buf, then exit the loop
90                         stopping = true
91                 case <-ticker.C:
92                 }
93
94                 var ready *bytes.Buffer
95
96                 tl.Mutex.Lock()
97                 ready, tl.buf = tl.buf, nil
98                 tl.Mutex.Unlock()
99
100                 if ready != nil && ready.Len() > 0 {
101                         tl.writer.Write(ready.Bytes())
102                 }
103         }
104         close(tl.stopped)
105 }
106
107 // Close the flusher goroutine and wait for it to complete, then close the
108 // underlying Writer.
109 func (tl *ThrottledLogger) Close() error {
110         select {
111         case <-tl.stopping:
112                 // already stopped
113         default:
114                 close(tl.stopping)
115         }
116         <-tl.stopped
117         return tl.writer.Close()
118 }
119
const (
	// MaxLogLine is the maximum length of stdout/stderr lines before they are split.
	MaxLogLine = 1 << 12
)

// ReadWriteLines reads lines from in and writes each one to writer, then
// sends true on done once reading has finished.
//
// Line terminators are stripped, so each complete line is passed to
// writer.Write without a trailing newline. Lines longer than MaxLogLine
// bytes are split into segments. Every segment except the last ends with
// "[...]\n", and every segment except the first starts with "[...]".
//
// If reading fails with an error other than io.EOF, the error is reported to
// writer once and reading stops.
func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
	reader := bufio.NewReaderSize(in, MaxLogLine)
	var prefix string
	for {
		line, isPrefix, err := reader.ReadLine()
		if err != nil {
			if err != io.EOF {
				writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
			}
			// Stop on any error. bufio.Reader calls the underlying
			// reader again on the next ReadLine, so a persistent failure
			// would otherwise loop forever re-reporting the same error.
			break
		}

		var suffix string
		if isPrefix {
			suffix = "[...]\n"
		}

		if prefix == "" && suffix == "" {
			writer.Write(line)
		} else {
			writer.Write([]byte(prefix + string(line) + suffix))
		}

		// When ReadLine split a long line, the next segment continues it.
		if isPrefix {
			prefix = "[...]"
		} else {
			prefix = ""
		}
	}
	done <- true
}
157
158 // NewThrottledLogger creates a new thottled logger that
159 // (a) prepends timestamps to each line
160 // (b) batches log messages and only calls the underlying Writer at most once
161 // per second.
162 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
163         tl := &ThrottledLogger{}
164         tl.stopping = make(chan struct{})
165         tl.stopped = make(chan struct{})
166         tl.writer = writer
167         tl.Logger = log.New(tl, "", 0)
168         tl.Timestamper = RFC3339Timestamp
169         go tl.flusher()
170         return tl
171 }
172
173 // ArvLogWriter is an io.WriteCloser that processes each write by
174 // writing it through to another io.WriteCloser (typically a
175 // CollectionFileWriter) and creating an Arvados log entry.
176 type ArvLogWriter struct {
177         ArvClient     IArvadosClient
178         UUID          string
179         loggingStream string
180         writeCloser   io.WriteCloser
181
182         // for rate limiting
183         bytesLogged                  int64
184         logThrottleResetTime         time.Time
185         logThrottleLinesSoFar        int64
186         logThrottleBytesSoFar        int64
187         logThrottleBytesSkipped      int64
188         logThrottleIsOpen            bool
189         logThrottlePartialLineLastAt time.Time
190         logThrottleFirstPartialLine  bool
191         stderrBufToFlush             bytes.Buffer
192         stderrFlushedAt              time.Time
193 }
194
195 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
196         // Write to the next writer in the chain (a file in Keep)
197         var err1 error
198         if arvlog.writeCloser != nil {
199                 _, err1 = arvlog.writeCloser.Write(p)
200         }
201
202         // write to API after checking rate limit
203         crunchLogThrottlePeriod, err2 := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
204         crunchLogBytesPerEvent, err2 := arvlog.ArvClient.Discovery("crunchLogBytesPerEvent")
205         crunchLogSecondsBetweenEvents, err2 := arvlog.ArvClient.Discovery("crunchLogSecondsBetweenEvents")
206         if err2 != nil {
207                 return 0, fmt.Errorf("%s ; %s", err1, err2)
208         }
209
210         now := time.Now()
211         bytesWritten := 0
212
213         if now.After(arvlog.logThrottleResetTime) {
214                 // It has been more than throttle_period seconds since the last
215                 // checkpoint; so reset the throttle
216                 if arvlog.logThrottleBytesSkipped > 0 {
217                         arvlog.stderrBufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(time.Now()), arvlog.logThrottleBytesSkipped))
218                 }
219
220                 arvlog.logThrottleResetTime = time.Now().Add(time.Duration(int(crunchLogThrottlePeriod.(float64))))
221                 arvlog.logThrottleBytesSoFar = 0
222                 arvlog.logThrottleLinesSoFar = 0
223                 arvlog.logThrottleBytesSkipped = 0
224                 arvlog.logThrottleIsOpen = true
225                 arvlog.logThrottlePartialLineLastAt = time.Time{}
226                 arvlog.logThrottleFirstPartialLine = true
227         }
228
229         lines := bytes.Split(p, []byte("\n"))
230
231         for _, line := range lines {
232                 // Short circuit the counting code if we're just going to throw
233                 // away the data anyway.
234                 if !arvlog.logThrottleIsOpen {
235                         arvlog.logThrottleBytesSkipped += int64(len(line))
236                         continue
237                 } else if len(line) == 0 {
238                         continue
239                 }
240
241                 // check rateLimit
242                 _, msg, err2 := arvlog.rateLimit(line)
243                 if err2 != nil {
244                         return 0, fmt.Errorf("%s ; %s", err1, err2)
245                 }
246                 arvlog.stderrBufToFlush.WriteString(string(msg) + "\n")
247         }
248
249         if arvlog.stderrBufToFlush.Len() > int(crunchLogBytesPerEvent.(float64)) ||
250                 (time.Now().Sub(arvlog.stderrFlushedAt) >= time.Duration(int64(crunchLogSecondsBetweenEvents.(float64)))) {
251                 // write to API
252                 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
253                         "object_uuid": arvlog.UUID,
254                         "event_type":  arvlog.loggingStream,
255                         "properties":  map[string]string{"text": arvlog.stderrBufToFlush.String()}}}
256                 err2 := arvlog.ArvClient.Create("logs", lr, nil)
257
258                 bytesWritten = arvlog.stderrBufToFlush.Len()
259                 arvlog.stderrBufToFlush = bytes.Buffer{}
260                 arvlog.stderrFlushedAt = time.Now()
261
262                 if err1 != nil || err2 != nil {
263                         return 0, fmt.Errorf("%s ; %s", err1, err2)
264                 }
265         }
266
267         return bytesWritten, nil
268 }
269
270 // Close the underlying writer
271 func (arvlog *ArvLogWriter) Close() (err error) {
272         if arvlog.writeCloser != nil {
273                 err = arvlog.writeCloser.Close()
274                 arvlog.writeCloser = nil
275         }
276         return err
277 }
278
// lineRegexp matches lines shaped like "<field> <field> <int> <int> stderr
// <text>" and captures <text>. rateLimit uses the capture to recognize
// partial-line segments that begin and end with "[...]".
var lineRegexp = regexp.MustCompile(`^\S+ \S+ \d+ \d+ stderr (.*)`)
280
281 // Test for hard cap on total output and for log throttling. Returns whether
282 // the log line should go to output or not. Returns message if limit exceeded.
283 func (arvlog *ArvLogWriter) rateLimit(line []byte) (bool, []byte, error) {
284         message := ""
285         lineSize := int64(len(line))
286         partialLine := false
287         skipCounts := false
288         if arvlog.logThrottleIsOpen {
289                 matches := lineRegexp.FindStringSubmatch(string(line))
290
291                 crunchLogPartialLineThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogPartialLineThrottlePeriod")
292                 crunchLimitLogBytesPerJob, err := arvlog.ArvClient.Discovery("crunchLimitLogBytesPerJob")
293                 crunchLogThrottleBytes, err := arvlog.ArvClient.Discovery("crunchLogThrottleBytes")
294                 crunchLogThrottlePeriod, err := arvlog.ArvClient.Discovery("crunchLogThrottlePeriod")
295                 crunchLogThrottleLines, err := arvlog.ArvClient.Discovery("crunchLogThrottleLines")
296                 if err != nil {
297                         return false, []byte(""), err
298                 }
299
300                 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
301                         partialLine = true
302
303                         if time.Now().After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(int(crunchLogPartialLineThrottlePeriod.(float64))))) {
304                                 arvlog.logThrottlePartialLineLastAt = time.Now()
305                         } else {
306                                 skipCounts = true
307                         }
308                 }
309
310                 if !skipCounts {
311                         arvlog.logThrottleLinesSoFar += 1
312                         arvlog.logThrottleBytesSoFar += lineSize
313                         arvlog.bytesLogged += lineSize
314                 }
315
316                 if arvlog.bytesLogged > int64(crunchLimitLogBytesPerJob.(float64)) {
317                         message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(time.Now()), int(crunchLimitLogBytesPerJob.(float64)))
318                         arvlog.logThrottleResetTime = time.Now().Add(time.Duration(365 * 24 * time.Hour))
319                         arvlog.logThrottleIsOpen = false
320                 } else if arvlog.logThrottleBytesSoFar > int64(crunchLogThrottleBytes.(float64)) {
321                         remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
322                         message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now()), crunchLogThrottleBytes, int(crunchLogThrottlePeriod.(float64)), remainingTime)
323                         arvlog.logThrottleIsOpen = false
324                 } else if arvlog.logThrottleLinesSoFar > int64(crunchLogThrottleLines.(float64)) {
325                         remainingTime := arvlog.logThrottleResetTime.Sub(time.Now())
326                         message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(time.Now()), crunchLogThrottleLines, int(crunchLogThrottlePeriod.(float64)), remainingTime)
327                         arvlog.logThrottleIsOpen = false
328                 } else if partialLine && arvlog.logThrottleFirstPartialLine {
329                         arvlog.logThrottleFirstPartialLine = false
330                         message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(time.Now()), crunchLogPartialLineThrottlePeriod)
331                 }
332         }
333
334         if !arvlog.logThrottleIsOpen {
335                 // Don't log anything if any limit has been exceeded. Just count lossage.
336                 arvlog.logThrottleBytesSkipped += lineSize
337         }
338
339         if message != "" {
340                 // Yes, write to logs, but use our "rate exceeded" message
341                 // instead of the log message that exceeded the limit.
342                 message += " A complete log is still being written to Keep, and will be available when the job finishes.\n"
343                 return true, []byte(message), nil
344         } else if partialLine {
345                 return false, line, nil
346         } else {
347                 return arvlog.logThrottleIsOpen, line, nil
348         }
349 }