10484: Track non-s3 errors by Go type.
[arvados.git] / services / crunch-run / logging.go
index 9d97384109597d9b0e7e56ad96657c1171fafbf5..4f8f95c9a6df5725dfc3bb3f4bb6a5eb9962250d 100644 (file)
@@ -18,7 +18,7 @@ type Timestamper func(t time.Time) string
 // Logging plumbing:
 //
 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
-// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
+// ThrottledLogger.buf -> ThrottledLogger.flusher ->
 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
 //
 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
@@ -35,72 +35,71 @@ type ThrottledLogger struct {
        stop        bool
        flusherDone chan bool
        Timestamper
+       Immediate *log.Logger
 }
 
-// RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision,
-// because the RFC3339Nano format isn't fixed width.
-const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
+// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
+const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
 
-// RFC3339Timestamp return a RFC3339 formatted timestamp using RFC3339Fixed
-func RFC3339Timestamp(now time.Time) string {
-       return now.Format(RFC3339Fixed)
+// RFC3339Timestamp formats t as RFC3339NanoFixed.
+func RFC3339Timestamp(t time.Time) string {
+       return t.Format(RFC3339NanoFixed)
 }
 
-// Write to the internal buffer.  Prepend a timestamp to each line of the input
-// data.
+// Write prepends a timestamp to each line of the input data and
+// appends to the internal buffer. Each line is also logged to
+// tl.Immediate, if tl.Immediate is not nil.
 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
        tl.Mutex.Lock()
+       defer tl.Mutex.Unlock()
+
        if tl.buf == nil {
                tl.buf = &bytes.Buffer{}
        }
-       defer tl.Mutex.Unlock()
 
        now := tl.Timestamper(time.Now().UTC())
        sc := bufio.NewScanner(bytes.NewBuffer(p))
-       for sc.Scan() {
-               _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
+       for err == nil && sc.Scan() {
+               out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
+               if tl.Immediate != nil {
+                       tl.Immediate.Print(out[:len(out)-1])
+               }
+               _, err = io.WriteString(tl.buf, out)
        }
-       return len(p), err
+       if err == nil {
+               err = sc.Err()
+               if err == nil {
+                       n = len(p)
+               }
+       }
+       return
 }
 
-// Periodically check the current buffer; if not empty, send it on the
-// channel to the goWriter goroutine.
+// Periodically check the current buffer; if not empty, write it to
+// the underlying writer.
 func (tl *ThrottledLogger) flusher() {
-       bufchan := make(chan *bytes.Buffer)
-       bufterm := make(chan bool)
+       ticker := time.NewTicker(time.Second)
+       defer ticker.Stop()
+       for range ticker.C {
+               // We use a separate "stopping" var here to ensure we flush
+               // tl.buf after tl.stop becomes true.
+               stopping := tl.stop
+
+               var ready *bytes.Buffer
 
-       // Use a separate goroutine for the actual write so that the writes are
-       // actually initiated closer every 1s instead of every
-       // 1s + (time to it takes to write).
-       go goWriter(tl.writer, bufchan, bufterm)
-       for {
-               if !tl.stop {
-                       time.Sleep(1 * time.Second)
-               }
                tl.Mutex.Lock()
-               if tl.buf != nil && tl.buf.Len() > 0 {
-                       oldbuf := tl.buf
-                       tl.buf = nil
-                       tl.Mutex.Unlock()
-                       bufchan <- oldbuf
-               } else if tl.stop {
-                       tl.Mutex.Unlock()
-                       break
-               } else {
-                       tl.Mutex.Unlock()
+               ready, tl.buf = tl.buf, nil
+               tl.Mutex.Unlock()
+
+               if ready != nil && ready.Len() > 0 {
+                       tl.writer.Write(ready.Bytes())
                }
-       }
-       close(bufchan)
-       <-bufterm
-       tl.flusherDone <- true
-}
 
-// Receive buffers from a channel and send to the underlying Writer
-func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
-       for b := range c {
-               writer.Write(b.Bytes())
+               if stopping {
+                       break
+               }
        }
-       t <- true
+       close(tl.flusherDone)
 }
 
 // Close the flusher goroutine and wait for it to complete, then close the
@@ -154,17 +153,18 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
 // (b) batches log messages and only calls the underlying Writer at most once
 // per second.
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
-       alw := &ThrottledLogger{}
-       alw.flusherDone = make(chan bool)
-       alw.writer = writer
-       alw.Logger = log.New(alw, "", 0)
-       alw.Timestamper = RFC3339Timestamp
-       go alw.flusher()
-       return alw
+       tl := &ThrottledLogger{}
+       tl.flusherDone = make(chan bool)
+       tl.writer = writer
+       tl.Logger = log.New(tl, "", 0)
+       tl.Timestamper = RFC3339Timestamp
+       go tl.flusher()
+       return tl
 }
 
-// ArvLogWriter implements a writer that writes to each of a WriteCloser
-// (typically CollectionFileWriter) and creates an API server log entry.
+// ArvLogWriter is an io.WriteCloser that processes each write by
+// writing it through to another io.WriteCloser (typically a
+// CollectionFileWriter) and creating an Arvados log entry.
 type ArvLogWriter struct {
        ArvClient     IArvadosClient
        UUID          string
@@ -180,9 +180,10 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
        }
 
        // write to API
-       lr := arvadosclient.Dict{"object_uuid": arvlog.UUID,
-               "event_type": arvlog.loggingStream,
-               "properties": map[string]string{"text": string(p)}}
+       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+               "object_uuid": arvlog.UUID,
+               "event_type":  arvlog.loggingStream,
+               "properties":  map[string]string{"text": string(p)}}}
        err2 := arvlog.ArvClient.Create("logs", lr, nil)
 
        if err1 != nil || err2 != nil {