7816: CopyReaderToLog renamed to ReadWriteLines. Use Writer instead of Logger
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "io"
9         "log"
10         "sync"
11         "time"
12 )
13
14 // Timestamper is the signature for a function that takes a timestamp and
15 // return a formated string value.
16 type Timestamper func(t time.Time) string
17
18 // Logging plumbing:
19 //
20 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
21 // ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
22 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
23 //
24 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
25 // data from the stdout/stderr Reader and send to the Logger.
26
27 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
28 // write, and periodically flushes to a downstream writer.  It supports the
29 // "Logger" and "WriteCloser" interfaces.
30 type ThrottledLogger struct {
31         *log.Logger
32         buf *bytes.Buffer
33         sync.Mutex
34         writer      io.WriteCloser
35         stop        bool
36         flusherDone chan bool
37         Timestamper
38 }
39
40 // RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision,
41 // because the RFC3339Nano format isn't fixed width.
42 const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
43
44 // RFC3339Timestamp return a RFC3339 formatted timestamp using RFC3339Fixed
45 func RFC3339Timestamp(now time.Time) string {
46         return now.Format(RFC3339Fixed)
47 }
48
49 // Write to the internal buffer.  Prepend a timestamp to each line of the input
50 // data.
51 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
52         tl.Mutex.Lock()
53         if tl.buf == nil {
54                 tl.buf = &bytes.Buffer{}
55         }
56         defer tl.Mutex.Unlock()
57
58         now := tl.Timestamper(time.Now().UTC())
59         sc := bufio.NewScanner(bytes.NewBuffer(p))
60         for sc.Scan() {
61                 _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
62         }
63         return len(p), err
64 }
65
66 // Periodically check the current buffer; if not empty, send it on the
67 // channel to the goWriter goroutine.
68 func (tl *ThrottledLogger) flusher() {
69         bufchan := make(chan *bytes.Buffer)
70         bufterm := make(chan bool)
71
72         // Use a separate goroutine for the actual write so that the writes are
73         // actually initiated closer every 1s instead of every
74         // 1s + (time to it takes to write).
75         go goWriter(tl.writer, bufchan, bufterm)
76         for {
77                 if !tl.stop {
78                         time.Sleep(1 * time.Second)
79                 }
80                 tl.Mutex.Lock()
81                 if tl.buf != nil && tl.buf.Len() > 0 {
82                         oldbuf := tl.buf
83                         tl.buf = nil
84                         tl.Mutex.Unlock()
85                         bufchan <- oldbuf
86                 } else if tl.stop {
87                         tl.Mutex.Unlock()
88                         break
89                 } else {
90                         tl.Mutex.Unlock()
91                 }
92         }
93         close(bufchan)
94         <-bufterm
95         tl.flusherDone <- true
96 }
97
98 // Receive buffers from a channel and send to the underlying Writer
99 func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
100         for b := range c {
101                 writer.Write(b.Bytes())
102         }
103         t <- true
104 }
105
106 // Close the flusher goroutine and wait for it to complete, then close the
107 // underlying Writer.
108 func (tl *ThrottledLogger) Close() error {
109         tl.stop = true
110         <-tl.flusherDone
111         return tl.writer.Close()
112 }
113
114 const (
115         // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
116         MaxLogLine = 1 << 12
117 )
118
119 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
120 // line splitting.
121 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
122         reader := bufio.NewReaderSize(in, MaxLogLine)
123         var prefix string
124         for {
125                 line, isPrefix, err := reader.ReadLine()
126                 if err == io.EOF {
127                         break
128                 } else if err != nil {
129                         writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
130                 }
131                 var suffix string
132                 if isPrefix {
133                         suffix = "[...]\n"
134                 }
135
136                 if prefix == "" && suffix == "" {
137                         writer.Write(line)
138                 } else {
139                         writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
140                 }
141
142                 // Set up prefix for following line
143                 if isPrefix {
144                         prefix = "[...]"
145                 } else {
146                         prefix = ""
147                 }
148         }
149         done <- true
150 }
151
152 // NewThrottledLogger creates a new thottled logger that
153 // (a) prepends timestamps to each line
154 // (b) batches log messages and only calls the underlying Writer at most once
155 // per second.
156 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
157         alw := &ThrottledLogger{}
158         alw.flusherDone = make(chan bool)
159         alw.writer = writer
160         alw.Logger = log.New(alw, "", 0)
161         alw.Timestamper = RFC3339Timestamp
162         go alw.flusher()
163         return alw
164 }
165
166 // ArvLogWriter implements a writer that writes to each of a WriteCloser
167 // (typically CollectionFileWriter) and creates an API server log entry.
168 type ArvLogWriter struct {
169         ArvClient     IArvadosClient
170         UUID          string
171         loggingStream string
172         writeCloser   io.WriteCloser
173 }
174
175 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
176         // Write to the next writer in the chain (a file in Keep)
177         var err1 error
178         if arvlog.writeCloser != nil {
179                 _, err1 = arvlog.writeCloser.Write(p)
180         }
181
182         // write to API
183         lr := arvadosclient.Dict{"object_uuid": arvlog.UUID,
184                 "event_type": arvlog.loggingStream,
185                 "properties": map[string]string{"text": string(p)}}
186         err2 := arvlog.ArvClient.Create("logs", lr, nil)
187
188         if err1 != nil || err2 != nil {
189                 return 0, fmt.Errorf("%s ; %s", err1, err2)
190         }
191         return len(p), nil
192 }
193
194 // Close the underlying writer
195 func (arvlog *ArvLogWriter) Close() (err error) {
196         if arvlog.writeCloser != nil {
197                 err = arvlog.writeCloser.Close()
198                 arvlog.writeCloser = nil
199         }
200         return err
201 }