+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
*log.Logger
buf *bytes.Buffer
sync.Mutex
- writer io.WriteCloser
- flush chan struct{}
- stopped chan struct{}
+ writer io.WriteCloser
+ flush chan struct{}
+ stopped chan struct{}
+ stopping chan struct{}
Timestamper
Immediate *log.Logger
pendingFlush bool
defer ticker.Stop()
for stopping := false; !stopping; {
select {
- case _, open := <-tl.flush:
- // if !open, will flush tl.buf and exit the loop
- stopping = !open
+ case <-tl.stopping:
+ // flush tl.buf and exit the loop
+ stopping = true
+ case <-tl.flush:
case <-ticker.C:
}
// underlying Writer.
func (tl *ThrottledLogger) Close() error {
select {
- case <-tl.flush:
+ case <-tl.stopping:
// already stopped
default:
- close(tl.flush)
+ close(tl.stopping)
}
<-tl.stopped
return tl.writer.Close()
tl := &ThrottledLogger{}
tl.flush = make(chan struct{}, 1)
tl.stopped = make(chan struct{})
+ tl.stopping = make(chan struct{})
tl.writer = writer
tl.Logger = log.New(tl, "", 0)
tl.Timestamper = RFC3339Timestamp
// Defaults for the log throttling/batching parameters below. Each one
// is overwritten by loadLogThrottleParams when clnt.Discovery returns
// a float64 for the same-named key.
var crunchLogThrottleLines int64 = 1024
var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
// ArvLogWriter.Write sends its buffered text to the API once the
// buffer holds at least crunchLogBytesPerEvent bytes.
var crunchLogBytesPerEvent int64 = 4096
// ArvLogWriter.Write also sends non-empty buffered text to the API once
// at least crunchLogSecondsBetweenEvents has passed since the last send.
-var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
+var crunchLogSecondsBetweenEvents = time.Second
+var crunchLogUpdatePeriod = time.Hour / 2
+var crunchLogUpdateSize = int64(1 << 25)
// ArvLogWriter is an io.WriteCloser that processes each write by
// writing it through to another io.WriteCloser (typically a
closing bool
}
-func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
+func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
// Write to the next writer in the chain (a file in Keep)
var err1 error
if arvlog.writeCloser != nil {
// write to API after checking rate limit
now := time.Now()
- bytesWritten := 0
if now.After(arvlog.logThrottleResetTime) {
// It has been more than throttle_period seconds since the last
}
}
- if (int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
+ if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
(now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
// write to API
"properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
err2 := arvlog.ArvClient.Create("logs", lr, nil)
- bytesWritten = arvlog.bufToFlush.Len()
arvlog.bufToFlush = bytes.Buffer{}
arvlog.bufFlushedAt = now
}
}
- return bytesWritten, nil
+ return len(p), nil
}
// Close the underlying writer
}
}
-// load the rate limit discovery config paramters
+// loadLogThrottleParams overrides the package-level log throttling
+// defaults with the values clnt.Discovery returns for the same-named
+// keys. A key that cannot be fetched, or whose value is not a
+// float64, leaves the corresponding default untouched.
func loadLogThrottleParams(clnt IArvadosClient) {
-	param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
-	if err == nil {
-		crunchLimitLogBytesPerJob = int64(param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogThrottleBytes")
-	if err == nil {
-		crunchLogThrottleBytes = int64(param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogThrottlePeriod")
-	if err == nil {
-		crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogThrottleLines")
-	if err == nil {
-		crunchLogThrottleLines = int64(param.(float64))
+	// loadDuration stores the value found at key, interpreted as a
+	// number of seconds, into *dst.
+	loadDuration := func(dst *time.Duration, key string) {
+		param, err := clnt.Discovery(key)
+		if err != nil {
+			return
+		}
+		if secs, ok := param.(float64); ok {
+			// Scale in floating point before converting: converting
+			// first would truncate fractional values (0.5 -> 0),
+			// which the code being replaced did not do.
+			*dst = time.Duration(secs * float64(time.Second))
+		}
	}
-
-	param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
-	if err == nil {
-		crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
+	// loadInt64 stores the value found at key, truncated to an
+	// integer, into *dst.
+	loadInt64 := func(dst *int64, key string) {
+		param, err := clnt.Discovery(key)
+		if err != nil {
+			return
+		}
+		if val, ok := param.(float64); ok {
+			*dst = int64(val)
+		}
	}
-	param, err = clnt.Discovery("crunchLogBytesPerEvent")
-	if err == nil {
-		crunchLogBytesPerEvent = int64(param.(float64))
-	}
+	loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
+	loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
+	loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
+	loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
+	loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
+	loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
+	loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
+	loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
+	loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
-	param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
-	if err == nil {
-		crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
-	}
}