8019: need to check and write log if things are closing down
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "io"
8         "log"
9         "regexp"
10         "strings"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15 )
16
// Timestamper is the signature for a function that takes a timestamp and
// returns a formatted string value.
type Timestamper func(t time.Time) string
20
21 // Logging plumbing:
22 //
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
26 //
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
29
// ThrottledLogger accepts writes, prepends a timestamp to each line of the
// write, and periodically flushes to a downstream writer.  It supports the
// "Logger" and "WriteCloser" interfaces.
type ThrottledLogger struct {
	// Embedded logger. NewThrottledLogger points it back at the
	// ThrottledLogger itself, so Print/Printf calls end up in Write.
	*log.Logger
	// Timestamped lines waiting to be flushed. Accessed by Write and
	// flusher while holding the embedded Mutex.
	buf *bytes.Buffer
	// Guards buf.
	sync.Mutex
	// Downstream writer that receives each flushed batch; closed by Close.
	writer io.WriteCloser
	// Write does a non-blocking send here to request an early flush once
	// buf is large enough; Close closes it to tell flusher to exit.
	flush chan struct{}
	// Closed by flusher when it has written the last batch and exited.
	stopped chan struct{}
	// Formats the timestamp that Write prepends to each line.
	Timestamper
	// If not nil, every line is also printed here as soon as it is written.
	Immediate *log.Logger
	// NOTE(review): not referenced by the code visible in this file
	// section -- confirm whether it is still needed.
	pendingFlush bool
}
44
// RFC3339NanoFixed is a time layout like time.RFC3339Nano, except that the
// fractional seconds always occupy nine digits (trailing zeros are kept),
// so every formatted timestamp has the same width.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

// RFC3339Timestamp renders t using the RFC3339NanoFixed layout.
func RFC3339Timestamp(t time.Time) string {
	stamp := t.Format(RFC3339NanoFixed)
	return stamp
}
52
53 // Write prepends a timestamp to each line of the input data and
54 // appends to the internal buffer. Each line is also logged to
55 // tl.Immediate, if tl.Immediate is not nil.
56 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
57         tl.Mutex.Lock()
58         defer tl.Mutex.Unlock()
59
60         if tl.buf == nil {
61                 tl.buf = &bytes.Buffer{}
62         }
63
64         now := tl.Timestamper(time.Now().UTC())
65         sc := bufio.NewScanner(bytes.NewBuffer(p))
66         for err == nil && sc.Scan() {
67                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
68                 if tl.Immediate != nil {
69                         tl.Immediate.Print(out[:len(out)-1])
70                 }
71                 _, err = io.WriteString(tl.buf, out)
72         }
73         if err == nil {
74                 err = sc.Err()
75                 if err == nil {
76                         n = len(p)
77                 }
78         }
79
80         if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
81                 // Non-blocking send.  Try send a flush if it is ready to
82                 // accept it.  Otherwise do nothing because a flush is already
83                 // pending.
84                 select {
85                 case tl.flush <- struct{}{}:
86                 default:
87                 }
88         }
89
90         return
91 }
92
93 // Periodically check the current buffer; if not empty, send it on the
94 // channel to the goWriter goroutine.
95 func (tl *ThrottledLogger) flusher() {
96         ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
97         defer ticker.Stop()
98         for stopping := false; !stopping; {
99                 select {
100                 case _, open := <-tl.flush:
101                         // if !open, will flush tl.buf and exit the loop
102                         stopping = !open
103                 case <-ticker.C:
104                 }
105
106                 var ready *bytes.Buffer
107
108                 tl.Mutex.Lock()
109                 ready, tl.buf = tl.buf, &bytes.Buffer{}
110                 tl.Mutex.Unlock()
111
112                 if ready != nil && ready.Len() > 0 {
113                         tl.writer.Write(ready.Bytes())
114                 }
115         }
116         close(tl.stopped)
117 }
118
119 // Close the flusher goroutine and wait for it to complete, then close the
120 // underlying Writer.
121 func (tl *ThrottledLogger) Close() error {
122         select {
123         case <-tl.flush:
124                 // already stopped
125         default:
126                 close(tl.flush)
127         }
128         <-tl.stopped
129         return tl.writer.Close()
130 }
131
const (
	// MaxLogLine is the maximum length of stdout/stderr lines before they are split.
	MaxLogLine = 1 << 12
)

// ReadWriteLines reads lines from a reader and writes to a Writer, with long
// line splitting.
//
// Each line is passed to writer in its own Write call, without the line
// terminator. A line longer than MaxLogLine bytes is delivered in
// segments: every segment but the last ends with "[...]\n", and every
// segment but the first starts with "[...]".
//
// Reading stops at EOF or at the first read error. A non-EOF error is
// reported to writer as an "error reading container log:" line. In both
// cases true is sent on done just before returning.
func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
	reader := bufio.NewReaderSize(in, MaxLogLine)
	continued := false // previous segment was an incomplete line
	for {
		line, isPrefix, err := reader.ReadLine()
		if err != nil {
			if err != io.EOF {
				writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
			}
			// Stop on any error: retrying a reader that keeps
			// failing would loop forever, emitting an error message
			// on every pass.
			break
		}

		if !continued && !isPrefix {
			// Common case: a complete short line, passed through
			// without copying.
			writer.Write(line)
		} else {
			var segment []byte
			if continued {
				segment = append(segment, "[...]"...)
			}
			segment = append(segment, line...)
			if isPrefix {
				segment = append(segment, "[...]\n"...)
			}
			writer.Write(segment)
		}

		continued = isPrefix
	}
	done <- true
}
169
170 // NewThrottledLogger creates a new thottled logger that
171 // (a) prepends timestamps to each line
172 // (b) batches log messages and only calls the underlying Writer
173 //  at most once per "crunchLogSecondsBetweenEvents" seconds.
174 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
175         tl := &ThrottledLogger{}
176         tl.flush = make(chan struct{}, 1)
177         tl.stopped = make(chan struct{})
178         tl.writer = writer
179         tl.Logger = log.New(tl, "", 0)
180         tl.Timestamper = RFC3339Timestamp
181         go tl.flusher()
182         return tl
183 }
184
// Log throttling / rate limiting configuration. These are the built-in
// defaults; loadLogThrottleParams replaces them with values from the
// discovery document when those are available.
var (
	// Hard cap on the total number of bytes logged for one job.
	crunchLimitLogBytesPerJob int64 = 67108864
	// Maximum bytes logged per throttle period.
	crunchLogThrottleBytes int64 = 65536
	// Length of one throttle period.
	crunchLogThrottlePeriod = 60 * time.Second
	// Maximum lines logged per throttle period.
	crunchLogThrottleLines int64 = 1024
	// Minimum interval between logged segments of an over-long line.
	crunchLogPartialLineThrottlePeriod = 5 * time.Second
	// Buffered size that triggers a flush.
	crunchLogBytesPerEvent int64 = 4096
	// Interval between periodic flushes.
	crunchLogSecondsBetweenEvents = 1 * time.Second
)
193
// ArvLogWriter is an io.WriteCloser that processes each write by
// writing it through to another io.WriteCloser (typically a
// CollectionFileWriter) and creating an Arvados log entry.
type ArvLogWriter struct {
	// API client used to create "logs" records.
	ArvClient IArvadosClient
	// Sent as "object_uuid" in each log record.
	UUID string
	// Sent as "event_type" in each log record.
	loggingStream string
	// Next writer in the chain; every write is passed to it unthrottled.
	// May be nil; set to nil by Close.
	writeCloser io.WriteCloser

	// for rate limiting

	// Total bytes counted against crunchLimitLogBytesPerJob.
	bytesLogged int64
	// End of the current throttle period; counters reset once it passes.
	logThrottleResetTime time.Time
	// Lines counted in the current throttle period.
	logThrottleLinesSoFar int64
	// Bytes counted in the current throttle period.
	logThrottleBytesSoFar int64
	// Bytes suppressed since the last reset; reported in a
	// "Skipped N bytes of log" line at the next reset.
	logThrottleBytesSkipped int64
	// False while a limit has been exceeded and lines are being dropped.
	logThrottleIsOpen bool
	// Earliest time at which the next partial-line segment is let through.
	logThrottlePartialLineNextAt time.Time
	// True when the next partial-line segment would be the first one
	// since a complete line.
	logThrottleFirstPartialLine bool
	// Text accepted by the rate limiter but not yet sent to the API.
	bufToFlush bytes.Buffer
	// Time of the most recent send to the API.
	bufFlushedAt time.Time
	// Set by Close so the final Write flushes whatever is buffered.
	closing bool
}
216
217 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
218         // Write to the next writer in the chain (a file in Keep)
219         var err1 error
220         if arvlog.writeCloser != nil {
221                 _, err1 = arvlog.writeCloser.Write(p)
222         }
223
224         // write to API after checking rate limit
225         now := time.Now()
226         bytesWritten := 0
227
228         if now.After(arvlog.logThrottleResetTime) {
229                 // It has been more than throttle_period seconds since the last
230                 // checkpoint; so reset the throttle
231                 if arvlog.logThrottleBytesSkipped > 0 {
232                         arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
233                 }
234
235                 arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
236                 arvlog.logThrottleBytesSoFar = 0
237                 arvlog.logThrottleLinesSoFar = 0
238                 arvlog.logThrottleBytesSkipped = 0
239                 arvlog.logThrottleIsOpen = true
240         }
241
242         lines := bytes.Split(p, []byte("\n"))
243
244         for _, line := range lines {
245                 // Short circuit the counting code if we're just going to throw
246                 // away the data anyway.
247                 if !arvlog.logThrottleIsOpen {
248                         arvlog.logThrottleBytesSkipped += int64(len(line))
249                         continue
250                 } else if len(line) == 0 {
251                         continue
252                 }
253
254                 // check rateLimit
255                 logOpen, msg := arvlog.rateLimit(line, now)
256                 if logOpen {
257                         arvlog.bufToFlush.WriteString(string(msg) + "\n")
258                 }
259         }
260
261         if (int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
262                 (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
263                 arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
264                 // write to API
265                 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
266                         "object_uuid": arvlog.UUID,
267                         "event_type":  arvlog.loggingStream,
268                         "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
269                 err2 := arvlog.ArvClient.Create("logs", lr, nil)
270
271                 bytesWritten = arvlog.bufToFlush.Len()
272                 arvlog.bufToFlush = bytes.Buffer{}
273                 arvlog.bufFlushedAt = now
274
275                 if err1 != nil || err2 != nil {
276                         return 0, fmt.Errorf("%s ; %s", err1, err2)
277                 }
278         }
279
280         return bytesWritten, nil
281 }
282
283 // Close the underlying writer
284 func (arvlog *ArvLogWriter) Close() (err error) {
285         arvlog.closing = true
286         arvlog.Write([]byte{})
287         if arvlog.writeCloser != nil {
288                 err = arvlog.writeCloser.Close()
289                 arvlog.writeCloser = nil
290         }
291         return err
292 }
293
294 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
295
296 // Test for hard cap on total output and for log throttling. Returns whether
297 // the log line should go to output or not. Returns message if limit exceeded.
298 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
299         message := ""
300         lineSize := int64(len(line))
301
302         if arvlog.logThrottleIsOpen {
303                 matches := lineRegexp.FindStringSubmatch(string(line))
304
305                 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
306                         // This is a partial line.
307
308                         if arvlog.logThrottleFirstPartialLine {
309                                 // Partial should be suppressed.  First time this is happening for this line so provide a message instead.
310                                 arvlog.logThrottleFirstPartialLine = false
311                                 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
312                                 arvlog.logThrottleBytesSkipped += lineSize
313                                 return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
314                                         RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
315                         } else if now.After(arvlog.logThrottlePartialLineNextAt) {
316                                 // The throttle period has passed.  Update timestamp and let it through.
317                                 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
318                         } else {
319                                 // Suppress line.
320                                 arvlog.logThrottleBytesSkipped += lineSize
321                                 return false, line
322                         }
323                 } else {
324                         // Not a partial line so reset.
325                         arvlog.logThrottlePartialLineNextAt = time.Time{}
326                         arvlog.logThrottleFirstPartialLine = true
327                 }
328
329                 arvlog.bytesLogged += lineSize
330                 arvlog.logThrottleBytesSoFar += lineSize
331                 arvlog.logThrottleLinesSoFar += 1
332
333                 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
334                         message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
335                                 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
336                         arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
337                         arvlog.logThrottleIsOpen = false
338
339                 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
340                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
341                         message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
342                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
343                         arvlog.logThrottleIsOpen = false
344
345                 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
346                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
347                         message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
348                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
349                         arvlog.logThrottleIsOpen = false
350
351                 }
352         }
353
354         if !arvlog.logThrottleIsOpen {
355                 // Don't log anything if any limit has been exceeded. Just count lossage.
356                 arvlog.logThrottleBytesSkipped += lineSize
357         }
358
359         if message != "" {
360                 // Yes, write to logs, but use our "rate exceeded" message
361                 // instead of the log message that exceeded the limit.
362                 message += " A complete log is still being written to Keep, and will be available when the job finishes."
363                 return true, []byte(message)
364         } else {
365                 return arvlog.logThrottleIsOpen, line
366         }
367 }
368
369 // load the rate limit discovery config paramters
370 func loadLogThrottleParams(clnt IArvadosClient) {
371         param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
372         if err == nil {
373                 crunchLimitLogBytesPerJob = int64(param.(float64))
374         }
375
376         param, err = clnt.Discovery("crunchLogThrottleBytes")
377         if err == nil {
378                 crunchLogThrottleBytes = int64(param.(float64))
379         }
380
381         param, err = clnt.Discovery("crunchLogThrottlePeriod")
382         if err == nil {
383                 crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
384         }
385
386         param, err = clnt.Discovery("crunchLogThrottleLines")
387         if err == nil {
388                 crunchLogThrottleLines = int64(param.(float64))
389         }
390
391         param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
392         if err == nil {
393                 crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
394         }
395
396         param, err = clnt.Discovery("crunchLogBytesPerEvent")
397         if err == nil {
398                 crunchLogBytesPerEvent = int64(param.(float64))
399         }
400
401         param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
402         if err == nil {
403                 crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
404         }
405 }