8016: Fix timestamp format (add missing nanoseconds), and use it in tests.
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "io"
9         "log"
10         "sync"
11         "time"
12 )
13
14 // Timestamper is the signature for a function that takes a timestamp and
15 // return a formated string value.
16 type Timestamper func(t time.Time) string
17
18 // Logging plumbing:
19 //
20 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
21 // ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
22 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
23 //
24 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
25 // data from the stdout/stderr Reader and send to the Logger.
26
27 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
28 // write, and periodically flushes to a downstream writer.  It supports the
29 // "Logger" and "WriteCloser" interfaces.
30 type ThrottledLogger struct {
31         *log.Logger
32         buf *bytes.Buffer
33         sync.Mutex
34         writer      io.WriteCloser
35         stop        bool
36         flusherDone chan bool
37         Timestamper
38         Immediate *log.Logger
39 }
40
41 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
42 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
43
44 // RFC3339Timestamp formats t as RFC3339NanoFixed.
45 func RFC3339Timestamp(t time.Time) string {
46         return t.Format(RFC3339NanoFixed)
47 }
48
49 // Write to the internal buffer.  Prepend a timestamp to each line of the input
50 // data.
51 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
52         tl.Mutex.Lock()
53         if tl.buf == nil {
54                 tl.buf = &bytes.Buffer{}
55         }
56         defer tl.Mutex.Unlock()
57
58         now := tl.Timestamper(time.Now().UTC())
59         sc := bufio.NewScanner(bytes.NewBuffer(p))
60         for sc.Scan() {
61                 _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
62                 if tl.Immediate != nil {
63                         tl.Immediate.Printf("%s %s\n", now, sc.Text())
64                 }
65         }
66         return len(p), err
67 }
68
69 // Periodically check the current buffer; if not empty, send it on the
70 // channel to the goWriter goroutine.
71 func (tl *ThrottledLogger) flusher() {
72         bufchan := make(chan *bytes.Buffer)
73         bufterm := make(chan bool)
74
75         // Use a separate goroutine for the actual write so that the writes are
76         // actually initiated closer every 1s instead of every
77         // 1s + (time to it takes to write).
78         go goWriter(tl.writer, bufchan, bufterm)
79         for {
80                 if !tl.stop {
81                         time.Sleep(1 * time.Second)
82                 }
83                 tl.Mutex.Lock()
84                 if tl.buf != nil && tl.buf.Len() > 0 {
85                         oldbuf := tl.buf
86                         tl.buf = nil
87                         tl.Mutex.Unlock()
88                         bufchan <- oldbuf
89                 } else if tl.stop {
90                         tl.Mutex.Unlock()
91                         break
92                 } else {
93                         tl.Mutex.Unlock()
94                 }
95         }
96         close(bufchan)
97         <-bufterm
98         tl.flusherDone <- true
99 }
100
101 // Receive buffers from a channel and send to the underlying Writer
102 func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
103         for b := range c {
104                 writer.Write(b.Bytes())
105         }
106         t <- true
107 }
108
109 // Close the flusher goroutine and wait for it to complete, then close the
110 // underlying Writer.
111 func (tl *ThrottledLogger) Close() error {
112         tl.stop = true
113         <-tl.flusherDone
114         return tl.writer.Close()
115 }
116
117 const (
118         // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
119         MaxLogLine = 1 << 12
120 )
121
122 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
123 // line splitting.
124 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
125         reader := bufio.NewReaderSize(in, MaxLogLine)
126         var prefix string
127         for {
128                 line, isPrefix, err := reader.ReadLine()
129                 if err == io.EOF {
130                         break
131                 } else if err != nil {
132                         writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
133                 }
134                 var suffix string
135                 if isPrefix {
136                         suffix = "[...]\n"
137                 }
138
139                 if prefix == "" && suffix == "" {
140                         writer.Write(line)
141                 } else {
142                         writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
143                 }
144
145                 // Set up prefix for following line
146                 if isPrefix {
147                         prefix = "[...]"
148                 } else {
149                         prefix = ""
150                 }
151         }
152         done <- true
153 }
154
155 // NewThrottledLogger creates a new thottled logger that
156 // (a) prepends timestamps to each line
157 // (b) batches log messages and only calls the underlying Writer at most once
158 // per second.
159 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
160         alw := &ThrottledLogger{}
161         alw.flusherDone = make(chan bool)
162         alw.writer = writer
163         alw.Logger = log.New(alw, "", 0)
164         alw.Timestamper = RFC3339Timestamp
165         go alw.flusher()
166         return alw
167 }
168
169 // ArvLogWriter implements a writer that writes to each of a WriteCloser
170 // (typically CollectionFileWriter) and creates an API server log entry.
171 type ArvLogWriter struct {
172         ArvClient     IArvadosClient
173         UUID          string
174         loggingStream string
175         writeCloser   io.WriteCloser
176 }
177
178 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
179         // Write to the next writer in the chain (a file in Keep)
180         var err1 error
181         if arvlog.writeCloser != nil {
182                 _, err1 = arvlog.writeCloser.Write(p)
183         }
184
185         // write to API
186         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
187                 "object_uuid": arvlog.UUID,
188                 "event_type":  arvlog.loggingStream,
189                 "properties":  map[string]string{"text": string(p)}}}
190         err2 := arvlog.ArvClient.Create("logs", lr, nil)
191
192         if err1 != nil || err2 != nil {
193                 return 0, fmt.Errorf("%s ; %s", err1, err2)
194         }
195         return len(p), nil
196 }
197
198 // Close the underlying writer
199 func (arvlog *ArvLogWriter) Close() (err error) {
200         if arvlog.writeCloser != nil {
201                 err = arvlog.writeCloser.Close()
202                 arvlog.writeCloser = nil
203         }
204         return err
205 }