X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/224f384d411bb1b4cccc7165c55bb64fd5c695ad..a8378b8deaa2bbf9d2c154d9d9bb072538c288cc:/services/crunch-run/logging.go diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index 4f8f95c9a6..5254ff671e 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -4,11 +4,12 @@ import ( "bufio" "bytes" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "io" "log" "sync" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" ) // Timestamper is the signature for a function that takes a timestamp and @@ -31,9 +32,9 @@ type ThrottledLogger struct { *log.Logger buf *bytes.Buffer sync.Mutex - writer io.WriteCloser - stop bool - flusherDone chan bool + writer io.WriteCloser + stopping chan struct{} + stopped chan struct{} Timestamper Immediate *log.Logger } @@ -80,10 +81,13 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { func (tl *ThrottledLogger) flusher() { ticker := time.NewTicker(time.Second) defer ticker.Stop() - for range ticker.C { - // We use a separate "stopping" var here to ensure we flush - // tl.buf after tl.stop becomes true. - stopping := tl.stop + for stopping := false; !stopping; { + select { + case <-tl.stopping: + // flush tl.buf, then exit the loop + stopping = true + case <-ticker.C: + } var ready *bytes.Buffer @@ -94,19 +98,20 @@ func (tl *ThrottledLogger) flusher() { if ready != nil && ready.Len() > 0 { tl.writer.Write(ready.Bytes()) } - - if stopping { - break - } } - close(tl.flusherDone) + close(tl.stopped) } // Close the flusher goroutine and wait for it to complete, then close the // underlying Writer. func (tl *ThrottledLogger) Close() error { - tl.stop = true - <-tl.flusherDone + select { + case <-tl.stopping: + // already stopped + default: + close(tl.stopping) + } + <-tl.stopped return tl.writer.Close() } @@ -154,7 +159,8 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) { // per second. func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { tl := &ThrottledLogger{} - tl.flusherDone = make(chan bool) + tl.stopping = make(chan struct{}) + tl.stopped = make(chan struct{}) tl.writer = writer tl.Logger = log.New(tl, "", 0) tl.Timestamper = RFC3339Timestamp