8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
15 type Timestamper func(t time.Time) string
19 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
20 // ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
21 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
23 // For stdout/stderr CopyReaderToLog additionally runs as a goroutine to pull
24 // data from the stdout/stderr Reader and send to the Logger.
26 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
27 // write, and periodically flushes to a downstream writer. It supports the
28 // "Logger" and "WriteCloser" interfaces.
29 type ThrottledLogger struct {
39 // Builtin RFC3339Nano format isn't fixed width so
40 // provide our own with microsecond precision (same as API server).
41 const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
43 func RFC3339Timestamp(now time.Time) string {
44 return now.Format(RFC3339Fixed)
47 // Write to the internal buffer. Prepend a timestamp to each line of the input
49 func (this *ThrottledLogger) Write(p []byte) (n int, err error) {
52 this.buf = &bytes.Buffer{}
54 defer this.Mutex.Unlock()
56 now := this.Timestamper(time.Now().UTC())
57 sc := bufio.NewScanner(bytes.NewBuffer(p))
59 _, err = fmt.Fprintf(this.buf, "%s %s\n", now, sc.Text())
64 // Periodically check the current buffer; if not empty, send it on the
65 // channel to the goWriter goroutine.
66 func (this *ThrottledLogger) flusher() {
67 bufchan := make(chan *bytes.Buffer)
68 bufterm := make(chan bool)
70 // Use a separate goroutine for the actual write so that the writes are
71 // actually initiated closer every 1s instead of every
72 // 1s + (time to it takes to write).
73 go goWriter(this.writer, bufchan, bufterm)
76 time.Sleep(1 * time.Second)
79 if this.buf != nil && this.buf.Len() > 0 {
93 this.flusherDone <- true
96 // Receive buffers from a channel and send to the underlying Writer
97 func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
99 writer.Write(b.Bytes())
104 // Stop the flusher goroutine and wait for it to complete, then close the
105 // underlying Writer.
106 func (this *ThrottledLogger) Close() error {
109 return this.writer.Close()
113 MaxLogLine = 1 << 12 // Child stderr lines >4KiB will be split
116 // Goroutine to copy from a reader to a logger, with long line splitting.
117 func CopyReaderToLog(in io.Reader, logger *log.Logger, done chan<- bool) {
118 reader := bufio.NewReaderSize(in, MaxLogLine)
121 line, isPrefix, err := reader.ReadLine()
124 } else if err != nil {
125 logger.Print("error reading container log:", err)
131 logger.Print(prefix, string(line), suffix)
132 // Set up prefix for following line
142 // Create a new thottled logger that
143 // (a) prepends timestamps to each line
144 // (b) batches log messages and only calls the underlying Writer at most once
147 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
148 alw := &ThrottledLogger{}
149 alw.flusherDone = make(chan bool)
151 alw.Logger = log.New(alw, "", 0)
152 alw.Timestamper = RFC3339Timestamp
157 // Implements a writer that writes to each of a WriteCloser (typically
158 // CollectionFileWriter) and creates an API server log entry.
159 type ArvLogWriter struct {
163 writeCloser io.WriteCloser
166 func (this *ArvLogWriter) Write(p []byte) (n int, err error) {
167 // Write to the next writer in the chain (a file in Keep)
169 if this.writeCloser != nil {
170 _, err1 = this.writeCloser.Write(p)
174 lr := arvadosclient.Dict{"object_uuid": this.Uuid,
175 "event_type": this.loggingStream,
176 "properties": map[string]string{"text": string(p)}}
177 err2 := this.Api.Create("logs", lr, nil)
179 if err1 != nil || err2 != nil {
180 return 0, errors.New(fmt.Sprintf("%s ; %s", err1, err2))
187 func (this *ArvLogWriter) Close() (err error) {
188 if this.writeCloser != nil {
189 err = this.writeCloser.Close()
190 this.writeCloser = nil