Merge branch '9486-retry-instance-limit-exceeded' closes #9486
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "io"
9         "log"
10         "sync"
11         "time"
12 )
13
14 // Timestamper is the signature for a function that takes a timestamp and
15 // return a formated string value.
16 type Timestamper func(t time.Time) string
17
18 // Logging plumbing:
19 //
20 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
21 // ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
22 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
23 //
24 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
25 // data from the stdout/stderr Reader and send to the Logger.
26
27 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
28 // write, and periodically flushes to a downstream writer.  It supports the
29 // "Logger" and "WriteCloser" interfaces.
30 type ThrottledLogger struct {
31         *log.Logger
32         buf *bytes.Buffer
33         sync.Mutex
34         writer      io.WriteCloser
35         stop        bool
36         flusherDone chan bool
37         Timestamper
38         Immediate *log.Logger
39 }
40
41 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
42 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
43
44 // RFC3339Timestamp formats t as RFC3339NanoFixed.
45 func RFC3339Timestamp(t time.Time) string {
46         return t.Format(RFC3339NanoFixed)
47 }
48
49 // Write prepends a timestamp to each line of the input data and
50 // appends to the internal buffer. Each line is also logged to
51 // tl.Immediate, if tl.Immediate is not nil.
52 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
53         tl.Mutex.Lock()
54         if tl.buf == nil {
55                 tl.buf = &bytes.Buffer{}
56         }
57         defer tl.Mutex.Unlock()
58
59         now := tl.Timestamper(time.Now().UTC())
60         sc := bufio.NewScanner(bytes.NewBuffer(p))
61         for err == nil && sc.Scan() {
62                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
63                 if tl.Immediate != nil {
64                         tl.Immediate.Print(out[:len(out)-1])
65                 }
66                 _, err = io.WriteString(tl.buf, out)
67         }
68         if err == nil {
69                 err = sc.Err()
70                 if err == nil {
71                         n = len(p)
72                 }
73         }
74         return
75 }
76
77 // Periodically check the current buffer; if not empty, send it on the
78 // channel to the goWriter goroutine.
79 func (tl *ThrottledLogger) flusher() {
80         bufchan := make(chan *bytes.Buffer)
81         bufterm := make(chan bool)
82
83         // Use a separate goroutine for the actual write so that the writes are
84         // actually initiated closer every 1s instead of every
85         // 1s + (time to it takes to write).
86         go goWriter(tl.writer, bufchan, bufterm)
87         for {
88                 if !tl.stop {
89                         time.Sleep(1 * time.Second)
90                 }
91                 tl.Mutex.Lock()
92                 if tl.buf != nil && tl.buf.Len() > 0 {
93                         oldbuf := tl.buf
94                         tl.buf = nil
95                         tl.Mutex.Unlock()
96                         bufchan <- oldbuf
97                 } else if tl.stop {
98                         tl.Mutex.Unlock()
99                         break
100                 } else {
101                         tl.Mutex.Unlock()
102                 }
103         }
104         close(bufchan)
105         <-bufterm
106         tl.flusherDone <- true
107 }
108
109 // Receive buffers from a channel and send to the underlying Writer
110 func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
111         for b := range c {
112                 writer.Write(b.Bytes())
113         }
114         t <- true
115 }
116
117 // Close the flusher goroutine and wait for it to complete, then close the
118 // underlying Writer.
119 func (tl *ThrottledLogger) Close() error {
120         tl.stop = true
121         <-tl.flusherDone
122         return tl.writer.Close()
123 }
124
125 const (
126         // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
127         MaxLogLine = 1 << 12
128 )
129
130 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
131 // line splitting.
132 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
133         reader := bufio.NewReaderSize(in, MaxLogLine)
134         var prefix string
135         for {
136                 line, isPrefix, err := reader.ReadLine()
137                 if err == io.EOF {
138                         break
139                 } else if err != nil {
140                         writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
141                 }
142                 var suffix string
143                 if isPrefix {
144                         suffix = "[...]\n"
145                 }
146
147                 if prefix == "" && suffix == "" {
148                         writer.Write(line)
149                 } else {
150                         writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
151                 }
152
153                 // Set up prefix for following line
154                 if isPrefix {
155                         prefix = "[...]"
156                 } else {
157                         prefix = ""
158                 }
159         }
160         done <- true
161 }
162
163 // NewThrottledLogger creates a new thottled logger that
164 // (a) prepends timestamps to each line
165 // (b) batches log messages and only calls the underlying Writer at most once
166 // per second.
167 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
168         tl := &ThrottledLogger{}
169         tl.flusherDone = make(chan bool)
170         tl.writer = writer
171         tl.Logger = log.New(tl, "", 0)
172         tl.Timestamper = RFC3339Timestamp
173         go tl.flusher()
174         return tl
175 }
176
177 // ArvLogWriter is an io.WriteCloser that processes each write by
178 // writing it through to another io.WriteCloser (typically a
179 // CollectionFileWriter) and creating an Arvados log entry.
180 type ArvLogWriter struct {
181         ArvClient     IArvadosClient
182         UUID          string
183         loggingStream string
184         writeCloser   io.WriteCloser
185 }
186
187 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
188         // Write to the next writer in the chain (a file in Keep)
189         var err1 error
190         if arvlog.writeCloser != nil {
191                 _, err1 = arvlog.writeCloser.Write(p)
192         }
193
194         // write to API
195         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
196                 "object_uuid": arvlog.UUID,
197                 "event_type":  arvlog.loggingStream,
198                 "properties":  map[string]string{"text": string(p)}}}
199         err2 := arvlog.ArvClient.Create("logs", lr, nil)
200
201         if err1 != nil || err2 != nil {
202                 return 0, fmt.Errorf("%s ; %s", err1, err2)
203         }
204         return len(p), nil
205 }
206
207 // Close the underlying writer
208 func (arvlog *ArvLogWriter) Close() (err error) {
209         if arvlog.writeCloser != nil {
210                 err = arvlog.writeCloser.Close()
211                 arvlog.writeCloser = nil
212         }
213         return err
214 }