8019: load log throttling config params during NewContainerRunner
[arvados.git] / services / crunch-run / logging.go
index 6e32d723e87d1e031dcaf94241358fc9c6e3a8e4..383d8ad025d71b37135214b3b66e8f0cdd917ae1 100644 (file)
@@ -34,9 +34,9 @@ type ThrottledLogger struct {
        *log.Logger
        buf *bytes.Buffer
        sync.Mutex
-       writer   io.WriteCloser
-       stopping chan struct{}
-       stopped  chan struct{}
+       writer  io.WriteCloser
+       flush   chan struct{}
+       stopped chan struct{}
        Timestamper
        Immediate *log.Logger
 }
@@ -81,13 +81,13 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
 // Periodically check the current buffer; if not empty, send it on the
 // channel to the goWriter goroutine.
 func (tl *ThrottledLogger) flusher() {
-       ticker := time.NewTicker(time.Second)
+       ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
        defer ticker.Stop()
        for stopping := false; !stopping; {
                select {
-               case <-tl.stopping:
-                       // flush tl.buf, then exit the loop
-                       stopping = true
+               case _, open := <-tl.flush:
+                       // if !open, flush tl.buf and exit the loop
+                       stopping = !open
                case <-ticker.C:
                }
 
@@ -108,10 +108,10 @@ func (tl *ThrottledLogger) flusher() {
 // underlying Writer.
 func (tl *ThrottledLogger) Close() error {
        select {
-       case <-tl.stopping:
+       case <-tl.flush:
                // already stopped
        default:
-               close(tl.stopping)
+               close(tl.flush)
        }
        <-tl.stopped
        return tl.writer.Close()
@@ -161,7 +161,7 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
 // per second.
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
        tl := &ThrottledLogger{}
-       tl.stopping = make(chan struct{})
+       tl.flush = make(chan struct{}, 1)
        tl.stopped = make(chan struct{})
        tl.writer = writer
        tl.Logger = log.New(tl, "", 0)
@@ -170,6 +170,15 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
        return tl
 }
 
+// Log throttling rate limiting config parameters (package-level; assigned by loadLogThrottleParams)
+var crunchLimitLogBytesPerJob int64        // total-bytes cap: once bytesLogged exceeds it, rateLimit closes the throttle for 365 days
+var crunchLogThrottleBytes int64           // bytes allowed per throttle period before rateLimit closes the throttle
+var crunchLogThrottlePeriod int            // throttle period length in seconds (multiplied by time.Second where used)
+var crunchLogThrottleLines int64           // lines allowed per throttle period before rateLimit closes the throttle
+var crunchLogPartialLineThrottlePeriod int // seconds between accepted "[...]" partial-line segments
+var crunchLogBytesPerEvent int64           // ArvLogWriter.Write sends a log event once bufToFlush is longer than this
+var crunchLogSecondsBetweenEvents int      // NOTE(review): callers use time.Duration(n) without * time.Second, i.e. nanoseconds; 0 would panic time.NewTicker - confirm
+
 // ArvLogWriter is an io.WriteCloser that processes each write by
 // writing it through to another io.WriteCloser (typically a
 // CollectionFileWriter) and creating an Arvados log entry.
@@ -190,72 +199,6 @@ type ArvLogWriter struct {
        logThrottleFirstPartialLine  bool
        bufToFlush                   bytes.Buffer
        bufFlushedAt                 time.Time
-
-       // rate limiting config parameters
-       crunchLimitLogBytesPerJob          int64
-       crunchLogThrottleBytes             int64
-       crunchLogThrottlePeriod            int
-       crunchLogThrottleLines             int64
-       crunchLogPartialLineThrottlePeriod int
-       crunchLogBytesPerEvent             int64
-       crunchLogSecondsBetweenEvents      int
-}
-
-// NewArvLogWriter creates new ArvLogWriter and loads the rate limiting config params
-func NewArvLogWriter(clnt IArvadosClient, uuid string, ls string, wc io.WriteCloser) *ArvLogWriter {
-       w := &ArvLogWriter{ArvClient: clnt, UUID: uuid, loggingStream: ls, writeCloser: wc}
-
-       // load the rate limit discovery config paramters
-       param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
-       if err != nil {
-               w.crunchLimitLogBytesPerJob = 67108864
-       } else {
-               w.crunchLimitLogBytesPerJob = int64(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogThrottleBytes")
-       if err != nil {
-               w.crunchLogThrottleBytes = 65536
-       } else {
-               w.crunchLogThrottleBytes = int64(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogThrottlePeriod")
-       if err != nil {
-               w.crunchLogThrottlePeriod = 60
-       } else {
-               w.crunchLogThrottlePeriod = int(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogThrottleLines")
-       if err != nil {
-               w.crunchLogThrottleLines = 1024
-       } else {
-               w.crunchLogThrottleLines = int64(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
-       if err != nil {
-               w.crunchLogPartialLineThrottlePeriod = 5
-       } else {
-               w.crunchLogPartialLineThrottlePeriod = int(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogBytesPerEvent")
-       if err != nil {
-               w.crunchLogBytesPerEvent = 4096
-       } else {
-               w.crunchLogBytesPerEvent = int64(param.(float64))
-       }
-
-       param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
-       if err != nil {
-               w.crunchLogSecondsBetweenEvents = 1
-       } else {
-               w.crunchLogSecondsBetweenEvents = int(param.(float64))
-       }
-
-       return w
 }
 
 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
@@ -276,7 +219,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                        arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
                }
 
-               arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod))
+               arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod))
                arvlog.logThrottleBytesSoFar = 0
                arvlog.logThrottleLinesSoFar = 0
                arvlog.logThrottleBytesSkipped = 0
@@ -303,8 +246,8 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                arvlog.logThrottleIsOpen = logOpen
        }
 
-       if int64(arvlog.bufToFlush.Len()) > arvlog.crunchLogBytesPerEvent ||
-               (now.Sub(arvlog.bufFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) {
+       if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
+               (now.Sub(arvlog.bufFlushedAt) >= time.Duration(crunchLogSecondsBetweenEvents)) {
                // write to API
                lr := arvadosclient.Dict{"log": arvadosclient.Dict{
                        "object_uuid": arvlog.UUID,
@@ -349,7 +292,7 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
                if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
                        partialLine = true
 
-                       if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) {
+                       if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) {
                                arvlog.logThrottlePartialLineLastAt = now
                        } else {
                                skipCounts = true
@@ -362,24 +305,28 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
                        arvlog.bytesLogged += lineSize
                }
 
-               if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob {
-                       message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(now.UTC()), arvlog.crunchLimitLogBytesPerJob)
+               if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
+                       message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
+                               RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
                        arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
                        arvlog.logThrottleIsOpen = false
 
-               } else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes {
+               } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
                        remainingTime := arvlog.logThrottleResetTime.Sub(now)
-                       message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+                       message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
+                               RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second)
                        arvlog.logThrottleIsOpen = false
 
-               } else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines {
+               } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
                        remainingTime := arvlog.logThrottleResetTime.Sub(now)
-                       message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second)
+                       message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
+                               RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second)
                        arvlog.logThrottleIsOpen = false
 
                } else if partialLine && arvlog.logThrottleFirstPartialLine {
                        arvlog.logThrottleFirstPartialLine = false
-                       message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogPartialLineThrottlePeriod)
+                       message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
+                               RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod)
 
                }
        }
@@ -400,3 +347,55 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
                return arvlog.logThrottleIsOpen, line
        }
 }
+
+// loadLogThrottleParams sets the package-level log throttling parameters from clnt.Discovery, using a built-in default whenever a lookup returns an error.
+func loadLogThrottleParams(clnt IArvadosClient) {
+       param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
+       if err != nil {
+               crunchLimitLogBytesPerJob = 67108864 // default: 64 MiB
+       } else {
+               crunchLimitLogBytesPerJob = int64(param.(float64)) // unchecked assertion: panics if param is not a float64 (same pattern below)
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottleBytes")
+       if err != nil {
+               crunchLogThrottleBytes = 65536 // default: 64 KiB
+       } else {
+               crunchLogThrottleBytes = int64(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottlePeriod")
+       if err != nil {
+               crunchLogThrottlePeriod = 60 // default: 60 seconds
+       } else {
+               crunchLogThrottlePeriod = int(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottleLines")
+       if err != nil {
+               crunchLogThrottleLines = 1024 // default: 1024 lines per throttle period
+       } else {
+               crunchLogThrottleLines = int64(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
+       if err != nil {
+               crunchLogPartialLineThrottlePeriod = 5 // default: 5 seconds
+       } else {
+               crunchLogPartialLineThrottlePeriod = int(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogBytesPerEvent")
+       if err != nil {
+               crunchLogBytesPerEvent = 4096 // default: 4 KiB
+       } else {
+               crunchLogBytesPerEvent = int64(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
+       if err != nil {
+               crunchLogSecondsBetweenEvents = 1 // default: nominally 1 second; NOTE(review): consumers convert with time.Duration(n) only - confirm units
+       } else {
+               crunchLogSecondsBetweenEvents = int(param.(float64))
+       }
+}