From: Tom Clegg Date: Thu, 4 Aug 2016 21:44:34 +0000 (-0400) Subject: Simplify write flusher using a time.Ticker. X-Git-Tag: 1.1.0~803 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/13c821b6dd20ed658db9c6c3db0b8d10d9175fbb Simplify write flusher using a time.Ticker. No issue # --- diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index a2aa5c6b48..4f8f95c9a6 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -18,7 +18,7 @@ type Timestamper func(t time.Time) string // Logging plumbing: // // ThrottledLogger.Logger -> ThrottledLogger.Write -> -// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter -> +// ThrottledLogger.buf -> ThrottledLogger.flusher -> // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create // // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull @@ -51,10 +51,11 @@ func RFC3339Timestamp(t time.Time) string { // tl.Immediate, if tl.Immediate is not nil. func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { tl.Mutex.Lock() + defer tl.Mutex.Unlock() + if tl.buf == nil { tl.buf = &bytes.Buffer{} } - defer tl.Mutex.Unlock() now := tl.Timestamper(time.Now().UTC()) sc := bufio.NewScanner(bytes.NewBuffer(p)) @@ -77,39 +78,28 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { // Periodically check the current buffer; if not empty, send it on the // channel to the goWriter goroutine. func (tl *ThrottledLogger) flusher() { - bufchan := make(chan *bytes.Buffer) - bufterm := make(chan bool) - - // Use a separate goroutine for the actual write so that the writes are - // actually initiated closer every 1s instead of every - // 1s + (time to it takes to write). - go goWriter(tl.writer, bufchan, bufterm) - - // We use a separate "stopping" var here to ensure we flush - // tl.buf after tl.stop becomes true. - stopping := false - for !stopping { - time.Sleep(time.Second) - stopping = tl.stop + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for range ticker.C { + // We use a separate "stopping" var here to ensure we flush + // tl.buf after tl.stop becomes true. + stopping := tl.stop + + var ready *bytes.Buffer + tl.Mutex.Lock() - if tl.buf != nil && tl.buf.Len() > 0 { - oldbuf := tl.buf - tl.buf = nil - bufchan <- oldbuf - } + ready, tl.buf = tl.buf, nil tl.Mutex.Unlock() - } - close(bufchan) - <-bufterm - tl.flusherDone <- true -} -// Receive buffers from a channel and send to the underlying Writer -func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) { - for b := range c { - writer.Write(b.Bytes()) + if ready != nil && ready.Len() > 0 { + tl.writer.Write(ready.Bytes()) + } + + if stopping { + break + } } - t <- true + close(tl.flusherDone) } // Close the flusher goroutine and wait for it to complete, then close the