+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
*log.Logger
buf *bytes.Buffer
sync.Mutex
- writer io.WriteCloser
- flush chan struct{}
- stopped chan struct{}
+ writer io.WriteCloser
+ flush chan struct{}
+ stopped chan struct{}
+ stopping chan struct{}
Timestamper
Immediate *log.Logger
pendingFlush bool
defer ticker.Stop()
for stopping := false; !stopping; {
select {
- case _, open := <-tl.flush:
- // if !open, will flush tl.buf and exit the loop
- stopping = !open
+ case <-tl.stopping:
+ // flush tl.buf and exit the loop
+ stopping = true
+ case <-tl.flush:
case <-ticker.C:
}
// underlying Writer.
func (tl *ThrottledLogger) Close() error {
select {
- case <-tl.flush:
+ case <-tl.stopping:
// stopping was already closed by an earlier Close call; don't close it again
default:
- close(tl.flush)
+ close(tl.stopping)
}
<-tl.stopped
return tl.writer.Close()
tl := &ThrottledLogger{}
tl.flush = make(chan struct{}, 1)
tl.stopped = make(chan struct{})
+ tl.stopping = make(chan struct{})
tl.writer = writer
tl.Logger = log.New(tl, "", 0)
tl.Timestamper = RFC3339Timestamp
}
}
- if (int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
+ if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
(now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
// write to API
}
}
-// load the rate limit discovery config paramters
+// load the rate limit discovery config parameters
func loadLogThrottleParams(clnt IArvadosClient) {
param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
if err == nil {