11644: Add volume replication level to /mounts response.
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "io"
8         "log"
9         "regexp"
10         "strings"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15 )
16
17 // Timestamper is the signature for a function that takes a timestamp and
18 // return a formated string value.
19 type Timestamper func(t time.Time) string
20
21 // Logging plumbing:
22 //
23 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
24 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
25 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
26 //
27 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
28 // data from the stdout/stderr Reader and send to the Logger.
29
30 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
31 // write, and periodically flushes to a downstream writer.  It supports the
32 // "Logger" and "WriteCloser" interfaces.
33 type ThrottledLogger struct {
34         *log.Logger
35         buf *bytes.Buffer
36         sync.Mutex
37         writer   io.WriteCloser
38         flush    chan struct{}
39         stopped  chan struct{}
40         stopping chan struct{}
41         Timestamper
42         Immediate    *log.Logger
43         pendingFlush bool
44 }
45
46 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
47 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
48
49 // RFC3339Timestamp formats t as RFC3339NanoFixed.
50 func RFC3339Timestamp(t time.Time) string {
51         return t.Format(RFC3339NanoFixed)
52 }
53
54 // Write prepends a timestamp to each line of the input data and
55 // appends to the internal buffer. Each line is also logged to
56 // tl.Immediate, if tl.Immediate is not nil.
57 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
58         tl.Mutex.Lock()
59         defer tl.Mutex.Unlock()
60
61         if tl.buf == nil {
62                 tl.buf = &bytes.Buffer{}
63         }
64
65         now := tl.Timestamper(time.Now().UTC())
66         sc := bufio.NewScanner(bytes.NewBuffer(p))
67         for err == nil && sc.Scan() {
68                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
69                 if tl.Immediate != nil {
70                         tl.Immediate.Print(out[:len(out)-1])
71                 }
72                 _, err = io.WriteString(tl.buf, out)
73         }
74         if err == nil {
75                 err = sc.Err()
76                 if err == nil {
77                         n = len(p)
78                 }
79         }
80
81         if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
82                 // Non-blocking send.  Try send a flush if it is ready to
83                 // accept it.  Otherwise do nothing because a flush is already
84                 // pending.
85                 select {
86                 case tl.flush <- struct{}{}:
87                 default:
88                 }
89         }
90
91         return
92 }
93
94 // Periodically check the current buffer; if not empty, send it on the
95 // channel to the goWriter goroutine.
96 func (tl *ThrottledLogger) flusher() {
97         ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
98         defer ticker.Stop()
99         for stopping := false; !stopping; {
100                 select {
101                 case <-tl.stopping:
102                         // flush tl.buf and exit the loop
103                         stopping = true
104                 case <-tl.flush:
105                 case <-ticker.C:
106                 }
107
108                 var ready *bytes.Buffer
109
110                 tl.Mutex.Lock()
111                 ready, tl.buf = tl.buf, &bytes.Buffer{}
112                 tl.Mutex.Unlock()
113
114                 if ready != nil && ready.Len() > 0 {
115                         tl.writer.Write(ready.Bytes())
116                 }
117         }
118         close(tl.stopped)
119 }
120
121 // Close the flusher goroutine and wait for it to complete, then close the
122 // underlying Writer.
123 func (tl *ThrottledLogger) Close() error {
124         select {
125         case <-tl.stopping:
126                 // already stopped
127         default:
128                 close(tl.stopping)
129         }
130         <-tl.stopped
131         return tl.writer.Close()
132 }
133
134 const (
135         // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
136         MaxLogLine = 1 << 12
137 )
138
139 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
140 // line splitting.
141 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
142         reader := bufio.NewReaderSize(in, MaxLogLine)
143         var prefix string
144         for {
145                 line, isPrefix, err := reader.ReadLine()
146                 if err == io.EOF {
147                         break
148                 } else if err != nil {
149                         writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
150                 }
151                 var suffix string
152                 if isPrefix {
153                         suffix = "[...]\n"
154                 }
155
156                 if prefix == "" && suffix == "" {
157                         writer.Write(line)
158                 } else {
159                         writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
160                 }
161
162                 // Set up prefix for following line
163                 if isPrefix {
164                         prefix = "[...]"
165                 } else {
166                         prefix = ""
167                 }
168         }
169         done <- true
170 }
171
172 // NewThrottledLogger creates a new thottled logger that
173 // (a) prepends timestamps to each line
174 // (b) batches log messages and only calls the underlying Writer
175 //  at most once per "crunchLogSecondsBetweenEvents" seconds.
176 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
177         tl := &ThrottledLogger{}
178         tl.flush = make(chan struct{}, 1)
179         tl.stopped = make(chan struct{})
180         tl.stopping = make(chan struct{})
181         tl.writer = writer
182         tl.Logger = log.New(tl, "", 0)
183         tl.Timestamper = RFC3339Timestamp
184         go tl.flusher()
185         return tl
186 }
187
188 // Log throttling rate limiting config parameters
189 var crunchLimitLogBytesPerJob int64 = 67108864
190 var crunchLogThrottleBytes int64 = 65536
191 var crunchLogThrottlePeriod time.Duration = time.Second * 60
192 var crunchLogThrottleLines int64 = 1024
193 var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
194 var crunchLogBytesPerEvent int64 = 4096
195 var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
196
197 // ArvLogWriter is an io.WriteCloser that processes each write by
198 // writing it through to another io.WriteCloser (typically a
199 // CollectionFileWriter) and creating an Arvados log entry.
200 type ArvLogWriter struct {
201         ArvClient     IArvadosClient
202         UUID          string
203         loggingStream string
204         writeCloser   io.WriteCloser
205
206         // for rate limiting
207         bytesLogged                  int64
208         logThrottleResetTime         time.Time
209         logThrottleLinesSoFar        int64
210         logThrottleBytesSoFar        int64
211         logThrottleBytesSkipped      int64
212         logThrottleIsOpen            bool
213         logThrottlePartialLineNextAt time.Time
214         logThrottleFirstPartialLine  bool
215         bufToFlush                   bytes.Buffer
216         bufFlushedAt                 time.Time
217         closing                      bool
218 }
219
220 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
221         // Write to the next writer in the chain (a file in Keep)
222         var err1 error
223         if arvlog.writeCloser != nil {
224                 _, err1 = arvlog.writeCloser.Write(p)
225         }
226
227         // write to API after checking rate limit
228         now := time.Now()
229         bytesWritten := 0
230
231         if now.After(arvlog.logThrottleResetTime) {
232                 // It has been more than throttle_period seconds since the last
233                 // checkpoint; so reset the throttle
234                 if arvlog.logThrottleBytesSkipped > 0 {
235                         arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
236                 }
237
238                 arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
239                 arvlog.logThrottleBytesSoFar = 0
240                 arvlog.logThrottleLinesSoFar = 0
241                 arvlog.logThrottleBytesSkipped = 0
242                 arvlog.logThrottleIsOpen = true
243         }
244
245         lines := bytes.Split(p, []byte("\n"))
246
247         for _, line := range lines {
248                 // Short circuit the counting code if we're just going to throw
249                 // away the data anyway.
250                 if !arvlog.logThrottleIsOpen {
251                         arvlog.logThrottleBytesSkipped += int64(len(line))
252                         continue
253                 } else if len(line) == 0 {
254                         continue
255                 }
256
257                 // check rateLimit
258                 logOpen, msg := arvlog.rateLimit(line, now)
259                 if logOpen {
260                         arvlog.bufToFlush.WriteString(string(msg) + "\n")
261                 }
262         }
263
264         if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
265                 (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
266                 arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
267                 // write to API
268                 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
269                         "object_uuid": arvlog.UUID,
270                         "event_type":  arvlog.loggingStream,
271                         "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
272                 err2 := arvlog.ArvClient.Create("logs", lr, nil)
273
274                 bytesWritten = arvlog.bufToFlush.Len()
275                 arvlog.bufToFlush = bytes.Buffer{}
276                 arvlog.bufFlushedAt = now
277
278                 if err1 != nil || err2 != nil {
279                         return 0, fmt.Errorf("%s ; %s", err1, err2)
280                 }
281         }
282
283         return bytesWritten, nil
284 }
285
286 // Close the underlying writer
287 func (arvlog *ArvLogWriter) Close() (err error) {
288         arvlog.closing = true
289         arvlog.Write([]byte{})
290         if arvlog.writeCloser != nil {
291                 err = arvlog.writeCloser.Close()
292                 arvlog.writeCloser = nil
293         }
294         return err
295 }
296
297 var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
298
299 // Test for hard cap on total output and for log throttling. Returns whether
300 // the log line should go to output or not. Returns message if limit exceeded.
301 func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
302         message := ""
303         lineSize := int64(len(line))
304
305         if arvlog.logThrottleIsOpen {
306                 matches := lineRegexp.FindStringSubmatch(string(line))
307
308                 if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
309                         // This is a partial line.
310
311                         if arvlog.logThrottleFirstPartialLine {
312                                 // Partial should be suppressed.  First time this is happening for this line so provide a message instead.
313                                 arvlog.logThrottleFirstPartialLine = false
314                                 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
315                                 arvlog.logThrottleBytesSkipped += lineSize
316                                 return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
317                                         RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
318                         } else if now.After(arvlog.logThrottlePartialLineNextAt) {
319                                 // The throttle period has passed.  Update timestamp and let it through.
320                                 arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
321                         } else {
322                                 // Suppress line.
323                                 arvlog.logThrottleBytesSkipped += lineSize
324                                 return false, line
325                         }
326                 } else {
327                         // Not a partial line so reset.
328                         arvlog.logThrottlePartialLineNextAt = time.Time{}
329                         arvlog.logThrottleFirstPartialLine = true
330                 }
331
332                 arvlog.bytesLogged += lineSize
333                 arvlog.logThrottleBytesSoFar += lineSize
334                 arvlog.logThrottleLinesSoFar += 1
335
336                 if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
337                         message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
338                                 RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
339                         arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
340                         arvlog.logThrottleIsOpen = false
341
342                 } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
343                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
344                         message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
345                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
346                         arvlog.logThrottleIsOpen = false
347
348                 } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
349                         remainingTime := arvlog.logThrottleResetTime.Sub(now)
350                         message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
351                                 RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
352                         arvlog.logThrottleIsOpen = false
353
354                 }
355         }
356
357         if !arvlog.logThrottleIsOpen {
358                 // Don't log anything if any limit has been exceeded. Just count lossage.
359                 arvlog.logThrottleBytesSkipped += lineSize
360         }
361
362         if message != "" {
363                 // Yes, write to logs, but use our "rate exceeded" message
364                 // instead of the log message that exceeded the limit.
365                 message += " A complete log is still being written to Keep, and will be available when the job finishes."
366                 return true, []byte(message)
367         } else {
368                 return arvlog.logThrottleIsOpen, line
369         }
370 }
371
372 // load the rate limit discovery config paramters
373 func loadLogThrottleParams(clnt IArvadosClient) {
374         param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
375         if err == nil {
376                 crunchLimitLogBytesPerJob = int64(param.(float64))
377         }
378
379         param, err = clnt.Discovery("crunchLogThrottleBytes")
380         if err == nil {
381                 crunchLogThrottleBytes = int64(param.(float64))
382         }
383
384         param, err = clnt.Discovery("crunchLogThrottlePeriod")
385         if err == nil {
386                 crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
387         }
388
389         param, err = clnt.Discovery("crunchLogThrottleLines")
390         if err == nil {
391                 crunchLogThrottleLines = int64(param.(float64))
392         }
393
394         param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
395         if err == nil {
396                 crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
397         }
398
399         param, err = clnt.Discovery("crunchLogBytesPerEvent")
400         if err == nil {
401                 crunchLogBytesPerEvent = int64(param.(float64))
402         }
403
404         param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
405         if err == nil {
406                 crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
407         }
408 }