X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e54bce82515daeef450f53aa93d3acae397bfa5f..cbf93e8d897448dbd52369afe89fef2392140ff1:/services/crunch-run/logging.go?ds=sidebyside diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index 3aedf51779..f8ddd563c6 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -34,9 +38,10 @@ type ThrottledLogger struct { *log.Logger buf *bytes.Buffer sync.Mutex - writer io.WriteCloser - flush chan struct{} - stopped chan struct{} + writer io.WriteCloser + flush chan struct{} + stopped chan struct{} + stopping chan struct{} Timestamper Immediate *log.Logger pendingFlush bool @@ -97,9 +102,10 @@ func (tl *ThrottledLogger) flusher() { defer ticker.Stop() for stopping := false; !stopping; { select { - case _, open := <-tl.flush: - // if !open, will flush tl.buf and exit the loop - stopping = !open + case <-tl.stopping: + // flush tl.buf and exit the loop + stopping = true + case <-tl.flush: case <-ticker.C: } @@ -120,10 +126,10 @@ func (tl *ThrottledLogger) flusher() { // underlying Writer. func (tl *ThrottledLogger) Close() error { select { - case <-tl.flush: + case <-tl.stopping: // already stopped default: - close(tl.flush) + close(tl.stopping) } <-tl.stopped return tl.writer.Close() @@ -175,6 +181,7 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { tl := &ThrottledLogger{} tl.flush = make(chan struct{}, 1) tl.stopped = make(chan struct{}) + tl.stopping = make(chan struct{}) tl.writer = writer tl.Logger = log.New(tl, "", 0) tl.Timestamper = RFC3339Timestamp @@ -189,7 +196,9 @@ var crunchLogThrottlePeriod time.Duration = time.Second * 60 var crunchLogThrottleLines int64 = 1024 var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5 var crunchLogBytesPerEvent int64 = 4096 -var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1 +var crunchLogSecondsBetweenEvents = time.Second +var crunchLogUpdatePeriod = time.Hour / 2 +var crunchLogUpdateSize = int64(1 << 25) // ArvLogWriter is an io.WriteCloser that processes each write by // writing it through to another io.WriteCloser (typically a @@ -214,7 +223,7 @@ type ArvLogWriter struct { closing bool } -func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { +func (arvlog *ArvLogWriter) Write(p []byte) (int, error) { // Write to the next writer in the chain (a file in Keep) var err1 error if arvlog.writeCloser != nil { @@ -223,7 +232,6 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { // write to API after checking rate limit now := time.Now() - bytesWritten := 0 if now.After(arvlog.logThrottleResetTime) { // It has been more than throttle_period seconds since the last @@ -258,7 +266,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { } } - if (int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent || + if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent || (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) || arvlog.closing) && (arvlog.bufToFlush.Len() > 0) { // write to API @@ -268,7 +276,6 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { "properties": map[string]string{"text": arvlog.bufToFlush.String()}}} err2 := arvlog.ArvClient.Create("logs", lr, nil) - bytesWritten = arvlog.bufToFlush.Len() arvlog.bufToFlush = bytes.Buffer{} arvlog.bufFlushedAt = now @@ -277,7 +284,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { } } - return bytesWritten, nil + return len(p), nil } // Close the underlying writer @@ -366,40 +373,35 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) } } -// load the rate limit discovery config paramters +// load the rate limit discovery config parameters func loadLogThrottleParams(clnt IArvadosClient) { - param, err := clnt.Discovery("crunchLimitLogBytesPerJob") - if err == nil { - crunchLimitLogBytesPerJob = int64(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogThrottleBytes") - if err == nil { - crunchLogThrottleBytes = int64(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogThrottlePeriod") - if err == nil { - crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64)) - } - - param, err = clnt.Discovery("crunchLogThrottleLines") - if err == nil { - crunchLogThrottleLines = int64(param.(float64)) + loadDuration := func(dst *time.Duration, key string) { + if param, err := clnt.Discovery(key); err != nil { + return + } else if d, ok := param.(float64); !ok { + return + } else { + *dst = time.Duration(d) * time.Second + } } - - param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod") - if err == nil { - crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64)) + loadInt64 := func(dst *int64, key string) { + if param, err := clnt.Discovery(key); err != nil { + return + } else if val, ok := param.(float64); !ok { + return + } else { + *dst = int64(val) + } } - param, err = clnt.Discovery("crunchLogBytesPerEvent") - if err == nil { - crunchLogBytesPerEvent = int64(param.(float64)) - } + loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob") + loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes") + loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod") + loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines") + loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod") + loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent") + loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents") + loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize") + loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod") - param, err = clnt.Discovery("crunchLogSecondsBetweenEvents") - if err == nil { - crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64)) - } }