Simplify write flusher using a time.Ticker.
author Tom Clegg <tom@curoverse.com>
Thu, 4 Aug 2016 21:44:34 +0000 (17:44 -0400)
committer Tom Clegg <tom@curoverse.com>
Thu, 4 Aug 2016 21:45:24 +0000 (17:45 -0400)
No issue #

services/crunch-run/logging.go

index a2aa5c6b48d8ca9b9d970323ab43175d45b063a6..4f8f95c9a6df5725dfc3bb3f4bb6a5eb9962250d 100644 (file)
@@ -18,7 +18,7 @@ type Timestamper func(t time.Time) string
 // Logging plumbing:
 //
 // ThrottledLogger.Logger -> ThrottledLogger.Write ->
-// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
+// ThrottledLogger.buf -> ThrottledLogger.flusher ->
 // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
 //
 // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
@@ -51,10 +51,11 @@ func RFC3339Timestamp(t time.Time) string {
 // tl.Immediate, if tl.Immediate is not nil.
 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
        tl.Mutex.Lock()
+       defer tl.Mutex.Unlock()
+
        if tl.buf == nil {
                tl.buf = &bytes.Buffer{}
        }
-       defer tl.Mutex.Unlock()
 
        now := tl.Timestamper(time.Now().UTC())
        sc := bufio.NewScanner(bytes.NewBuffer(p))
@@ -77,39 +78,28 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
 // Periodically check the current buffer; if not empty, send it on the
 // channel to the goWriter goroutine.
 func (tl *ThrottledLogger) flusher() {
-       bufchan := make(chan *bytes.Buffer)
-       bufterm := make(chan bool)
-
-       // Use a separate goroutine for the actual write so that the writes are
-       // actually initiated closer every 1s instead of every
-       // 1s + (time to it takes to write).
-       go goWriter(tl.writer, bufchan, bufterm)
-
-       // We use a separate "stopping" var here to ensure we flush
-       // tl.buf after tl.stop becomes true.
-       stopping := false
-       for !stopping {
-               time.Sleep(time.Second)
-               stopping = tl.stop
+       ticker := time.NewTicker(time.Second)
+       defer ticker.Stop()
+       for range ticker.C {
+               // We use a separate "stopping" var here to ensure we flush
+               // tl.buf after tl.stop becomes true.
+               stopping := tl.stop
+
+               var ready *bytes.Buffer
+
                tl.Mutex.Lock()
-               if tl.buf != nil && tl.buf.Len() > 0 {
-                       oldbuf := tl.buf
-                       tl.buf = nil
-                       bufchan <- oldbuf
-               }
+               ready, tl.buf = tl.buf, nil
                tl.Mutex.Unlock()
-       }
-       close(bufchan)
-       <-bufterm
-       tl.flusherDone <- true
-}
 
-// Receive buffers from a channel and send to the underlying Writer
-func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
-       for b := range c {
-               writer.Write(b.Bytes())
+               if ready != nil && ready.Len() > 0 {
+                       tl.writer.Write(ready.Bytes())
+               }
+
+               if stopping {
+                       break
+               }
        }
-       t <- true
+       close(tl.flusherDone)
 }
 
 // Close the flusher goroutine and wait for it to complete, then close the