Merge branch '9187-crunchv2-dispatching' closes #9187
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "io"
9         "log"
10         "sync"
11         "time"
12 )
13
14 // Timestamper is the signature for a function that takes a timestamp and
15 // return a formated string value.
16 type Timestamper func(t time.Time) string
17
18 // Logging plumbing:
19 //
20 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
21 // ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
22 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
23 //
24 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
25 // data from the stdout/stderr Reader and send to the Logger.
26
27 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
28 // write, and periodically flushes to a downstream writer.  It supports the
29 // "Logger" and "WriteCloser" interfaces.
30 type ThrottledLogger struct {
31         *log.Logger
32         buf *bytes.Buffer
33         sync.Mutex
34         writer      io.WriteCloser
35         stop        bool
36         flusherDone chan bool
37         Timestamper
38         Immediate *log.Logger
39 }
40
41 // RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision,
42 // because the RFC3339Nano format isn't fixed width.
43 const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
44
45 // RFC3339Timestamp return a RFC3339 formatted timestamp using RFC3339Fixed
46 func RFC3339Timestamp(now time.Time) string {
47         return now.Format(RFC3339Fixed)
48 }
49
50 // Write to the internal buffer.  Prepend a timestamp to each line of the input
51 // data.
52 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
53         tl.Mutex.Lock()
54         if tl.buf == nil {
55                 tl.buf = &bytes.Buffer{}
56         }
57         defer tl.Mutex.Unlock()
58
59         now := tl.Timestamper(time.Now().UTC())
60         sc := bufio.NewScanner(bytes.NewBuffer(p))
61         for sc.Scan() {
62                 _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
63                 if tl.Immediate != nil {
64                         tl.Immediate.Printf("%s %s\n", now, sc.Text())
65                 }
66         }
67         return len(p), err
68 }
69
70 // Periodically check the current buffer; if not empty, send it on the
71 // channel to the goWriter goroutine.
72 func (tl *ThrottledLogger) flusher() {
73         bufchan := make(chan *bytes.Buffer)
74         bufterm := make(chan bool)
75
76         // Use a separate goroutine for the actual write so that the writes are
77         // actually initiated closer every 1s instead of every
78         // 1s + (time to it takes to write).
79         go goWriter(tl.writer, bufchan, bufterm)
80         for {
81                 if !tl.stop {
82                         time.Sleep(1 * time.Second)
83                 }
84                 tl.Mutex.Lock()
85                 if tl.buf != nil && tl.buf.Len() > 0 {
86                         oldbuf := tl.buf
87                         tl.buf = nil
88                         tl.Mutex.Unlock()
89                         bufchan <- oldbuf
90                 } else if tl.stop {
91                         tl.Mutex.Unlock()
92                         break
93                 } else {
94                         tl.Mutex.Unlock()
95                 }
96         }
97         close(bufchan)
98         <-bufterm
99         tl.flusherDone <- true
100 }
101
102 // Receive buffers from a channel and send to the underlying Writer
103 func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
104         for b := range c {
105                 writer.Write(b.Bytes())
106         }
107         t <- true
108 }
109
110 // Close the flusher goroutine and wait for it to complete, then close the
111 // underlying Writer.
112 func (tl *ThrottledLogger) Close() error {
113         tl.stop = true
114         <-tl.flusherDone
115         return tl.writer.Close()
116 }
117
118 const (
119         // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
120         MaxLogLine = 1 << 12
121 )
122
123 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
124 // line splitting.
125 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
126         reader := bufio.NewReaderSize(in, MaxLogLine)
127         var prefix string
128         for {
129                 line, isPrefix, err := reader.ReadLine()
130                 if err == io.EOF {
131                         break
132                 } else if err != nil {
133                         writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
134                 }
135                 var suffix string
136                 if isPrefix {
137                         suffix = "[...]\n"
138                 }
139
140                 if prefix == "" && suffix == "" {
141                         writer.Write(line)
142                 } else {
143                         writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
144                 }
145
146                 // Set up prefix for following line
147                 if isPrefix {
148                         prefix = "[...]"
149                 } else {
150                         prefix = ""
151                 }
152         }
153         done <- true
154 }
155
156 // NewThrottledLogger creates a new thottled logger that
157 // (a) prepends timestamps to each line
158 // (b) batches log messages and only calls the underlying Writer at most once
159 // per second.
160 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
161         alw := &ThrottledLogger{}
162         alw.flusherDone = make(chan bool)
163         alw.writer = writer
164         alw.Logger = log.New(alw, "", 0)
165         alw.Timestamper = RFC3339Timestamp
166         go alw.flusher()
167         return alw
168 }
169
170 // ArvLogWriter implements a writer that writes to each of a WriteCloser
171 // (typically CollectionFileWriter) and creates an API server log entry.
172 type ArvLogWriter struct {
173         ArvClient     IArvadosClient
174         UUID          string
175         loggingStream string
176         writeCloser   io.WriteCloser
177 }
178
179 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
180         // Write to the next writer in the chain (a file in Keep)
181         var err1 error
182         if arvlog.writeCloser != nil {
183                 _, err1 = arvlog.writeCloser.Write(p)
184         }
185
186         // write to API
187         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
188                 "object_uuid": arvlog.UUID,
189                 "event_type":  arvlog.loggingStream,
190                 "properties":  map[string]string{"text": string(p)}}}
191         err2 := arvlog.ArvClient.Create("logs", lr, nil)
192
193         if err1 != nil || err2 != nil {
194                 return 0, fmt.Errorf("%s ; %s", err1, err2)
195         }
196         return len(p), nil
197 }
198
199 // Close the underlying writer
200 func (arvlog *ArvLogWriter) Close() (err error) {
201         if arvlog.writeCloser != nil {
202                 err = arvlog.writeCloser.Close()
203                 arvlog.writeCloser = nil
204         }
205         return err
206 }