Fix typo.
[arvados.git] / services / crunch-run / logging.go
index cdf2d6e68b2cb4ae95cc08bfcd553d5689de47a9..0083f0999ce7f27a4c50e94729e3dbd344f89a4e 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -34,9 +38,10 @@ type ThrottledLogger struct {
        *log.Logger
        buf *bytes.Buffer
        sync.Mutex
-       writer  io.WriteCloser
-       flush   chan struct{}
-       stopped chan struct{}
+       writer   io.WriteCloser
+       flush    chan struct{}
+       stopped  chan struct{}
+       stopping chan struct{}
        Timestamper
        Immediate    *log.Logger
        pendingFlush bool
@@ -97,9 +102,10 @@ func (tl *ThrottledLogger) flusher() {
        defer ticker.Stop()
        for stopping := false; !stopping; {
                select {
-               case _, open := <-tl.flush:
-                       // if !open, will flush tl.buf and exit the loop
-                       stopping = !open
+               case <-tl.stopping:
+                       // flush tl.buf and exit the loop
+                       stopping = true
+               case <-tl.flush:
                case <-ticker.C:
                }
 
@@ -120,10 +126,10 @@ func (tl *ThrottledLogger) flusher() {
 // underlying Writer.
 func (tl *ThrottledLogger) Close() error {
        select {
-       case <-tl.flush:
+       case <-tl.stopping:
                // already stopped
        default:
-               close(tl.flush)
+               close(tl.stopping)
        }
        <-tl.stopped
        return tl.writer.Close()
@@ -175,6 +181,7 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
        tl := &ThrottledLogger{}
        tl.flush = make(chan struct{}, 1)
        tl.stopped = make(chan struct{})
+       tl.stopping = make(chan struct{})
        tl.writer = writer
        tl.Logger = log.New(tl, "", 0)
        tl.Timestamper = RFC3339Timestamp
@@ -211,6 +218,7 @@ type ArvLogWriter struct {
        logThrottleFirstPartialLine  bool
        bufToFlush                   bytes.Buffer
        bufFlushedAt                 time.Time
+       closing                      bool
 }
 
 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
@@ -257,8 +265,9 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                }
        }
 
-       if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
-               (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) {
+       if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
+               (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
+               arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
                // write to API
                lr := arvadosclient.Dict{"log": arvadosclient.Dict{
                        "object_uuid": arvlog.UUID,
@@ -280,6 +289,8 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 
 // Close the underlying writer
 func (arvlog *ArvLogWriter) Close() (err error) {
+       arvlog.closing = true
+       arvlog.Write([]byte{})
        if arvlog.writeCloser != nil {
                err = arvlog.writeCloser.Close()
                arvlog.writeCloser = nil
@@ -362,7 +373,7 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
        }
 }
 
-// load the rate limit discovery config paramters
+// load the rate limit discovery config parameters
 func loadLogThrottleParams(clnt IArvadosClient) {
        param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
        if err == nil {