8019: load log throttling config params during NewContainerRunner
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "io"
8         "log"
9         "regexp"
10         "strings"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15 )
16
// Timestamper is the signature for a function that takes a timestamp and
// returns a formatted string value.
type Timestamper func(t time.Time) string
20
21 // Logging plumbing:
22 //
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
26 //
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
29
// ThrottledLogger accepts writes, prepends a timestamp to each line of the
// write, and periodically flushes to a downstream writer.  It supports the
// "Logger" and "WriteCloser" interfaces.
//
// Field roles, as used by the methods in this file:
//   - Logger (embedded): a log.Logger whose output is this ThrottledLogger
//     itself (set up by NewThrottledLogger).
//   - buf: timestamped lines waiting to be flushed; allocated lazily by
//     Write and swapped out for nil by flusher.
//   - Mutex (embedded): held by Write and flusher while they touch buf.
//   - writer: downstream destination; flusher writes batches to it and
//     Close closes it.
//   - flush: closed by Close to make flusher do a final flush and exit.
//   - stopped: closed by flusher when it has exited.
//   - Timestamper (embedded): formats the timestamp prepended to each line.
//   - Immediate: if non-nil, every timestamped line is also printed here
//     at Write time.
type ThrottledLogger struct {
	*log.Logger
	buf *bytes.Buffer
	sync.Mutex
	writer  io.WriteCloser
	flush   chan struct{}
	stopped chan struct{}
	Timestamper
	Immediate *log.Logger
}
43
// RFC3339NanoFixed is a time layout equivalent to time.RFC3339Nano except
// that the fractional seconds are always rendered with nine digits, so
// every formatted timestamp has the same width for a given zone.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

// RFC3339Timestamp renders t using the RFC3339NanoFixed layout. It has the
// Timestamper signature.
func RFC3339Timestamp(t time.Time) string {
	formatted := t.Format(RFC3339NanoFixed)
	return formatted
}
51
52 // Write prepends a timestamp to each line of the input data and
53 // appends to the internal buffer. Each line is also logged to
54 // tl.Immediate, if tl.Immediate is not nil.
55 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
56         tl.Mutex.Lock()
57         defer tl.Mutex.Unlock()
58
59         if tl.buf == nil {
60                 tl.buf = &bytes.Buffer{}
61         }
62
63         now := tl.Timestamper(time.Now().UTC())
64         sc := bufio.NewScanner(bytes.NewBuffer(p))
65         for err == nil && sc.Scan() {
66                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
67                 if tl.Immediate != nil {
68                         tl.Immediate.Print(out[:len(out)-1])
69                 }
70                 _, err = io.WriteString(tl.buf, out)
71         }
72         if err == nil {
73                 err = sc.Err()
74                 if err == nil {
75                         n = len(p)
76                 }
77         }
78         return
79 }
80
81 // Periodically check the current buffer; if not empty, send it on the
82 // channel to the goWriter goroutine.
83 func (tl *ThrottledLogger) flusher() {
84         ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
85         defer ticker.Stop()
86         for stopping := false; !stopping; {
87                 select {
88                 case _, open := <-tl.flush:
89                         // if !open, flush tl.buf and exit the loop
90                         stopping = !open
91                 case <-ticker.C:
92                 }
93
94                 var ready *bytes.Buffer
95
96                 tl.Mutex.Lock()
97                 ready, tl.buf = tl.buf, nil
98                 tl.Mutex.Unlock()
99
100                 if ready != nil && ready.Len() > 0 {
101                         tl.writer.Write(ready.Bytes())
102                 }
103         }
104         close(tl.stopped)
105 }
106
107 // Close the flusher goroutine and wait for it to complete, then close the
108 // underlying Writer.
109 func (tl *ThrottledLogger) Close() error {
110         select {
111         case <-tl.flush:
112                 // already stopped
113         default:
114                 close(tl.flush)
115         }
116         <-tl.stopped
117         return tl.writer.Close()
118 }
119
const (
	// MaxLogLine is the maximum length of stdout/stderr lines before they are split.
	MaxLogLine = 1 << 12
)

// ReadWriteLines reads lines from in and passes each one to writer in its
// own Write call (without the line terminator), then sends true on done.
//
// Lines longer than MaxLogLine are split: every segment that is followed
// by more of the same line is written with a "[...]\n" suffix, and every
// segment that continues a previous one is written with a "[...]" prefix.
//
// Reading stops at EOF. Any other read error is reported to writer as an
// "error reading container log:" line and also ends the loop; retrying a
// reader that keeps failing would spin forever and never signal done.
func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
	reader := bufio.NewReaderSize(in, MaxLogLine)
	var prefix string
	for {
		line, isPrefix, err := reader.ReadLine()
		if err != nil {
			if err != io.EOF {
				writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
			}
			break
		}

		var suffix string
		if isPrefix {
			suffix = "[...]\n"
		}

		if prefix == "" && suffix == "" {
			writer.Write(line)
		} else {
			writer.Write([]byte(prefix + string(line) + suffix))
		}

		// Set up prefix for following line
		if isPrefix {
			prefix = "[...]"
		} else {
			prefix = ""
		}
	}
	done <- true
}
157
158 // NewThrottledLogger creates a new thottled logger that
159 // (a) prepends timestamps to each line
160 // (b) batches log messages and only calls the underlying Writer at most once
161 // per second.
162 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
163         tl := &ThrottledLogger{}
164         tl.flush = make(chan struct{}, 1)
165         tl.stopped = make(chan struct{})
166         tl.writer = writer
167         tl.Logger = log.New(tl, "", 0)
168         tl.Timestamper = RFC3339Timestamp
169         go tl.flusher()
170         return tl
171 }
172
// Log throttling / rate limiting configuration parameters. They are
// assigned by loadLogThrottleParams and are zero until it has been called.
var (
	// crunchLimitLogBytesPerJob is the cap on the total number of bytes
	// counted towards the API log before it is truncated.
	crunchLimitLogBytesPerJob int64

	// crunchLogThrottleBytes is the number of bytes that may be logged
	// per throttle period before logging is silenced.
	crunchLogThrottleBytes int64

	// crunchLogThrottlePeriod is the length of a throttle period, in
	// seconds.
	crunchLogThrottlePeriod int

	// crunchLogThrottleLines is the number of lines that may be logged
	// per throttle period before logging is silenced.
	crunchLogThrottleLines int64

	// crunchLogPartialLineThrottlePeriod is the interval, in seconds, at
	// which segments of split long lines are counted.
	crunchLogPartialLineThrottlePeriod int

	// crunchLogBytesPerEvent is the amount of buffered text above which a
	// log record is sent to the API.
	crunchLogBytesPerEvent int64

	// crunchLogSecondsBetweenEvents controls how often buffered log text
	// is flushed; it is named, and defaulted (1), as a number of seconds.
	crunchLogSecondsBetweenEvents int
)
181
// ArvLogWriter is an io.WriteCloser that processes each write by
// writing it through to another io.WriteCloser (typically a
// CollectionFileWriter) and creating an Arvados log entry.
//
// Field roles, as used by the methods in this file:
//   - ArvClient: client used to create "logs" records.
//   - UUID: sent as the "object_uuid" of each log record.
//   - loggingStream: sent as the "event_type" of each log record.
//   - writeCloser: next writer in the chain; may be nil, and is set to nil
//     by Close.
//   - bytesLogged: running total of bytes counted by rateLimit, compared
//     against crunchLimitLogBytesPerJob.
//   - logThrottleResetTime: once this time has passed, Write resets the
//     per-period throttle state.
//   - logThrottleLinesSoFar, logThrottleBytesSoFar: lines and bytes
//     counted in the current throttle period.
//   - logThrottleBytesSkipped: bytes dropped while the throttle was
//     closed; reported in a "Skipped ... bytes of log" line at the next
//     reset.
//   - logThrottleIsOpen: false while log lines are being discarded.
//   - logThrottlePartialLineLastAt: last time a partial ("[...]") line
//     segment was counted.
//   - logThrottleFirstPartialLine: true until the partial-line notice has
//     been emitted in the current throttle period.
//   - bufToFlush: text accumulated for the next log record.
//   - bufFlushedAt: time at which bufToFlush was last sent.
type ArvLogWriter struct {
	ArvClient     IArvadosClient
	UUID          string
	loggingStream string
	writeCloser   io.WriteCloser

	// for rate limiting
	bytesLogged                  int64
	logThrottleResetTime         time.Time
	logThrottleLinesSoFar        int64
	logThrottleBytesSoFar        int64
	logThrottleBytesSkipped      int64
	logThrottleIsOpen            bool
	logThrottlePartialLineLastAt time.Time
	logThrottleFirstPartialLine  bool
	bufToFlush                   bytes.Buffer
	bufFlushedAt                 time.Time
}
203
204 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
205         // Write to the next writer in the chain (a file in Keep)
206         var err1 error
207         if arvlog.writeCloser != nil {
208                 _, err1 = arvlog.writeCloser.Write(p)
209         }
210
211         // write to API after checking rate limit
212         now := time.Now()
213         bytesWritten := 0
214
215         if now.After(arvlog.logThrottleResetTime) {
216                 // It has been more than throttle_period seconds since the last
217                 // checkpoint; so reset the throttle
218                 if arvlog.logThrottleBytesSkipped > 0 {
219                         arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
220                 }
221
222                 arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod))
223                 arvlog.logThrottleBytesSoFar = 0
224                 arvlog.logThrottleLinesSoFar = 0
225                 arvlog.logThrottleBytesSkipped = 0
226                 arvlog.logThrottleIsOpen = true
227                 arvlog.logThrottlePartialLineLastAt = time.Time{}
228                 arvlog.logThrottleFirstPartialLine = true
229         }
230
231         lines := bytes.Split(p, []byte("\n"))
232
233         for _, line := range lines {
234                 // Short circuit the counting code if we're just going to throw
235                 // away the data anyway.
236                 if !arvlog.logThrottleIsOpen {
237                         arvlog.logThrottleBytesSkipped += int64(len(line))
238                         continue
239                 } else if len(line) == 0 {
240                         continue
241                 }
242
243                 // check rateLimit
244                 logOpen, msg := arvlog.rateLimit(line, now)
245                 arvlog.bufToFlush.WriteString(string(msg) + "\n")
246                 arvlog.logThrottleIsOpen = logOpen
247         }
248
249         if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
250                 (now.Sub(arvlog.bufFlushedAt) >= time.Duration(crunchLogSecondsBetweenEvents)) {
251                 // write to API
252                 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
253                         "object_uuid": arvlog.UUID,
254                         "event_type":  arvlog.loggingStream,
255                         "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
256                 err2 := arvlog.ArvClient.Create("logs", lr, nil)
257
258                 bytesWritten = arvlog.bufToFlush.Len()
259                 arvlog.bufToFlush = bytes.Buffer{}
260                 arvlog.bufFlushedAt = now
261
262                 if err1 != nil || err2 != nil {
263                         return 0, fmt.Errorf("%s ; %s", err1, err2)
264                 }
265         }
266
267         return bytesWritten, nil
268 }
269
270 // Close the underlying writer
271 func (arvlog *ArvLogWriter) Close() (err error) {
272         if arvlog.writeCloser != nil {
273                 err = arvlog.writeCloser.Close()
274                 arvlog.writeCloser = nil
275         }
276         return err
277 }
278
279 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
280
281 // Test for hard cap on total output and for log throttling. Returns whether
282 // the log line should go to output or not. Returns message if limit exceeded.
283 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
284         message := ""
285         lineSize := int64(len(line))
286         partialLine := false
287         skipCounts := false
288
289         if arvlog.logThrottleIsOpen {
290                 matches := lineRegexp.FindStringSubmatch(string(line))
291
292                 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
293                         partialLine = true
294
295                         if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
296                                 arvlog.logThrottlePartialLineLastAt = now
297                         } else {
298                                 skipCounts = true
299                         }
300                 }
301
302                 if !skipCounts {
303                         arvlog.logThrottleLinesSoFar += 1
304                         arvlog.logThrottleBytesSoFar += lineSize
305                         arvlog.bytesLogged += lineSize
306                 }
307
308                 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
309                         message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
310                                 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
311                         arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
312                         arvlog.logThrottleIsOpen = false
313
314                 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
315                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
316                         message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
317                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second)
318                         arvlog.logThrottleIsOpen = false
319
320                 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
321                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
322                         message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
323                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second)
324                         arvlog.logThrottleIsOpen = false
325
326                 } else if partialLine && arvlog.logThrottleFirstPartialLine {
327                         arvlog.logThrottleFirstPartialLine = false
328                         message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
329                                 RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod)
330
331                 }
332         }
333
334         if !arvlog.logThrottleIsOpen {
335                 // Don't log anything if any limit has been exceeded. Just count lossage.
336                 arvlog.logThrottleBytesSkipped += lineSize
337         }
338
339         if message != "" {
340                 // Yes, write to logs, but use our "rate exceeded" message
341                 // instead of the log message that exceeded the limit.
342                 message += " A complete log is still being written to Keep, and will be available when the job finishes."
343                 return true, []byte(message)
344         } else if partialLine {
345                 return false, line
346         } else {
347                 return arvlog.logThrottleIsOpen, line
348         }
349 }
350
351 // load the rate limit discovery config paramters
352 func loadLogThrottleParams(clnt IArvadosClient) {
353         param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
354         if err != nil {
355                 crunchLimitLogBytesPerJob = 67108864
356         } else {
357                 crunchLimitLogBytesPerJob = int64(param.(float64))
358         }
359
360         param, err = clnt.Discovery("crunchLogThrottleBytes")
361         if err != nil {
362                 crunchLogThrottleBytes = 65536
363         } else {
364                 crunchLogThrottleBytes = int64(param.(float64))
365         }
366
367         param, err = clnt.Discovery("crunchLogThrottlePeriod")
368         if err != nil {
369                 crunchLogThrottlePeriod = 60
370         } else {
371                 crunchLogThrottlePeriod = int(param.(float64))
372         }
373
374         param, err = clnt.Discovery("crunchLogThrottleLines")
375         if err != nil {
376                 crunchLogThrottleLines = 1024
377         } else {
378                 crunchLogThrottleLines = int64(param.(float64))
379         }
380
381         param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
382         if err != nil {
383                 crunchLogPartialLineThrottlePeriod = 5
384         } else {
385                 crunchLogPartialLineThrottlePeriod = int(param.(float64))
386         }
387
388         param, err = clnt.Discovery("crunchLogBytesPerEvent")
389         if err != nil {
390                 crunchLogBytesPerEvent = 4096
391         } else {
392                 crunchLogBytesPerEvent = int64(param.(float64))
393         }
394
395         param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
396         if err != nil {
397                 crunchLogSecondsBetweenEvents = 1
398         } else {
399                 crunchLogSecondsBetweenEvents = int(param.(float64))
400         }
401 }