8019: partial line throttling etc
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "io"
8         "log"
9         "regexp"
10         "strings"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15 )
16
17 // Timestamper is the signature for a function that takes a timestamp and
18 // returns a formatted string value.
19 type Timestamper func(t time.Time) string
20
21 // Logging plumbing:
22 //
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
26 //
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
29
30 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
31 // write, and periodically flushes to a downstream writer.  It supports the
32 // "Logger" and "WriteCloser" interfaces.
33 type ThrottledLogger struct {
34         *log.Logger // NewThrottledLogger points this at the ThrottledLogger itself, so Logger output goes through Write
35         buf *bytes.Buffer // timestamped lines waiting for the next flush; accessed under the mutex
36         sync.Mutex // held by Write and by flusher while they touch buf and pendingFlush
37         writer  io.WriteCloser // downstream writer: flusher writes batches to it, Close closes it
38         flush   chan struct{} // closed by Close to request a final flush and stop flusher
39         stopped chan struct{} // closed by flusher when it exits
40         Timestamper // formats the timestamp that Write puts in front of each line
41         Immediate    *log.Logger // optional; if non-nil, Write also prints each line here right away
42         pendingFlush bool // cleared by flusher on every pass; the only code that sets it is commented out in Write
43 }
44
// RFC3339NanoFixed is the time.RFC3339Nano layout with the fractional
// seconds written as nine fixed digits, so formatted values have a
// constant width.
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

// RFC3339Timestamp renders t using the RFC3339NanoFixed layout.
func RFC3339Timestamp(t time.Time) string {
	stamp := t.Format(RFC3339NanoFixed)
	return stamp
}
52
53 // Write prepends a timestamp to each line of the input data and
54 // appends to the internal buffer. Each line is also logged to
55 // tl.Immediate, if tl.Immediate is not nil.
56 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
57         tl.Mutex.Lock()
58         defer tl.Mutex.Unlock()
59
60         if tl.buf == nil {
61                 tl.buf = &bytes.Buffer{}
62         }
63
64         //if int64(tl.buf.Len()) >= crunchLogBytesPerEvent && !tl.pendingFlush {
65         //      tl.pendingFlush = true
66         //      tl.flush <- struct{}{}
67         //}
68
69         now := tl.Timestamper(time.Now().UTC())
70         sc := bufio.NewScanner(bytes.NewBuffer(p))
71         for err == nil && sc.Scan() {
72                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
73                 if tl.Immediate != nil {
74                         tl.Immediate.Print(out[:len(out)-1])
75                 }
76                 _, err = io.WriteString(tl.buf, out)
77         }
78         if err == nil {
79                 err = sc.Err()
80                 if err == nil {
81                         n = len(p)
82                 }
83         }
84         return
85 }
86
87 // Periodically check the current buffer; if not empty, send it on the
88 // channel to the goWriter goroutine.
89 func (tl *ThrottledLogger) flusher() {
90         ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
91         defer ticker.Stop()
92         for stopping := false; !stopping; {
93                 select {
94                 case _, open := <-tl.flush:
95                         // if !open, flush tl.buf and exit the loop
96                         stopping = !open
97                 case <-ticker.C:
98                 }
99
100                 var ready *bytes.Buffer
101
102                 tl.Mutex.Lock()
103                 ready, tl.buf = tl.buf, &bytes.Buffer{}
104                 tl.pendingFlush = false
105                 tl.Mutex.Unlock()
106
107                 if ready != nil && ready.Len() > 0 {
108                         tl.writer.Write(ready.Bytes())
109                 }
110         }
111         close(tl.stopped)
112 }
113
114 // Close the flusher goroutine and wait for it to complete, then close the
115 // underlying Writer.
116 func (tl *ThrottledLogger) Close() error {
117         select {
118         case <-tl.flush:
119                 // already stopped
120         default:
121                 close(tl.flush)
122         }
123         <-tl.stopped
124         return tl.writer.Close()
125 }
126
const (
	// MaxLogLine is the maximum length of stdout/stderr lines before they are split.
	MaxLogLine = 1 << 12
)

// ReadWriteLines reads lines from a reader and writes to a Writer, with long
// line splitting.
//
// Each line is passed to writer in its own Write call, without its line
// terminator. A line longer than MaxLogLine bytes is delivered in several
// segments: every segment except the last ends with "[...]\n", and every
// segment except the first starts with "[...]".
//
// Reading stops at EOF or at the first read error; a read error other than
// io.EOF is reported to writer as an "error reading container log:" line.
// A true value is sent on done just before returning. Errors returned by
// writer are not checked.
func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
	reader := bufio.NewReaderSize(in, MaxLogLine)
	var prefix string
	for {
		line, isPrefix, err := reader.ReadLine()
		if err != nil {
			// ReadLine never returns data together with an error, so
			// there is nothing left to forward. Stop here instead of
			// retrying: a reader that keeps failing would otherwise make
			// this loop spin forever.
			if err != io.EOF {
				writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
			}
			break
		}

		var suffix string
		if isPrefix {
			suffix = "[...]\n"
		}

		if prefix == "" && suffix == "" {
			writer.Write(line)
		} else {
			writer.Write([]byte(prefix + string(line) + suffix))
		}

		// Set up prefix for following line
		if isPrefix {
			prefix = "[...]"
		} else {
			prefix = ""
		}
	}
	done <- true
}
164
165 // NewThrottledLogger creates a new thottled logger that
166 // (a) prepends timestamps to each line
167 // (b) batches log messages and only calls the underlying Writer
168 //  at most once per "crunchLogSecondsBetweenEvents" seconds.
169 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
170         tl := &ThrottledLogger{}
171         tl.flush = make(chan struct{}, 1)
172         tl.stopped = make(chan struct{})
173         tl.writer = writer
174         tl.Logger = log.New(tl, "", 0)
175         tl.Timestamper = RFC3339Timestamp
176         go tl.flusher()
177         return tl
178 }
179
// Log throttling and rate limiting configuration. All values are zero
// until loadLogThrottleParams fills them in.
var (
	// Total bytes a job may log before its log is truncated.
	crunchLimitLogBytesPerJob int64
	// Bytes allowed per throttle period before logging is silenced.
	crunchLogThrottleBytes int64
	// Length of one throttle period, in seconds.
	crunchLogThrottlePeriod int
	// Lines allowed per throttle period before logging is silenced.
	crunchLogThrottleLines int64
	// Interval, in seconds, used when rate-limiting partial segments of
	// long lines.
	crunchLogPartialLineThrottlePeriod int
	// Buffered size, in bytes, above which a log event is sent.
	crunchLogBytesPerEvent int64
	// Interval between log events, in seconds.
	crunchLogSecondsBetweenEvents int
)
188
189 // ArvLogWriter is an io.WriteCloser that processes each write by
190 // writing it through to another io.WriteCloser (typically a
191 // CollectionFileWriter) and creating an Arvados log entry.
192 type ArvLogWriter struct {
193         ArvClient     IArvadosClient // client on which Write calls Create("logs", ...)
194         UUID          string // sent as "object_uuid" in each log record
195         loggingStream string // sent as "event_type" in each log record
196         writeCloser   io.WriteCloser // optional next writer; receives every Write unthrottled and is closed by Close
197
198         // for rate limiting
199         bytesLogged                  int64 // running total of line bytes counted by rateLimit
200         logThrottleResetTime         time.Time // when the current throttle period ends and the counters reset
201         logThrottleLinesSoFar        int64 // lines counted in the current throttle period
202         logThrottleBytesSoFar        int64 // bytes counted in the current throttle period
203         logThrottleBytesSkipped      int64 // bytes dropped while the throttle was closed; reported at the next reset
204         logThrottleIsOpen            bool // false once a limit has been exceeded, until the next reset
205         logThrottlePartialLineLastAt time.Time // last time the partial-line throttle period was found to have elapsed
206         logThrottleFirstPartialLine  bool // true when the next partial segment should produce the rate-limiting notice
207         bufToFlush                   bytes.Buffer // text accepted by the throttle, waiting to be sent as a log record
208         bufFlushedAt                 time.Time // time bufToFlush was last sent
209 }
210
211 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
212         // Write to the next writer in the chain (a file in Keep)
213         var err1 error
214         if arvlog.writeCloser != nil {
215                 _, err1 = arvlog.writeCloser.Write(p)
216         }
217
218         // write to API after checking rate limit
219         now := time.Now()
220         bytesWritten := 0
221
222         if now.After(arvlog.logThrottleResetTime) {
223                 // It has been more than throttle_period seconds since the last
224                 // checkpoint; so reset the throttle
225                 if arvlog.logThrottleBytesSkipped > 0 {
226                         arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
227                 }
228
229                 arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod))
230                 arvlog.logThrottleBytesSoFar = 0
231                 arvlog.logThrottleLinesSoFar = 0
232                 arvlog.logThrottleBytesSkipped = 0
233                 arvlog.logThrottleIsOpen = true
234                 arvlog.logThrottlePartialLineLastAt = time.Time{}
235                 arvlog.logThrottleFirstPartialLine = true
236         }
237
238         lines := bytes.Split(p, []byte("\n"))
239
240         for _, line := range lines {
241                 // Short circuit the counting code if we're just going to throw
242                 // away the data anyway.
243                 if !arvlog.logThrottleIsOpen {
244                         arvlog.logThrottleBytesSkipped += int64(len(line))
245                         continue
246                 } else if len(line) == 0 {
247                         continue
248                 }
249
250                 // check rateLimit
251                 logOpen, msg := arvlog.rateLimit(line, now)
252                 if logOpen {
253                         arvlog.bufToFlush.WriteString(string(msg) + "\n")
254                 }
255         }
256
257         if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
258                 (now.Sub(arvlog.bufFlushedAt) >= time.Duration(crunchLogSecondsBetweenEvents)) {
259                 // write to API
260                 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
261                         "object_uuid": arvlog.UUID,
262                         "event_type":  arvlog.loggingStream,
263                         "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
264                 err2 := arvlog.ArvClient.Create("logs", lr, nil)
265
266                 bytesWritten = arvlog.bufToFlush.Len()
267                 arvlog.bufToFlush = bytes.Buffer{}
268                 arvlog.bufFlushedAt = now
269
270                 if err1 != nil || err2 != nil {
271                         return 0, fmt.Errorf("%s ; %s", err1, err2)
272                 }
273         }
274
275         return bytesWritten, nil
276 }
277
278 // Close the underlying writer
279 func (arvlog *ArvLogWriter) Close() (err error) {
280         if arvlog.writeCloser != nil {
281                 err = arvlog.writeCloser.Close()
282                 arvlog.writeCloser = nil
283         }
284         return err
285 }
286
287 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
288
289 // Test for hard cap on total output and for log throttling. Returns whether
290 // the log line should go to output or not. Returns message if limit exceeded.
291 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
292         message := ""
293         lineSize := int64(len(line))
294         partialLine := false
295
296         if arvlog.logThrottleIsOpen {
297                 matches := lineRegexp.FindStringSubmatch(string(line))
298
299                 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
300                         partialLine = true
301                         if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
302                                 arvlog.logThrottlePartialLineLastAt = now
303                                 arvlog.logThrottleFirstPartialLine = true
304                         }
305                 }
306
307                 arvlog.logThrottleLinesSoFar += 1
308                 arvlog.logThrottleBytesSoFar += lineSize
309                 arvlog.bytesLogged += lineSize
310
311                 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
312                         message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
313                                 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
314                         arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
315                         arvlog.logThrottleIsOpen = false
316
317                 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
318                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
319                         message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
320                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second)
321                         arvlog.logThrottleIsOpen = false
322
323                 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
324                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
325                         message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
326                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second)
327                         arvlog.logThrottleIsOpen = false
328
329                 } else if partialLine && arvlog.logThrottleFirstPartialLine {
330                         arvlog.logThrottleFirstPartialLine = false
331                         message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
332                                 RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod)
333
334                 }
335         }
336
337         if !arvlog.logThrottleIsOpen {
338                 // Don't log anything if any limit has been exceeded. Just count lossage.
339                 arvlog.logThrottleBytesSkipped += lineSize
340         }
341
342         if message != "" {
343                 // Yes, write to logs, but use our "rate exceeded" message
344                 // instead of the log message that exceeded the limit.
345                 message += " A complete log is still being written to Keep, and will be available when the job finishes."
346                 return true, []byte(message)
347         } else {
348                 return arvlog.logThrottleIsOpen, line
349         }
350 }
351
352 // load the rate limit discovery config paramters
353 func loadLogThrottleParams(clnt IArvadosClient) {
354         param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
355         if err != nil {
356                 crunchLimitLogBytesPerJob = 67108864
357         } else {
358                 crunchLimitLogBytesPerJob = int64(param.(float64))
359         }
360
361         param, err = clnt.Discovery("crunchLogThrottleBytes")
362         if err != nil {
363                 crunchLogThrottleBytes = 65536
364         } else {
365                 crunchLogThrottleBytes = int64(param.(float64))
366         }
367
368         param, err = clnt.Discovery("crunchLogThrottlePeriod")
369         if err != nil {
370                 crunchLogThrottlePeriod = 60
371         } else {
372                 crunchLogThrottlePeriod = int(param.(float64))
373         }
374
375         param, err = clnt.Discovery("crunchLogThrottleLines")
376         if err != nil {
377                 crunchLogThrottleLines = 1024
378         } else {
379                 crunchLogThrottleLines = int64(param.(float64))
380         }
381
382         param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
383         if err != nil {
384                 crunchLogPartialLineThrottlePeriod = 5
385         } else {
386                 crunchLogPartialLineThrottlePeriod = int(param.(float64))
387         }
388
389         param, err = clnt.Discovery("crunchLogBytesPerEvent")
390         if err != nil {
391                 crunchLogBytesPerEvent = 4096
392         } else {
393                 crunchLogBytesPerEvent = int64(param.(float64))
394         }
395
396         param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
397         if err != nil {
398                 crunchLogSecondsBetweenEvents = 1
399         } else {
400                 crunchLogSecondsBetweenEvents = int(param.(float64))
401         }
402 }