// Logging plumbing:
//
// ThrottledLogger.Logger -> ThrottledLogger.Write ->
-// ThrottledLogger.buf -> ThrottledLogger.flusher -> goWriter ->
+// ThrottledLogger.buf -> ThrottledLogger.flusher ->
// ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
//
// For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
// tl.Immediate, if tl.Immediate is not nil.
func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
tl.Mutex.Lock()
+ defer tl.Mutex.Unlock()
+
if tl.buf == nil {
tl.buf = &bytes.Buffer{}
}
- defer tl.Mutex.Unlock()
now := tl.Timestamper(time.Now().UTC())
sc := bufio.NewScanner(bytes.NewBuffer(p))
// flusher periodically takes whatever has accumulated in tl.buf and, if it
// is not empty, hands it off to be written to tl.writer. On the pass where
// tl.stop is seen to be true it flushes one last time, then signals
// tl.flusherDone and returns.
//
// NOTE(review): tl.stop is read without holding tl.Mutex — confirm that
// whoever sets it synchronizes with this goroutine some other way.
func (tl *ThrottledLogger) flusher() {
- bufchan := make(chan *bytes.Buffer)
- bufterm := make(chan bool)
-
- // Use a separate goroutine for the actual write so that the writes are
- // actually initiated closer every 1s instead of every
- // 1s + (time to it takes to write).
- go goWriter(tl.writer, bufchan, bufterm)
-
- // We use a separate "stopping" var here to ensure we flush
- // tl.buf after tl.stop becomes true.
- stopping := false
- for !stopping {
- time.Sleep(time.Second)
- stopping = tl.stop
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
+ for range ticker.C {
+ // Read tl.stop before taking the buffer, so that the pass on which a stop
+ // request is first seen still flushes tl.buf before the loop exits.
+ stopping := tl.stop
+
+ var ready *bytes.Buffer
+
tl.Mutex.Lock()
- if tl.buf != nil && tl.buf.Len() > 0 {
- oldbuf := tl.buf
- tl.buf = nil
- bufchan <- oldbuf
- }
+ ready, tl.buf = tl.buf, nil
tl.Mutex.Unlock()
- }
- close(bufchan)
- <-bufterm
- tl.flusherDone <- true
-}
-// Receive buffers from a channel and send to the underlying Writer
-func goWriter(writer io.Writer, c <-chan *bytes.Buffer, t chan<- bool) {
- for b := range c {
- writer.Write(b.Bytes())
+ if ready != nil && ready.Len() > 0 {
+ tl.writer.Write(ready.Bytes())
+ }
+
+ if stopping {
+ break
+ }
}
- t <- true
+ close(tl.flusherDone)
}
// Close the flusher goroutine and wait for it to complete, then close the