7816: Change ThrottledLogger Stop() to Close(). Choose finalState once so that
[arvados.git] / services / crunch-exec / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "io"
10         "log"
11         "sync"
12         "time"
13 )
14
// Timestamper renders a point in time as the text that is placed in front of
// each log line.
type Timestamper func(now time.Time) string
16
17 // Logging plumbing:
18 //
19 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
20 // ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
21 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
22 //
23 // For stdout/stderr CopyReaderToLog additionally runs as a goroutine to pull
24 // data from the stdout/stderr Reader and send to the Logger.
25
26 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
27 // write, and periodically flushes to a downstream writer.  It supports the
28 // "Logger" and "WriteCloser" interfaces.
29 type ThrottledLogger struct {
30         *log.Logger
31         buf *bytes.Buffer
32         sync.Mutex
33         writer      io.WriteCloser
34         stop        bool
35         flusherDone chan bool
36         Timestamper
37 }
38
// RFC3339Fixed is an RFC 3339 layout with exactly six fractional-second
// digits.  The builtin RFC3339Nano layout is not fixed width, so we define
// our own with microsecond precision (same as API server).
const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"

// RFC3339Timestamp formats now using the RFC3339Fixed layout.
func RFC3339Timestamp(now time.Time) string {
	stamp := now.Format(RFC3339Fixed)
	return stamp
}
46
47 // Write to the internal buffer.  Prepend a timestamp to each line of the input
48 // data.
49 func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
50         this.Mutex.Lock()
51         if this.buf == nil {
52                 this.buf = &bytes.Buffer{}
53         }
54         defer this.Mutex.Unlock()
55
56         now := this.Timestamper(time.Now().UTC())
57         sc := bufio.NewScanner(bytes.NewBuffer(p))
58         for sc.Scan() {
59                 _, err = fmt.Fprintf(this.buf, "%s %s\n", now, sc.Text())
60         }
61         return len(p), err
62 }
63
64 // Periodically check the current buffer; if not empty, send it on the
65 // channel to the goWriter goroutine.
66 func (this *ThrottledLogger) flusher() {
67         bufchan := make(chan *bytes.Buffer)
68         bufterm := make(chan bool)
69
70         // Use a separate goroutine for the actual write so that the writes are
71         // actually initiated closer every 1s instead of every
72         // 1s + (time to it takes to write).
73         go goWriter(this.writer, bufchan, bufterm)
74         for {
75                 if !this.stop {
76                         time.Sleep(1 * time.Second)
77                 }
78                 this.Mutex.Lock()
79                 if this.buf != nil && this.buf.Len() > 0 {
80                         oldbuf := this.buf
81                         this.buf = nil
82                         this.Mutex.Unlock()
83                         bufchan <- oldbuf
84                 } else if this.stop {
85                         this.Mutex.Unlock()
86                         break
87                 } else {
88                         this.Mutex.Unlock()
89                 }
90         }
91         close(bufchan)
92         <-bufterm
93         this.flusherDone <- true
94 }
95
// goWriter receives buffers from c and writes the contents of each one to
// writer, in the order received.  The result of each write is discarded.
// Once c has been closed and drained, it sends true on t.
func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
	for {
		pending, open := <-c
		if !open {
			break
		}
		writer.Write(pending.Bytes())
	}
	t <- true
}
103
104 // Stop the flusher goroutine and wait for it to complete, then close the
105 // underlying Writer.
106 func (this *ThrottledLogger) Close() error {
107         this.stop = true
108         <-this.flusherDone
109         return this.writer.Close()
110 }
111
const (
	MaxLogLine = 1 << 12 // Child stderr lines >4KiB will be split
)

// CopyReaderToLog is meant to run as a goroutine.  It copies lines from in to
// logger until in is exhausted or fails, then sends true on done.
//
// Lines longer than MaxLogLine bytes are split into several log entries; a
// "[...]" marker is appended to each entry that is continued and prepended to
// each entry that is a continuation.
//
// A read error other than io.EOF is reported through logger and ends the
// copy.  Continuing after such an error would spin forever on a reader that
// keeps failing, and done would never be signalled.
func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
	reader := bufio.NewReaderSize(in, MaxLogLine)
	var prefix string
	for {
		line, isPrefix, err := reader.ReadLine()
		if err == io.EOF {
			break
		}
		if err != nil {
			// ReadLine never returns both data and an error, so there
			// is no partial line to emit here.
			logger.Printf("error reading container log: %v", err)
			break
		}
		var suffix string
		if isPrefix {
			suffix = "[...]"
		}
		logger.Print(prefix, string(line), suffix)
		// A truncated entry means the next one is its continuation, so
		// the marker carries over as the next entry's prefix.
		prefix = suffix
	}
	done <- true
}
141
142 // Create a new thottled logger that
143 // (a) prepends timestamps to each line
144 // (b) batches log messages and only calls the underlying Writer at most once
145 // per second.
146
147 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
148         alw := &ThrottledLogger{}
149         alw.flusherDone = make(chan bool)
150         alw.writer = writer
151         alw.Logger = log.New(alw, "", 0)
152         alw.Timestamper = RFC3339Timestamp
153         go alw.flusher()
154         return alw
155 }
156
157 // Implements a writer that writes to each of a WriteCloser (typically
158 // CollectionFileWriter) and creates an API server log entry.
159 type ArvLogWriter struct {
160         Api           IArvadosClient
161         Uuid          string
162         loggingStream string
163         writeCloser   io.WriteCloser
164 }
165
166 func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
167         // Write to the next writer in the chain (a file in Keep)
168         var err1 error
169         if this.writeCloser != nil {
170                 _, err1 = this.writeCloser.Write(p)
171         }
172
173         // write to API
174         lr := arvadosclient.Dict{"object_uuid": this.Uuid,
175                 "event_type": this.loggingStream,
176                 "properties": map[string]string{"text": string(p)}}
177         err2 := this.Api.Create("logs", lr, nil)
178
179         if err1 != nil || err2 != nil {
180                 return 0, errors.New(fmt.Sprintf("%s ; %s", err1, err2))
181         } else {
182                 return len(p), nil
183         }
184
185 }
186
187 func (this *ArvLogWriter) Close() (err error) {
188         if this.writeCloser != nil {
189                 err = this.writeCloser.Close()
190                 this.writeCloser = nil
191         }
192         return err
193 }