9956: Remove obsolete TODO comment.
[arvados.git] / services / crunch-run / logging.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "io"
9         "log"
10         "sync"
11         "time"
12 )
13
14 // Timestamper is the signature for a function that takes a timestamp and
15 // return a formated string value.
16 type Timestamper func(t time.Time) string
17
18 // Logging plumbing:
19 //
20 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
21 // ThrottledLogger.buf -> ThrottledLogger.flusher ->
22 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
23 //
24 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
25 // data from the stdout/stderr Reader and send to the Logger.
26
27 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
28 // write, and periodically flushes to a downstream writer.  It supports the
29 // "Logger" and "WriteCloser" interfaces.
30 type ThrottledLogger struct {
31         *log.Logger
32         buf *bytes.Buffer
33         sync.Mutex
34         writer      io.WriteCloser
35         stop        bool
36         flusherDone chan bool
37         Timestamper
38         Immediate *log.Logger
39 }
40
41 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
42 const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
43
44 // RFC3339Timestamp formats t as RFC3339NanoFixed.
45 func RFC3339Timestamp(t time.Time) string {
46         return t.Format(RFC3339NanoFixed)
47 }
48
49 // Write prepends a timestamp to each line of the input data and
50 // appends to the internal buffer. Each line is also logged to
51 // tl.Immediate, if tl.Immediate is not nil.
52 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
53         tl.Mutex.Lock()
54         defer tl.Mutex.Unlock()
55
56         if tl.buf == nil {
57                 tl.buf = &bytes.Buffer{}
58         }
59
60         now := tl.Timestamper(time.Now().UTC())
61         sc := bufio.NewScanner(bytes.NewBuffer(p))
62         for err == nil && sc.Scan() {
63                 out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
64                 if tl.Immediate != nil {
65                         tl.Immediate.Print(out[:len(out)-1])
66                 }
67                 _, err = io.WriteString(tl.buf, out)
68         }
69         if err == nil {
70                 err = sc.Err()
71                 if err == nil {
72                         n = len(p)
73                 }
74         }
75         return
76 }
77
78 // Periodically check the current buffer; if not empty, send it on the
79 // channel to the goWriter goroutine.
80 func (tl *ThrottledLogger) flusher() {
81         ticker := time.NewTicker(time.Second)
82         defer ticker.Stop()
83         for range ticker.C {
84                 // We use a separate "stopping" var here to ensure we flush
85                 // tl.buf after tl.stop becomes true.
86                 stopping := tl.stop
87
88                 var ready *bytes.Buffer
89
90                 tl.Mutex.Lock()
91                 ready, tl.buf = tl.buf, nil
92                 tl.Mutex.Unlock()
93
94                 if ready != nil && ready.Len() > 0 {
95                         tl.writer.Write(ready.Bytes())
96                 }
97
98                 if stopping {
99                         break
100                 }
101         }
102         close(tl.flusherDone)
103 }
104
105 // Close the flusher goroutine and wait for it to complete, then close the
106 // underlying Writer.
107 func (tl *ThrottledLogger) Close() error {
108         tl.stop = true
109         <-tl.flusherDone
110         return tl.writer.Close()
111 }
112
113 const (
114         // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
115         MaxLogLine = 1 << 12
116 )
117
118 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
119 // line splitting.
120 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
121         reader := bufio.NewReaderSize(in, MaxLogLine)
122         var prefix string
123         for {
124                 line, isPrefix, err := reader.ReadLine()
125                 if err == io.EOF {
126                         break
127                 } else if err != nil {
128                         writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
129                 }
130                 var suffix string
131                 if isPrefix {
132                         suffix = "[...]\n"
133                 }
134
135                 if prefix == "" && suffix == "" {
136                         writer.Write(line)
137                 } else {
138                         writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
139                 }
140
141                 // Set up prefix for following line
142                 if isPrefix {
143                         prefix = "[...]"
144                 } else {
145                         prefix = ""
146                 }
147         }
148         done <- true
149 }
150
151 // NewThrottledLogger creates a new thottled logger that
152 // (a) prepends timestamps to each line
153 // (b) batches log messages and only calls the underlying Writer at most once
154 // per second.
155 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
156         tl := &ThrottledLogger{}
157         tl.flusherDone = make(chan bool)
158         tl.writer = writer
159         tl.Logger = log.New(tl, "", 0)
160         tl.Timestamper = RFC3339Timestamp
161         go tl.flusher()
162         return tl
163 }
164
165 // ArvLogWriter is an io.WriteCloser that processes each write by
166 // writing it through to another io.WriteCloser (typically a
167 // CollectionFileWriter) and creating an Arvados log entry.
168 type ArvLogWriter struct {
169         ArvClient     IArvadosClient
170         UUID          string
171         loggingStream string
172         writeCloser   io.WriteCloser
173 }
174
175 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
176         // Write to the next writer in the chain (a file in Keep)
177         var err1 error
178         if arvlog.writeCloser != nil {
179                 _, err1 = arvlog.writeCloser.Write(p)
180         }
181
182         // write to API
183         lr := arvadosclient.Dict{"log": arvadosclient.Dict{
184                 "object_uuid": arvlog.UUID,
185                 "event_type":  arvlog.loggingStream,
186                 "properties":  map[string]string{"text": string(p)}}}
187         err2 := arvlog.ArvClient.Create("logs", lr, nil)
188
189         if err1 != nil || err2 != nil {
190                 return 0, fmt.Errorf("%s ; %s", err1, err2)
191         }
192         return len(p), nil
193 }
194
195 // Close the underlying writer
196 func (arvlog *ArvLogWriter) Close() (err error) {
197         if arvlog.writeCloser != nil {
198                 err = arvlog.writeCloser.Close()
199                 arvlog.writeCloser = nil
200         }
201         return err
202 }