X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f10431fe2de1037c3c0d51d8238cecf6c1206703..95be914af0ab0a82c4fa92b3f9c29ebec88e8595:/services/crunch-run/logging.go diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index 96feb5faab..ce0a661263 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -175,7 +179,7 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) { // at most once per "crunchLogSecondsBetweenEvents" seconds. func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { tl := &ThrottledLogger{} - tl.flush = make(chan struct{}) + tl.flush = make(chan struct{}, 1) tl.stopped = make(chan struct{}) tl.stopping = make(chan struct{}) tl.writer = writer @@ -217,7 +221,7 @@ type ArvLogWriter struct { closing bool } -func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { +func (arvlog *ArvLogWriter) Write(p []byte) (int, error) { // Write to the next writer in the chain (a file in Keep) var err1 error if arvlog.writeCloser != nil { @@ -226,7 +230,6 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { // write to API after checking rate limit now := time.Now() - bytesWritten := 0 if now.After(arvlog.logThrottleResetTime) { // It has been more than throttle_period seconds since the last @@ -261,7 +264,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { } } - if (int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent || + if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent || (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) || arvlog.closing) && (arvlog.bufToFlush.Len() > 0) { // write to API @@ -271,7 +274,6 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { "properties": map[string]string{"text": arvlog.bufToFlush.String()}}} err2 := arvlog.ArvClient.Create("logs", lr, nil) - bytesWritten = arvlog.bufToFlush.Len() arvlog.bufToFlush = bytes.Buffer{} arvlog.bufFlushedAt = now @@ -280,7 +282,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { } } - return bytesWritten, nil + return len(p), nil } // Close the underlying writer @@ -369,7 +371,7 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) } } -// load the rate limit discovery config paramters +// load the rate limit discovery config parameters func loadLogThrottleParams(clnt IArvadosClient) { param, err := clnt.Discovery("crunchLimitLogBytesPerJob") if err == nil {