+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
*log.Logger
buf *bytes.Buffer
sync.Mutex
- writer io.WriteCloser
- flush chan struct{}
- stopped chan struct{}
+ writer io.WriteCloser
+ flush chan struct{}
+ stopped chan struct{}
+ stopping chan struct{}
Timestamper
Immediate *log.Logger
pendingFlush bool
defer ticker.Stop()
for stopping := false; !stopping; {
select {
- case _, open := <-tl.flush:
- // if !open, will flush tl.buf and exit the loop
- stopping = !open
+ case <-tl.stopping:
+ // flush tl.buf and exit the loop
+ stopping = true
+ case <-tl.flush:
case <-ticker.C:
}
// underlying Writer.
func (tl *ThrottledLogger) Close() error {
select {
- case <-tl.flush:
+ case <-tl.stopping:
// already stopped
default:
- close(tl.flush)
+ close(tl.stopping)
}
<-tl.stopped
return tl.writer.Close()
tl := &ThrottledLogger{}
tl.flush = make(chan struct{}, 1)
tl.stopped = make(chan struct{})
+ tl.stopping = make(chan struct{})
tl.writer = writer
tl.Logger = log.New(tl, "", 0)
tl.Timestamper = RFC3339Timestamp
closing bool
}
-func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
+func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
// Write to the next writer in the chain (a file in Keep)
var err1 error
if arvlog.writeCloser != nil {
// write to API after checking rate limit
now := time.Now()
- bytesWritten := 0
if now.After(arvlog.logThrottleResetTime) {
// It has been more than throttle_period seconds since the last
}
}
- if (int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
+ if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
(now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
// write to API
"properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
err2 := arvlog.ArvClient.Create("logs", lr, nil)
- bytesWritten = arvlog.bufToFlush.Len()
arvlog.bufToFlush = bytes.Buffer{}
arvlog.bufFlushedAt = now
}
}
- return bytesWritten, nil
+ return len(p), nil
}
// Close the underlying writer
}
}
-// load the rate limit discovery config paramters
+// loadLogThrottleParams loads the log rate limit parameters from the discovery config.
func loadLogThrottleParams(clnt IArvadosClient) {
param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
if err == nil {