6e32d723e87d1e031dcaf94241358fc9c6e3a8e4
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "io"
8         "log"
9         "regexp"
10         "strings"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15 )
16
// Timestamper is a function that formats a point in time as a string. It
// produces the timestamp a ThrottledLogger puts in front of every line.
type Timestamper func(time.Time) string
20
21 // Logging plumbing:
22 //
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
26 //
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
29
30 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
31 // write, and periodically flushes to a downstream writer.  It supports the
32 // "Logger" and "WriteCloser" interfaces.
33 type ThrottledLogger struct {
34         *log.Logger
35         buf *bytes.Buffer
36         sync.Mutex
37         writer   io.WriteCloser
38         stopping chan struct{}
39         stopped  chan struct{}
40         Timestamper
41         Immediate *log.Logger
42 }
43
// RFC3339NanoFixed is a fixed-width variant of time.RFC3339Nano: the
// fractional seconds are always printed with all nine digits, so trailing
// zeros are not trimmed.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

// RFC3339Timestamp renders t using the RFC3339NanoFixed layout. Its
// signature matches Timestamper.
func RFC3339Timestamp(t time.Time) string {
	stamp := make([]byte, 0, len(RFC3339NanoFixed))
	stamp = t.AppendFormat(stamp, RFC3339NanoFixed)
	return string(stamp)
}
51
52 // Write prepends a timestamp to each line of the input data and
53 // appends to the internal buffer. Each line is also logged to
54 // tl.Immediate, if tl.Immediate is not nil.
55 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
56         tl.Mutex.Lock()
57         defer tl.Mutex.Unlock()
58
59         if tl.buf == nil {
60                 tl.buf = &bytes.Buffer{}
61         }
62
63         now := tl.Timestamper(time.Now().UTC())
64         sc := bufio.NewScanner(bytes.NewBuffer(p))
65         for err == nil && sc.Scan() {
66                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
67                 if tl.Immediate != nil {
68                         tl.Immediate.Print(out[:len(out)-1])
69                 }
70                 _, err = io.WriteString(tl.buf, out)
71         }
72         if err == nil {
73                 err = sc.Err()
74                 if err == nil {
75                         n = len(p)
76                 }
77         }
78         return
79 }
80
81 // Periodically check the current buffer; if not empty, send it on the
82 // channel to the goWriter goroutine.
83 func (tl *ThrottledLogger) flusher() {
84         ticker := time.NewTicker(time.Second)
85         defer ticker.Stop()
86         for stopping := false; !stopping; {
87                 select {
88                 case <-tl.stopping:
89                         // flush tl.buf, then exit the loop
90                         stopping = true
91                 case <-ticker.C:
92                 }
93
94                 var ready *bytes.Buffer
95
96                 tl.Mutex.Lock()
97                 ready, tl.buf = tl.buf, nil
98                 tl.Mutex.Unlock()
99
100                 if ready != nil && ready.Len() > 0 {
101                         tl.writer.Write(ready.Bytes())
102                 }
103         }
104         close(tl.stopped)
105 }
106
107 // Close the flusher goroutine and wait for it to complete, then close the
108 // underlying Writer.
109 func (tl *ThrottledLogger) Close() error {
110         select {
111         case <-tl.stopping:
112                 // already stopped
113         default:
114                 close(tl.stopping)
115         }
116         <-tl.stopped
117         return tl.writer.Close()
118 }
119
const (
	// MaxLogLine is the maximum length of stdout/stderr lines before they
	// are split. It is also the size of ReadWriteLines' read buffer.
	MaxLogLine = 1 << 12
)

// ReadWriteLines copies in to writer one line at a time until in reaches
// EOF or fails, then sends true on done.
//
// Each line is passed to writer.Write without its trailing newline. Lines
// longer than MaxLogLine bytes are split into segments: every segment but
// the last ends with "[...]\n", and every segment but the first starts
// with "[...]". A read error other than io.EOF is reported to writer as an
// "error reading container log: ..." line and ends the copy.
func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
	reader := bufio.NewReaderSize(in, MaxLogLine)
	var prefix string
	for {
		line, isPrefix, err := reader.ReadLine()
		if err == io.EOF {
			break
		} else if err != nil {
			// Report the error and stop. ReadLine never returns data
			// together with an error, so nothing is lost; carrying on
			// would spin forever on a reader whose error persists.
			writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
			break
		}
		var suffix string
		if isPrefix {
			suffix = "[...]\n"
		}

		if prefix == "" && suffix == "" {
			writer.Write(line)
		} else {
			writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
		}

		// Set up prefix for following line
		if isPrefix {
			prefix = "[...]"
		} else {
			prefix = ""
		}
	}
	done <- true
}
157
158 // NewThrottledLogger creates a new thottled logger that
159 // (a) prepends timestamps to each line
160 // (b) batches log messages and only calls the underlying Writer at most once
161 // per second.
162 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
163         tl := &ThrottledLogger{}
164         tl.stopping = make(chan struct{})
165         tl.stopped = make(chan struct{})
166         tl.writer = writer
167         tl.Logger = log.New(tl, "", 0)
168         tl.Timestamper = RFC3339Timestamp
169         go tl.flusher()
170         return tl
171 }
172
173 // ArvLogWriter is an io.WriteCloser that processes each write by
174 // writing it through to another io.WriteCloser (typically a
175 // CollectionFileWriter) and creating an Arvados log entry.
176 type ArvLogWriter struct {
177         ArvClient     IArvadosClient
178         UUID          string
179         loggingStream string
180         writeCloser   io.WriteCloser
181
182         // for rate limiting
183         bytesLogged                  int64
184         logThrottleResetTime         time.Time
185         logThrottleLinesSoFar        int64
186         logThrottleBytesSoFar        int64
187         logThrottleBytesSkipped      int64
188         logThrottleIsOpen            bool
189         logThrottlePartialLineLastAt time.Time
190         logThrottleFirstPartialLine  bool
191         bufToFlush                   bytes.Buffer
192         bufFlushedAt                 time.Time
193
194         // rate limiting config parameters
195         crunchLimitLogBytesPerJob          int64
196         crunchLogThrottleBytes             int64
197         crunchLogThrottlePeriod            int
198         crunchLogThrottleLines             int64
199         crunchLogPartialLineThrottlePeriod int
200         crunchLogBytesPerEvent             int64
201         crunchLogSecondsBetweenEvents      int
202 }
203
204 // NewArvLogWriter creates new ArvLogWriter and loads the rate limiting config params
205 func NewArvLogWriter(clnt IArvadosClient, uuid string, ls string, wc io.WriteCloser) *ArvLogWriter {
206         w := &ArvLogWriter{ArvClient: clnt, UUID: uuid, loggingStream: ls, writeCloser: wc}
207
208         // load the rate limit discovery config paramters
209         param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
210         if err != nil {
211                 w.crunchLimitLogBytesPerJob = 67108864
212         } else {
213                 w.crunchLimitLogBytesPerJob = int64(param.(float64))
214         }
215
216         param, err = clnt.Discovery("crunchLogThrottleBytes")
217         if err != nil {
218                 w.crunchLogThrottleBytes = 65536
219         } else {
220                 w.crunchLogThrottleBytes = int64(param.(float64))
221         }
222
223         param, err = clnt.Discovery("crunchLogThrottlePeriod")
224         if err != nil {
225                 w.crunchLogThrottlePeriod = 60
226         } else {
227                 w.crunchLogThrottlePeriod = int(param.(float64))
228         }
229
230         param, err = clnt.Discovery("crunchLogThrottleLines")
231         if err != nil {
232                 w.crunchLogThrottleLines = 1024
233         } else {
234                 w.crunchLogThrottleLines = int64(param.(float64))
235         }
236
237         param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
238         if err != nil {
239                 w.crunchLogPartialLineThrottlePeriod = 5
240         } else {
241                 w.crunchLogPartialLineThrottlePeriod = int(param.(float64))
242         }
243
244         param, err = clnt.Discovery("crunchLogBytesPerEvent")
245         if err != nil {
246                 w.crunchLogBytesPerEvent = 4096
247         } else {
248                 w.crunchLogBytesPerEvent = int64(param.(float64))
249         }
250
251         param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
252         if err != nil {
253                 w.crunchLogSecondsBetweenEvents = 1
254         } else {
255                 w.crunchLogSecondsBetweenEvents = int(param.(float64))
256         }
257
258         return w
259 }
260
261 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
262         // Write to the next writer in the chain (a file in Keep)
263         var err1 error
264         if arvlog.writeCloser != nil {
265                 _, err1 = arvlog.writeCloser.Write(p)
266         }
267
268         // write to API after checking rate limit
269         now := time.Now()
270         bytesWritten := 0
271
272         if now.After(arvlog.logThrottleResetTime) {
273                 // It has been more than throttle_period seconds since the last
274                 // checkpoint; so reset the throttle
275                 if arvlog.logThrottleBytesSkipped > 0 {
276                         arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
277                 }
278
279                 arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
280                 arvlog.logThrottleBytesSoFar = 0
281                 arvlog.logThrottleLinesSoFar = 0
282                 arvlog.logThrottleBytesSkipped = 0
283                 arvlog.logThrottleIsOpen = true
284                 arvlog.logThrottlePartialLineLastAt = time.Time{}
285                 arvlog.logThrottleFirstPartialLine = true
286         }
287
288         lines := bytes.Split(p, []byte("\n"))
289
290         for _, line := range lines {
291                 // Short circuit the counting code if we're just going to throw
292                 // away the data anyway.
293                 if !arvlog.logThrottleIsOpen {
294                         arvlog.logThrottleBytesSkipped += int64(len(line))
295                         continue
296                 } else if len(line) == 0 {
297                         continue
298                 }
299
300                 // check rateLimit
301                 logOpen, msg := arvlog.rateLimit(line, now)
302                 arvlog.bufToFlush.WriteString(string(msg) + "\n")
303                 arvlog.logThrottleIsOpen = logOpen
304         }
305
306         if int64(arvlog.bufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
307                 (now.Sub(arvlog.bufFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
308                 // write to API
309                 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
310                         "object_uuid": arvlog.UUID,
311                         "event_type":  arvlog.loggingStream,
312                         "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
313                 err2 := arvlog.ArvClient.Create("logs", lr, nil)
314
315                 bytesWritten = arvlog.bufToFlush.Len()
316                 arvlog.bufToFlush = bytes.Buffer{}
317                 arvlog.bufFlushedAt = now
318
319                 if err1 != nil || err2 != nil {
320                         return 0, fmt.Errorf("%s ; %s", err1, err2)
321                 }
322         }
323
324         return bytesWritten, nil
325 }
326
327 // Close the underlying writer
328 func (arvlog *ArvLogWriter) Close() (err error) {
329         if arvlog.writeCloser != nil {
330                 err = arvlog.writeCloser.Close()
331                 arvlog.writeCloser = nil
332         }
333         return err
334 }
335
336 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
337
338 // Test for hard cap on total output and for log throttling. Returns whether
339 // the log line should go to output or not. Returns message if limit exceeded.
340 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
341         message := ""
342         lineSize := int64(len(line))
343         partialLine := false
344         skipCounts := false
345
346         if arvlog.logThrottleIsOpen {
347                 matches := lineRegexp.FindStringSubmatch(string(line))
348
349                 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
350                         partialLine = true
351
352                         if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
353                                 arvlog.logThrottlePartialLineLastAt = now
354                         } else {
355                                 skipCounts = true
356                         }
357                 }
358
359                 if !skipCounts {
360                         arvlog.logThrottleLinesSoFar += 1
361                         arvlog.logThrottleBytesSoFar += lineSize
362                         arvlog.bytesLogged += lineSize
363                 }
364
365                 if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
366                         message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(now.UTC()), arvlog.crunchLimitLogBytesPerJob)
367                         arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
368                         arvlog.logThrottleIsOpen = false
369
370                 } else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
371                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
372                         message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
373                         arvlog.logThrottleIsOpen = false
374
375                 } else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
376                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
377                         message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
378                         arvlog.logThrottleIsOpen = false
379
380                 } else if partialLine && arvlog.logThrottleFirstPartialLine {
381                         arvlog.logThrottleFirstPartialLine = false
382                         message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
383
384                 }
385         }
386
387         if !arvlog.logThrottleIsOpen {
388                 // Don't log anything if any limit has been exceeded. Just count lossage.
389                 arvlog.logThrottleBytesSkipped += lineSize
390         }
391
392         if message != "" {
393                 // Yes, write to logs, but use our "rate exceeded" message
394                 // instead of the log message that exceeded the limit.
395                 message += " A complete log is still being written to Keep, and will be available when the job finishes."
396                 return true, []byte(message)
397         } else if partialLine {
398                 return false, line
399         } else {
400                 return arvlog.logThrottleIsOpen, line
401         }
402 }