11308: Merge branch 'master' into 11308-python3
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "io"
8         "log"
9         "sync"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
13 )
14
15 // Timestamper is the signature for a function that takes a timestamp and
16 // return a formated string value.
17 type Timestamper func(t time.Time) string
18
19 // Logging plumbing:
20 //
21 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
22 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
23 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
24 //
25 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
26 // data from the stdout/stderr Reader and send to the Logger.
27
28 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
29 // write, and periodically flushes to a downstream writer.  It supports the
30 // "Logger" and "WriteCloser" interfaces.
31 type ThrottledLogger struct {
32         *log.Logger
33         buf *bytes.Buffer
34         sync.Mutex
35         writer   io.WriteCloser
36         stopping chan struct{}
37         stopped  chan struct{}
38         Timestamper
39         Immediate *log.Logger
40 }
41
42 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
43 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
44
45 // RFC3339Timestamp formats t as RFC3339NanoFixed.
46 func RFC3339Timestamp(t time.Time) string {
47         return t.Format(RFC3339NanoFixed)
48 }
49
50 // Write prepends a timestamp to each line of the input data and
51 // appends to the internal buffer. Each line is also logged to
52 // tl.Immediate, if tl.Immediate is not nil.
53 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
54         tl.Mutex.Lock()
55         defer tl.Mutex.Unlock()
56
57         if tl.buf == nil {
58                 tl.buf = &bytes.Buffer{}
59         }
60
61         now := tl.Timestamper(time.Now().UTC())
62         sc := bufio.NewScanner(bytes.NewBuffer(p))
63         for err == nil && sc.Scan() {
64                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
65                 if tl.Immediate != nil {
66                         tl.Immediate.Print(out[:len(out)-1])
67                 }
68                 _, err = io.WriteString(tl.buf, out)
69         }
70         if err == nil {
71                 err = sc.Err()
72                 if err == nil {
73                         n = len(p)
74                 }
75         }
76         return
77 }
78
79 // Periodically check the current buffer; if not empty, send it on the
80 // channel to the goWriter goroutine.
81 func (tl *ThrottledLogger) flusher() {
82         ticker := time.NewTicker(time.Second)
83         defer ticker.Stop()
84         for stopping := false; !stopping; {
85                 select {
86                 case <-tl.stopping:
87                         // flush tl.buf, then exit the loop
88                         stopping = true
89                 case <-ticker.C:
90                 }
91
92                 var ready *bytes.Buffer
93
94                 tl.Mutex.Lock()
95                 ready, tl.buf = tl.buf, nil
96                 tl.Mutex.Unlock()
97
98                 if ready != nil && ready.Len() > 0 {
99                         tl.writer.Write(ready.Bytes())
100                 }
101         }
102         close(tl.stopped)
103 }
104
105 // Close the flusher goroutine and wait for it to complete, then close the
106 // underlying Writer.
107 func (tl *ThrottledLogger) Close() error {
108         select {
109         case <-tl.stopping:
110                 // already stopped
111         default:
112                 close(tl.stopping)
113         }
114         <-tl.stopped
115         return tl.writer.Close()
116 }
117
118 const (
119         // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
120         MaxLogLine = 1 << 12
121 )
122
123 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
124 // line splitting.
125 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
126         reader := bufio.NewReaderSize(in, MaxLogLine)
127         var prefix string
128         for {
129                 line, isPrefix, err := reader.ReadLine()
130                 if err == io.EOF {
131                         break
132                 } else if err != nil {
133                         writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
134                 }
135                 var suffix string
136                 if isPrefix {
137                         suffix = "[...]\n"
138                 }
139
140                 if prefix == "" && suffix == "" {
141                         writer.Write(line)
142                 } else {
143                         writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
144                 }
145
146                 // Set up prefix for following line
147                 if isPrefix {
148                         prefix = "[...]"
149                 } else {
150                         prefix = ""
151                 }
152         }
153         done <- true
154 }
155
156 // NewThrottledLogger creates a new thottled logger that
157 // (a) prepends timestamps to each line
158 // (b) batches log messages and only calls the underlying Writer at most once
159 // per second.
160 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
161         tl := &ThrottledLogger{}
162         tl.stopping = make(chan struct{})
163         tl.stopped = make(chan struct{})
164         tl.writer = writer
165         tl.Logger = log.New(tl, "", 0)
166         tl.Timestamper = RFC3339Timestamp
167         go tl.flusher()
168         return tl
169 }
170
171 // ArvLogWriter is an io.WriteCloser that processes each write by
172 // writing it through to another io.WriteCloser (typically a
173 // CollectionFileWriter) and creating an Arvados log entry.
174 type ArvLogWriter struct {
175         ArvClient     IArvadosClient
176         UUID          string
177         loggingStream string
178         writeCloser   io.WriteCloser
179 }
180
181 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
182         // Write to the next writer in the chain (a file in Keep)
183         var err1 error
184         if arvlog.writeCloser != nil {
185                 _, err1 = arvlog.writeCloser.Write(p)
186         }
187
188         // write to API
189         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
190                 "object_uuid": arvlog.UUID,
191                 "event_type":  arvlog.loggingStream,
192                 "properties":  map[string]string{"text": string(p)}}}
193         err2 := arvlog.ArvClient.Create("logs", lr, nil)
194
195         if err1 != nil || err2 != nil {
196                 return 0, fmt.Errorf("%s ; %s", err1, err2)
197         }
198         return len(p), nil
199 }
200
201 // Close the underlying writer
202 func (arvlog *ArvLogWriter) Close() (err error) {
203         if arvlog.writeCloser != nil {
204                 err = arvlog.writeCloser.Close()
205                 arvlog.writeCloser = nil
206         }
207         return err
208 }