X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5d00ecb0932f86e4d2aced3d9258b96522ef38bd..4a09e252ea6dd94e14cdf5ddb324ccf574b42423:/services/crunch-run/logging.go diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index cdf2d6e68b..0083f0999c 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -34,9 +38,10 @@ type ThrottledLogger struct { *log.Logger buf *bytes.Buffer sync.Mutex - writer io.WriteCloser - flush chan struct{} - stopped chan struct{} + writer io.WriteCloser + flush chan struct{} + stopped chan struct{} + stopping chan struct{} Timestamper Immediate *log.Logger pendingFlush bool @@ -97,9 +102,10 @@ func (tl *ThrottledLogger) flusher() { defer ticker.Stop() for stopping := false; !stopping; { select { - case _, open := <-tl.flush: - // if !open, will flush tl.buf and exit the loop - stopping = !open + case <-tl.stopping: + // flush tl.buf and exit the loop + stopping = true + case <-tl.flush: case <-ticker.C: } @@ -120,10 +126,10 @@ func (tl *ThrottledLogger) flusher() { // underlying Writer. func (tl *ThrottledLogger) Close() error { select { - case <-tl.flush: + case <-tl.stopping: // already stopped default: - close(tl.flush) + close(tl.stopping) } <-tl.stopped return tl.writer.Close() @@ -175,6 +181,7 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { tl := &ThrottledLogger{} tl.flush = make(chan struct{}, 1) tl.stopped = make(chan struct{}) + tl.stopping = make(chan struct{}) tl.writer = writer tl.Logger = log.New(tl, "", 0) tl.Timestamper = RFC3339Timestamp @@ -211,6 +218,7 @@ type ArvLogWriter struct { logThrottleFirstPartialLine bool bufToFlush bytes.Buffer bufFlushedAt time.Time + closing bool } func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { @@ -257,8 +265,9 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { } } - if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent || - (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) { + if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent || + (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) || + arvlog.closing) && (arvlog.bufToFlush.Len() > 0) { // write to API lr := arvadosclient.Dict{"log": arvadosclient.Dict{ "object_uuid": arvlog.UUID, @@ -280,6 +289,8 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { // Close the underlying writer func (arvlog *ArvLogWriter) Close() (err error) { + arvlog.closing = true + arvlog.Write([]byte{}) if arvlog.writeCloser != nil { err = arvlog.writeCloser.Close() arvlog.writeCloser = nil @@ -362,7 +373,7 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) } } -// load the rate limit discovery config paramters +// load the rate limit discovery config parameters func loadLogThrottleParams(clnt IArvadosClient) { param, err := clnt.Discovery("crunchLimitLogBytesPerJob") if err == nil {