From c5c09df38966595b4f27c402d1e9ae5500d6d201 Mon Sep 17 00:00:00 2001 From: radhika Date: Tue, 25 Apr 2017 16:43:47 -0400 Subject: [PATCH] 8019: load log throttling config params during NewContainerRunner --- services/crunch-run/crunchrun.go | 18 +++- services/crunch-run/logging.go | 173 +++++++++++++++---------------- 2 files changed, 100 insertions(+), 91 deletions(-) diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 272ad0b18e..c9c52ee02f 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -630,8 +630,13 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) { // Get and save the raw JSON container record from the API server func (runner *ContainerRunner) LogContainerRecord() (err error) { - w := NewArvLogWriter(runner.ArvClient, runner.Container.UUID, "container", - runner.LogCollection.Open("container.json")) + w := &ArvLogWriter{ + ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, + loggingStream: "container", + writeCloser: runner.LogCollection.Open("container.json"), + } + // Get Container record JSON from the API Server reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil) if err != nil { @@ -1057,7 +1062,8 @@ func (runner *ContainerRunner) CommitLogs() error { // point, but re-open crunch log with ArvClient in case there are any // other further (such as failing to write the log to Keep!) while // shutting down - runner.CrunchLog = NewThrottledLogger(NewArvLogWriter(runner.ArvClient, runner.Container.UUID, "crunch-run", nil)) + runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient, + UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil}) if runner.LogsPDH != nil { // If we have already assigned something to LogsPDH, @@ -1144,7 +1150,8 @@ func (runner *ContainerRunner) IsCancelled() bool { // NewArvLogWriter creates an ArvLogWriter func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { - return NewArvLogWriter(runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name+".txt")) + return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name, + writeCloser: runner.LogCollection.Open(name + ".txt")} } // Run the full container lifecycle. @@ -1284,6 +1291,9 @@ func NewContainerRunner(api IArvadosClient, cr.Container.UUID = containerUUID cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run")) cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) + + loadLogThrottleParams(api) + return cr } diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go index 6e32d723e8..383d8ad025 100644 --- a/services/crunch-run/logging.go +++ b/services/crunch-run/logging.go @@ -34,9 +34,9 @@ type ThrottledLogger struct { *log.Logger buf *bytes.Buffer sync.Mutex - writer io.WriteCloser - stopping chan struct{} - stopped chan struct{} + writer io.WriteCloser + flush chan struct{} + stopped chan struct{} Timestamper Immediate *log.Logger } @@ -81,13 +81,13 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { // Periodically check the current buffer; if not empty, send it on the // channel to the goWriter goroutine. func (tl *ThrottledLogger) flusher() { - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents)) defer ticker.Stop() for stopping := false; !stopping; { select { - case <-tl.stopping: - // flush tl.buf, then exit the loop - stopping = true + case _, open := <-tl.flush: + // if !open, flush tl.buf and exit the loop + stopping = !open case <-ticker.C: } @@ -108,10 +108,10 @@ func (tl *ThrottledLogger) flusher() { // underlying Writer. func (tl *ThrottledLogger) Close() error { select { - case <-tl.stopping: + case <-tl.flush: // already stopped default: - close(tl.stopping) + close(tl.flush) } <-tl.stopped return tl.writer.Close() @@ -161,7 +161,7 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) { // per second. func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { tl := &ThrottledLogger{} - tl.stopping = make(chan struct{}) + tl.flush = make(chan struct{}, 1) tl.stopped = make(chan struct{}) tl.writer = writer tl.Logger = log.New(tl, "", 0) @@ -170,6 +170,15 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { return tl } +// Log throttling rate limiting config parameters +var crunchLimitLogBytesPerJob int64 +var crunchLogThrottleBytes int64 +var crunchLogThrottlePeriod int +var crunchLogThrottleLines int64 +var crunchLogPartialLineThrottlePeriod int +var crunchLogBytesPerEvent int64 +var crunchLogSecondsBetweenEvents int + // ArvLogWriter is an io.WriteCloser that processes each write by // writing it through to another io.WriteCloser (typically a // CollectionFileWriter) and creating an Arvados log entry. @@ -190,72 +199,6 @@ type ArvLogWriter struct { logThrottleFirstPartialLine bool bufToFlush bytes.Buffer bufFlushedAt time.Time - - // rate limiting config parameters - crunchLimitLogBytesPerJob int64 - crunchLogThrottleBytes int64 - crunchLogThrottlePeriod int - crunchLogThrottleLines int64 - crunchLogPartialLineThrottlePeriod int - crunchLogBytesPerEvent int64 - crunchLogSecondsBetweenEvents int -} - -// NewArvLogWriter creates new ArvLogWriter and loads the rate limiting config params -func NewArvLogWriter(clnt IArvadosClient, uuid string, ls string, wc io.WriteCloser) *ArvLogWriter { - w := &ArvLogWriter{ArvClient: clnt, UUID: uuid, loggingStream: ls, writeCloser: wc} - - // load the rate limit discovery config paramters - param, err := clnt.Discovery("crunchLimitLogBytesPerJob") - if err != nil { - w.crunchLimitLogBytesPerJob = 67108864 - } else { - w.crunchLimitLogBytesPerJob = int64(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogThrottleBytes") - if err != nil { - w.crunchLogThrottleBytes = 65536 - } else { - w.crunchLogThrottleBytes = int64(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogThrottlePeriod") - if err != nil { - w.crunchLogThrottlePeriod = 60 - } else { - w.crunchLogThrottlePeriod = int(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogThrottleLines") - if err != nil { - w.crunchLogThrottleLines = 1024 - } else { - w.crunchLogThrottleLines = int64(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod") - if err != nil { - w.crunchLogPartialLineThrottlePeriod = 5 - } else { - w.crunchLogPartialLineThrottlePeriod = int(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogBytesPerEvent") - if err != nil { - w.crunchLogBytesPerEvent = 4096 - } else { - w.crunchLogBytesPerEvent = int64(param.(float64)) - } - - param, err = clnt.Discovery("crunchLogSecondsBetweenEvents") - if err != nil { - w.crunchLogSecondsBetweenEvents = 1 - } else { - w.crunchLogSecondsBetweenEvents = int(param.(float64)) - } - - return w } func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { @@ -276,7 +219,7 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped)) } - arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(arvlog.crunchLogThrottlePeriod)) + arvlog.logThrottleResetTime = now.Add(time.Second * time.Duration(crunchLogThrottlePeriod)) arvlog.logThrottleBytesSoFar = 0 arvlog.logThrottleLinesSoFar = 0 arvlog.logThrottleBytesSkipped = 0 @@ -303,8 +246,8 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) { arvlog.logThrottleIsOpen = logOpen } - if int64(arvlog.bufToFlush.Len()) > arvlog.crunchLogBytesPerEvent || - (now.Sub(arvlog.bufFlushedAt) >= time.Duration(arvlog.crunchLogSecondsBetweenEvents)) { + if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent || + (now.Sub(arvlog.bufFlushedAt) >= time.Duration(crunchLogSecondsBetweenEvents)) { // write to API lr := arvadosclient.Dict{"log": arvadosclient.Dict{ "object_uuid": arvlog.UUID, @@ -349,7 +292,7 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") { partialLine = true - if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(arvlog.crunchLogPartialLineThrottlePeriod))) { + if now.After(arvlog.logThrottlePartialLineLastAt.Add(time.Second * time.Duration(crunchLogPartialLineThrottlePeriod))) { arvlog.logThrottlePartialLineLastAt = now } else { skipCounts = true @@ -362,24 +305,28 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) arvlog.bytesLogged += lineSize } - if arvlog.bytesLogged > arvlog.crunchLimitLogBytesPerJob { - message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", RFC3339Timestamp(now.UTC()), arvlog.crunchLimitLogBytesPerJob) + if arvlog.bytesLogged > crunchLimitLogBytesPerJob { + message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", + RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob) arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour)) arvlog.logThrottleIsOpen = false - } else if arvlog.logThrottleBytesSoFar > arvlog.crunchLogThrottleBytes { + } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes { remainingTime := arvlog.logThrottleResetTime.Sub(now) - message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleBytes, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second) + message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", + RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod, remainingTime/time.Second) arvlog.logThrottleIsOpen = false - } else if arvlog.logThrottleLinesSoFar > arvlog.crunchLogThrottleLines { + } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines { remainingTime := arvlog.logThrottleResetTime.Sub(now) - message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogThrottleLines, arvlog.crunchLogThrottlePeriod, remainingTime/time.Second) + message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", + RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod, remainingTime/time.Second) arvlog.logThrottleIsOpen = false } else if partialLine && arvlog.logThrottleFirstPartialLine { arvlog.logThrottleFirstPartialLine = false - message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", RFC3339Timestamp(now.UTC()), arvlog.crunchLogPartialLineThrottlePeriod) + message = fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", + RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod) } } @@ -400,3 +347,55 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) return arvlog.logThrottleIsOpen, line } } + +// load the rate limit discovery config paramters +func loadLogThrottleParams(clnt IArvadosClient) { + param, err := clnt.Discovery("crunchLimitLogBytesPerJob") + if err != nil { + crunchLimitLogBytesPerJob = 67108864 + } else { + crunchLimitLogBytesPerJob = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogThrottleBytes") + if err != nil { + crunchLogThrottleBytes = 65536 + } else { + crunchLogThrottleBytes = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogThrottlePeriod") + if err != nil { + crunchLogThrottlePeriod = 60 + } else { + crunchLogThrottlePeriod = int(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogThrottleLines") + if err != nil { + crunchLogThrottleLines = 1024 + } else { + crunchLogThrottleLines = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod") + if err != nil { + crunchLogPartialLineThrottlePeriod = 5 + } else { + crunchLogPartialLineThrottlePeriod = int(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogBytesPerEvent") + if err != nil { + crunchLogBytesPerEvent = 4096 + } else { + crunchLogBytesPerEvent = int64(param.(float64)) + } + + param, err = clnt.Discovery("crunchLogSecondsBetweenEvents") + if err != nil { + crunchLogSecondsBetweenEvents = 1 + } else { + crunchLogSecondsBetweenEvents = int(param.(float64)) + } +} -- 2.30.2