Merge branch '14406-remote-data-in-output'
[arvados.git] / services / crunch-run / logging.go
index 3aedf51779838c9359fb8c0d8071a7600f891237..f8ddd563c6825aa9814f4c8e71673856b6e5f6f3 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -34,9 +38,10 @@ type ThrottledLogger struct {
        *log.Logger
        buf *bytes.Buffer
        sync.Mutex
-       writer  io.WriteCloser
-       flush   chan struct{}
-       stopped chan struct{}
+       writer   io.WriteCloser
+       flush    chan struct{}
+       stopped  chan struct{}
+       stopping chan struct{}
        Timestamper
        Immediate    *log.Logger
        pendingFlush bool
@@ -97,9 +102,10 @@ func (tl *ThrottledLogger) flusher() {
        defer ticker.Stop()
        for stopping := false; !stopping; {
                select {
-               case _, open := <-tl.flush:
-                       // if !open, will flush tl.buf and exit the loop
-                       stopping = !open
+               case <-tl.stopping:
+                       // flush tl.buf and exit the loop
+                       stopping = true
+               case <-tl.flush:
                case <-ticker.C:
                }
 
@@ -120,10 +126,10 @@ func (tl *ThrottledLogger) flusher() {
 // underlying Writer.
 func (tl *ThrottledLogger) Close() error {
        select {
-       case <-tl.flush:
+       case <-tl.stopping:
                // already stopped
        default:
-               close(tl.flush)
+               close(tl.stopping)
        }
        <-tl.stopped
        return tl.writer.Close()
@@ -175,6 +181,7 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
        tl := &ThrottledLogger{}
        tl.flush = make(chan struct{}, 1)
        tl.stopped = make(chan struct{})
+       tl.stopping = make(chan struct{})
        tl.writer = writer
        tl.Logger = log.New(tl, "", 0)
        tl.Timestamper = RFC3339Timestamp
@@ -189,7 +196,9 @@ var crunchLogThrottlePeriod time.Duration = time.Second * 60
 var crunchLogThrottleLines int64 = 1024
 var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
 var crunchLogBytesPerEvent int64 = 4096
-var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
+var crunchLogSecondsBetweenEvents = time.Second
+var crunchLogUpdatePeriod = time.Hour / 2
+var crunchLogUpdateSize = int64(1 << 25)
 
 // ArvLogWriter is an io.WriteCloser that processes each write by
 // writing it through to another io.WriteCloser (typically a
@@ -214,7 +223,7 @@ type ArvLogWriter struct {
        closing                      bool
 }
 
-func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
+func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
        // Write to the next writer in the chain (a file in Keep)
        var err1 error
        if arvlog.writeCloser != nil {
@@ -223,7 +232,6 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
 
        // write to API after checking rate limit
        now := time.Now()
-       bytesWritten := 0
 
        if now.After(arvlog.logThrottleResetTime) {
                // It has been more than throttle_period seconds since the last
@@ -258,7 +266,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                }
        }
 
-       if (int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
+       if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
                (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
                arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
                // write to API
@@ -268,7 +276,6 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                        "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
                err2 := arvlog.ArvClient.Create("logs", lr, nil)
 
-               bytesWritten = arvlog.bufToFlush.Len()
                arvlog.bufToFlush = bytes.Buffer{}
                arvlog.bufFlushedAt = now
 
@@ -277,7 +284,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                }
        }
 
-       return bytesWritten, nil
+       return len(p), nil
 }
 
 // Close the underlying writer
@@ -366,40 +373,35 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
        }
 }
 
-// load the rate limit discovery config paramters
+// load the rate limit discovery config parameters
 func loadLogThrottleParams(clnt IArvadosClient) {
-       param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
-       if err == nil {
-               crunchLimitLogBytesPerJob = int64(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogThrottleBytes")
-       if err == nil {
-               crunchLogThrottleBytes = int64(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogThrottlePeriod")
-       if err == nil {
-               crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogThrottleLines")
-       if err == nil {
-               crunchLogThrottleLines = int64(param.(float64))
+       loadDuration := func(dst *time.Duration, key string) {
+               if param, err := clnt.Discovery(key); err != nil {
+                       return
+               } else if d, ok := param.(float64); !ok {
+                       return
+               } else {
+                       *dst = time.Duration(d) * time.Second
+               }
        }
-
-       param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
-       if err == nil {
-               crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
+       loadInt64 := func(dst *int64, key string) {
+               if param, err := clnt.Discovery(key); err != nil {
+                       return
+               } else if val, ok := param.(float64); !ok {
+                       return
+               } else {
+                       *dst = int64(val)
+               }
        }
 
-       param, err = clnt.Discovery("crunchLogBytesPerEvent")
-       if err == nil {
-               crunchLogBytesPerEvent = int64(param.(float64))
-       }
+       loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
+       loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
+       loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
+       loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
+       loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
+       loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
+       loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
+       loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
+       loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
 
-       param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
-       if err == nil {
-               crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
-       }
 }