X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3a0aa1db801154916f50b1b299d5100945a3e1df..de4cfeaab629e99014deb10792637649511c91d0:/services/crunch-run/logging.go diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index db9d101b58..4f8f95c9a6 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -18,7 +18,7 @@ type Timestamper func(t time.Time) string // Logging plumbing: // // ThrottledLogger.Logger -> ThrottledLogger.Write -> -// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter -> +// ThrottledLogger.buf -> ThrottledLogger.flusher -> // ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create // // For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull @@ -51,10 +51,11 @@ func RFC3339Timestamp(t time.Time) string { // tl.Immediate, if tl.Immediate is not nil. func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { tl.Mutex.Lock() + defer tl.Mutex.Unlock() + if tl.buf == nil { tl.buf = &bytes.Buffer{} } - defer tl.Mutex.Unlock() now := tl.Timestamper(time.Now().UTC()) sc := bufio.NewScanner(bytes.NewBuffer(p)) @@ -77,41 +78,28 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { // Periodically check the current buffer; if not empty, send it on the // channel to the goWriter goroutine. func (tl *ThrottledLogger) flusher() { - bufchan := make(chan *bytes.Buffer) - bufterm := make(chan bool) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for range ticker.C { + // We use a separate "stopping" var here to ensure we flush + // tl.buf after tl.stop becomes true. + stopping := tl.stop + + var ready *bytes.Buffer - // Use a separate goroutine for the actual write so that the writes are - // actually initiated closer every 1s instead of every - // 1s + (time to it takes to write). - go goWriter(tl.writer, bufchan, bufterm) - for { - if !tl.stop { - time.Sleep(1 * time.Second) - } tl.Mutex.Lock() - if tl.buf != nil && tl.buf.Len() > 0 { - oldbuf := tl.buf - tl.buf = nil - tl.Mutex.Unlock() - bufchan <- oldbuf - } else if tl.stop { - tl.Mutex.Unlock() - break - } else { - tl.Mutex.Unlock() + ready, tl.buf = tl.buf, nil + tl.Mutex.Unlock() + + if ready != nil && ready.Len() > 0 { + tl.writer.Write(ready.Bytes()) } - } - close(bufchan) - <-bufterm - tl.flusherDone <- true -} -// Receive buffers from a channel and send to the underlying Writer -func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) { - for b := range c { - writer.Write(b.Bytes()) + if stopping { + break + } } - t <- true + close(tl.flusherDone) } // Close the flusher goroutine and wait for it to complete, then close the