+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
// Default values for log throttling settings. loadLogThrottleParams
// overwrites each of these with the value clnt.Discovery returns for
// the key of the same name, when that lookup succeeds and yields a
// float64; otherwise the default below stays in effect.
var crunchLogThrottleLines int64 = 1024
var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
var crunchLogBytesPerEvent int64 = 4096
-var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
+var crunchLogSecondsBetweenEvents = time.Second
+var crunchLogUpdatePeriod = time.Hour / 2 // 30 minutes
+var crunchLogUpdateSize = int64(1 << 25) // 33554432; presumably a byte count -- confirm against its users
// ArvLogWriter is an io.WriteCloser that processes each write by
// writing it through to another io.WriteCloser (typically a
closing bool
}
-func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
+func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
// Write to the next writer in the chain (a file in Keep)
var err1 error
if arvlog.writeCloser != nil {
// write to API after checking rate limit
now := time.Now()
- bytesWritten := 0
if now.After(arvlog.logThrottleResetTime) {
// It has been more than throttle_period seconds since the last
"properties": map[string]string{"text": arvlog.bufToFlush.String()}}}
err2 := arvlog.ArvClient.Create("logs", lr, nil)
- bytesWritten = arvlog.bufToFlush.Len()
arvlog.bufToFlush = bytes.Buffer{}
arvlog.bufFlushedAt = now
}
}
- return bytesWritten, nil
+ return len(p), nil
}
// Close the underlying writer
}
}
-// load the rate limit discovery config paramters
+// load the rate limit discovery config parameters
func loadLogThrottleParams(clnt IArvadosClient) {
- param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
- if err == nil {
- crunchLimitLogBytesPerJob = int64(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottleBytes")
- if err == nil {
- crunchLogThrottleBytes = int64(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottlePeriod")
- if err == nil {
- crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottleLines")
- if err == nil {
- crunchLogThrottleLines = int64(param.(float64))
+ loadDuration := func(dst *time.Duration, key string) {
+ if param, err := clnt.Discovery(key); err != nil {
+ return
+ } else if d, ok := param.(float64); !ok {
+ return
+ } else {
+ *dst = time.Duration(d) * time.Second
+ }
}
-
- param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
- if err == nil {
- crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
+ loadInt64 := func(dst *int64, key string) {
+ if param, err := clnt.Discovery(key); err != nil {
+ return
+ } else if val, ok := param.(float64); !ok {
+ return
+ } else {
+ *dst = int64(val)
+ }
}
- param, err = clnt.Discovery("crunchLogBytesPerEvent")
- if err == nil {
- crunchLogBytesPerEvent = int64(param.(float64))
- }
+ loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
+ loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
+ loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
+ loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
+ loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
+ loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
+ loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
+ loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
+ loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
- param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
- if err == nil {
- crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
- }
}