7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
14 // Timestamper is the signature for a function that takes a timestamp and
15 // returns a formatted string value.
16 type Timestamper func(t time.Time) string
20 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
21 // ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
22 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
24 // For stdout/stderr, ReadWriteLines additionally runs as a goroutine to pull
25 // data from the stdout/stderr Reader and send it to the Logger.
27 // ThrottledLogger accepts writes, prepends a timestamp to each line of the
28 // write, and periodically flushes to a downstream writer. It supports the
29 // "Logger" and "WriteCloser" interfaces.
30 type ThrottledLogger struct {
41 // RFC3339Fixed is a fixed-width variant of RFC3339 with microsecond precision
42 // (six fractional digits); it exists because RFC3339Nano isn't fixed width.
43 const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
45 // RFC3339Timestamp returns an RFC3339-formatted timestamp using RFC3339Fixed.
46 func RFC3339Timestamp(now time.Time) string {
47 return now.Format(RFC3339Fixed)
50 // Write to the internal buffer. Prepend a timestamp to each line of the input
52 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
55 tl.buf = &bytes.Buffer{}
57 defer tl.Mutex.Unlock()
59 now := tl.Timestamper(time.Now().UTC())
60 sc := bufio.NewScanner(bytes.NewBuffer(p))
62 _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
63 if tl.Immediate != nil {
64 tl.Immediate.Printf("%s %s\n", now, sc.Text())
70 // Periodically check the current buffer; if not empty, send it on the
71 // channel to the goWriter goroutine.
72 func (tl *ThrottledLogger) flusher() {
73 bufchan := make(chan *bytes.Buffer)
74 bufterm := make(chan bool)
76 // Use a separate goroutine for the actual write so that the writes are
77 // initiated closer to every 1s instead of every
78 // 1s + (the time it takes to write).
79 go goWriter(tl.writer, bufchan, bufterm)
82 time.Sleep(1 * time.Second)
85 if tl.buf != nil && tl.buf.Len() > 0 {
99 tl.flusherDone <- true
102 // Receive buffers from a channel and send to the underlying Writer
103 func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
105 writer.Write(b.Bytes())
110 // Close the flusher goroutine and wait for it to complete, then close the
111 // underlying Writer.
112 func (tl *ThrottledLogger) Close() error {
115 return tl.writer.Close()
119 // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
123 // ReadWriteLines reads lines from a reader and writes to a Writer, with long
125 func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
126 reader := bufio.NewReaderSize(in, MaxLogLine)
129 line, isPrefix, err := reader.ReadLine()
132 } else if err != nil {
133 writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
140 if prefix == "" && suffix == "" {
143 writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
146 // Set up prefix for following line
156 // NewThrottledLogger creates a new throttled logger that
157 // (a) prepends timestamps to each line
158 // (b) batches log messages and only calls the underlying Writer at most once
160 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
161 alw := &ThrottledLogger{}
162 alw.flusherDone = make(chan bool)
164 alw.Logger = log.New(alw, "", 0)
165 alw.Timestamper = RFC3339Timestamp
170 // ArvLogWriter implements a writer that writes to a WriteCloser
171 // (typically CollectionFileWriter) and also creates an API server log entry.
172 type ArvLogWriter struct {
173 ArvClient IArvadosClient
176 writeCloser io.WriteCloser
179 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
180 // Write to the next writer in the chain (a file in Keep)
182 if arvlog.writeCloser != nil {
183 _, err1 = arvlog.writeCloser.Write(p)
187 lr := arvadosclient.Dict{"log": arvadosclient.Dict{
188 "object_uuid": arvlog.UUID,
189 "event_type": arvlog.loggingStream,
190 "properties": map[string]string{"text": string(p)}}}
191 err2 := arvlog.ArvClient.Create("logs", lr, nil)
193 if err1 != nil || err2 != nil {
194 return 0, fmt.Errorf("%s ; %s", err1, err2)
199 // Close the underlying writer
200 func (arvlog *ArvLogWriter) Close() (err error) {
201 if arvlog.writeCloser != nil {
202 err = arvlog.writeCloser.Close()
203 arvlog.writeCloser = nil