10777: Close and flush logs right away instead of waiting for next tick.
authorTom Clegg <tom@curoverse.com>
Sat, 25 Feb 2017 06:56:53 +0000 (01:56 -0500)
committerTom Clegg <tom@curoverse.com>
Sat, 25 Feb 2017 06:57:48 +0000 (01:57 -0500)
services/crunch-run/logging.go

index 4f8f95c9a6df5725dfc3bb3f4bb6a5eb9962250d..5254ff671e6d3826356722259069515d450c1066 100644 (file)
@@ -4,11 +4,12 @@ import (
        "bufio"
        "bytes"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "io"
        "log"
        "sync"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 )
 
 // Timestamper is the signature for a function that takes a timestamp and
@@ -31,9 +32,9 @@ type ThrottledLogger struct {
        *log.Logger
        buf *bytes.Buffer
        sync.Mutex
-       writer      io.WriteCloser
-       stop        bool
-       flusherDone chan bool
+       writer   io.WriteCloser
+       stopping chan struct{}
+       stopped  chan struct{}
        Timestamper
        Immediate *log.Logger
 }
@@ -80,10 +81,13 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
 func (tl *ThrottledLogger) flusher() {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
-       for range ticker.C {
-               // We use a separate "stopping" var here to ensure we flush
-               // tl.buf after tl.stop becomes true.
-               stopping := tl.stop
+       for stopping := false; !stopping; {
+               select {
+               case <-tl.stopping:
+                       // flush tl.buf, then exit the loop
+                       stopping = true
+               case <-ticker.C:
+               }
 
                var ready *bytes.Buffer
 
@@ -94,19 +98,20 @@ func (tl *ThrottledLogger) flusher() {
                if ready != nil && ready.Len() > 0 {
                        tl.writer.Write(ready.Bytes())
                }
-
-               if stopping {
-                       break
-               }
        }
-       close(tl.flusherDone)
+       close(tl.stopped)
 }
 
 // Close the flusher goroutine and wait for it to complete, then close the
 // underlying Writer.
 func (tl *ThrottledLogger) Close() error {
-       tl.stop = true
-       <-tl.flusherDone
+       select {
+       case <-tl.stopping:
+               // already stopped
+       default:
+               close(tl.stopping)
+       }
+       <-tl.stopped
        return tl.writer.Close()
 }
 
@@ -154,7 +159,8 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
 // per second.
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
        tl := &ThrottledLogger{}
-       tl.flusherDone = make(chan bool)
+       tl.stopping = make(chan struct{})
+       tl.stopped = make(chan struct{})
        tl.writer = writer
        tl.Logger = log.New(tl, "", 0)
        tl.Timestamper = RFC3339Timestamp