Merge branch 'master' into origin-8019-crunchrun-log-throttle
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 1 May 2017 17:29:49 +0000 (13:29 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 1 May 2017 17:29:49 +0000 (13:29 -0400)
services/api/app/controllers/arvados/v1/schema_controller.rb
services/api/config/application.default.yml
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/crunch-run/logging.go
services/crunch-run/logging_test.go

index 11269d2556a3b386c5f52edbaabda47629f6b769..e1f4ca5770403e1eb9164ed41d59e807ed1cc551 100644 (file)
@@ -38,6 +38,13 @@ class Arvados::V1::SchemaController < ApplicationController
         blobSignatureTtl: Rails.application.config.blob_signature_ttl,
         maxRequestSize: Rails.application.config.max_request_size,
         dockerImageFormats: Rails.application.config.docker_image_formats,
+        crunchLogBytesPerEvent: Rails.application.config.crunch_log_bytes_per_event,
+        crunchLogSecondsBetweenEvents: Rails.application.config.crunch_log_seconds_between_events,
+        crunchLogThrottlePeriod: Rails.application.config.crunch_log_throttle_period,
+        crunchLogThrottleBytes: Rails.application.config.crunch_log_throttle_bytes,
+        crunchLogThrottleLines: Rails.application.config.crunch_log_throttle_lines,
+        crunchLimitLogBytesPerJob: Rails.application.config.crunch_limit_log_bytes_per_job,
+        crunchLogPartialLineThrottlePeriod: Rails.application.config.crunch_log_partial_line_throttle_period,
         websocketUrl: Rails.application.config.websocket_address,
         parameters: {
           alt: {
index 85955be4e2b81dce6640e8367391500d3a484444..8118914ec91a58865430c812a381ad2973679be9 100644 (file)
@@ -267,6 +267,8 @@ common:
   # silenced by throttling are not counted against this total.
   crunch_limit_log_bytes_per_job: 67108864
 
+  crunch_log_partial_line_throttle_period: 5
+
   # Attributes to suppress in events and audit logs.  Notably,
   # specifying ["manifest_text"] here typically makes the database
   # smaller and faster.
@@ -401,8 +403,6 @@ common:
   # "git log".
   source_version: false
 
-  crunch_log_partial_line_throttle_period: 5
-
   # Enable asynchronous permission graph rebuild.  Must run
   # script/permission-updater.rb as a separate process.  When the permission
   # cache is invalidated, the background process will update the permission
index d1c24a5acbe7e45d727e9eb7323c501b54cbfc69..812525db6904ba1201a54502c5fd781686b0188b 100644 (file)
@@ -633,11 +633,12 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
 // Get and save the raw JSON container record from the API server
 func (runner *ContainerRunner) LogContainerRecord() (err error) {
        w := &ArvLogWriter{
-               runner.ArvClient,
-               runner.Container.UUID,
-               "container",
-               runner.LogCollection.Open("container.json"),
+               ArvClient:     runner.ArvClient,
+               UUID:          runner.Container.UUID,
+               loggingStream: "container",
+               writeCloser:   runner.LogCollection.Open("container.json"),
        }
+
        // Get Container record JSON from the API Server
        reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
        if err != nil {
@@ -1065,8 +1066,8 @@ func (runner *ContainerRunner) CommitLogs() error {
        // point, but re-open crunch log with ArvClient in case there are any
        // further errors (such as failing to write the log to Keep!) while
        // shutting down
-       runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
-               "crunch-run", nil})
+       runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
+               UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
 
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
@@ -1153,7 +1154,8 @@ func (runner *ContainerRunner) IsCancelled() bool {
 
 // NewArvLogWriter creates an ArvLogWriter
 func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
-       return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
+       return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
+               writeCloser: runner.LogCollection.Open(name + ".txt")}
 }
 
 // Run the full container lifecycle.
@@ -1293,6 +1295,9 @@ func NewContainerRunner(api IArvadosClient,
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+
+       loadLogThrottleParams(api)
+
        return cr
 }
 
index 43c55b67c1c08c07f69fe9913e3681e3835026a6..5bfbf796f045c2d90fe45495dc7c645151c190ca 100644 (file)
@@ -249,7 +249,16 @@ func (client *ArvTestClient) Update(resourceType string, uuid string, parameters
        return nil
 }
 
-var discoveryMap = map[string]interface{}{"defaultTrashLifetime": float64(1209600)}
+// discoveryMap is the fake discovery document served by
+// ArvTestClient.Discovery. The crunchLog* / crunchLimit* entries mirror the
+// built-in log throttling defaults; tests may temporarily overwrite an
+// entry to exercise a specific limit.
+var discoveryMap = map[string]interface{}{
+       "defaultTrashLifetime":               float64(1209600),
+       "crunchLimitLogBytesPerJob":          float64(67108864),
+       "crunchLogThrottleBytes":             float64(65536),
+       "crunchLogThrottlePeriod":            float64(60),
+       "crunchLogThrottleLines":             float64(1024),
+       "crunchLogPartialLineThrottlePeriod": float64(5),
+       "crunchLogBytesPerEvent":             float64(4096),
+       "crunchLogSecondsBetweenEvents":      float64(1),
+}
 
 func (client *ArvTestClient) Discovery(key string) (interface{}, error) {
        return discoveryMap[key], nil
index 5254ff671e6d3826356722259069515d450c1066..cdf2d6e68b2cb4ae95cc08bfcd553d5689de47a9 100644 (file)
@@ -6,6 +6,8 @@ import (
        "fmt"
        "io"
        "log"
+       "regexp"
+       "strings"
        "sync"
        "time"
 
@@ -32,11 +34,12 @@ type ThrottledLogger struct {
        *log.Logger
        buf *bytes.Buffer
        sync.Mutex
-       writer   io.WriteCloser
-       stopping chan struct{}
-       stopped  chan struct{}
+       writer  io.WriteCloser
+       flush   chan struct{}
+       stopped chan struct{}
        Timestamper
-       Immediate *log.Logger
+       Immediate    *log.Logger
+       pendingFlush bool
 }
 
 // RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
@@ -73,26 +76,37 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
                        n = len(p)
                }
        }
+
+       if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
+               // Non-blocking send.  Try to send a flush signal if the
+               // channel is ready to accept it.  Otherwise do nothing
+               // because a flush is already pending.
+               select {
+               case tl.flush <- struct{}{}:
+               default:
+               }
+       }
+
        return
 }
 
 // Periodically check the current buffer; if not empty, send it on the
 // channel to the goWriter goroutine.
 func (tl *ThrottledLogger) flusher() {
-       ticker := time.NewTicker(time.Second)
+       ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
        defer ticker.Stop()
        for stopping := false; !stopping; {
                select {
-               case <-tl.stopping:
-                       // flush tl.buf, then exit the loop
-                       stopping = true
+               case _, open := <-tl.flush:
+                       // if !open, will flush tl.buf and exit the loop
+                       stopping = !open
                case <-ticker.C:
                }
 
                var ready *bytes.Buffer
 
                tl.Mutex.Lock()
-               ready, tl.buf = tl.buf, nil
+               ready, tl.buf = tl.buf, &bytes.Buffer{}
                tl.Mutex.Unlock()
 
                if ready != nil && ready.Len() > 0 {
@@ -106,10 +120,10 @@ func (tl *ThrottledLogger) flusher() {
 // underlying Writer.
 func (tl *ThrottledLogger) Close() error {
        select {
-       case <-tl.stopping:
+       case <-tl.flush:
                // already stopped
        default:
-               close(tl.stopping)
+               close(tl.flush)
        }
        <-tl.stopped
        return tl.writer.Close()
@@ -155,11 +169,11 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
 
 // NewThrottledLogger creates a new throttled logger that
 // (a) prepends timestamps to each line
-// (b) batches log messages and only calls the underlying Writer at most once
-// per second.
+// (b) batches log messages and only calls the underlying Writer
+//  at most once per "crunchLogSecondsBetweenEvents" seconds.
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
        tl := &ThrottledLogger{}
-       tl.stopping = make(chan struct{})
+       tl.flush = make(chan struct{}, 1)
        tl.stopped = make(chan struct{})
        tl.writer = writer
        tl.Logger = log.New(tl, "", 0)
@@ -168,6 +182,15 @@ func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
        return tl
 }
 
+// Log throttling / rate limiting config parameters. The values below are
+// the built-in defaults; loadLogThrottleParams replaces them with the
+// values from the API server's discovery document.
+//
+// Note that crunchLogSecondsBetweenEvents is a time.Duration despite the
+// "Seconds" in its name.
+var crunchLimitLogBytesPerJob int64 = 67108864 // cap on total bytes logged; exceeding it closes the log
+var crunchLogThrottleBytes int64 = 65536 // bytes allowed per throttle period
+var crunchLogThrottlePeriod time.Duration = time.Second * 60 // length of one throttle period
+var crunchLogThrottleLines int64 = 1024 // lines allowed per throttle period
+var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5 // minimum spacing between partial-line segments
+var crunchLogBytesPerEvent int64 = 4096 // buffered size that triggers a flush
+var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1 // flush interval
+
 // ArvLogWriter is an io.WriteCloser that processes each write by
 // writing it through to another io.WriteCloser (typically a
 // CollectionFileWriter) and creating an Arvados log entry.
@@ -176,6 +199,18 @@ type ArvLogWriter struct {
        UUID          string
        loggingStream string
        writeCloser   io.WriteCloser
+
+       // for rate limiting
+       bytesLogged                  int64
+       logThrottleResetTime         time.Time
+       logThrottleLinesSoFar        int64
+       logThrottleBytesSoFar        int64
+       logThrottleBytesSkipped      int64
+       logThrottleIsOpen            bool
+       logThrottlePartialLineNextAt time.Time
+       logThrottleFirstPartialLine  bool
+       bufToFlush                   bytes.Buffer
+       bufFlushedAt                 time.Time
 }
 
 func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
@@ -185,17 +220,62 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
                _, err1 = arvlog.writeCloser.Write(p)
        }
 
-       // write to API
-       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-               "object_uuid": arvlog.UUID,
-               "event_type":  arvlog.loggingStream,
-               "properties":  map[string]string{"text": string(p)}}}
-       err2 := arvlog.ArvClient.Create("logs", lr, nil)
+       // write to API after checking rate limit
+       now := time.Now()
+       bytesWritten := 0
+
+       if now.After(arvlog.logThrottleResetTime) {
+               // It has been more than throttle_period seconds since the last
+               // checkpoint; so reset the throttle
+               if arvlog.logThrottleBytesSkipped > 0 {
+                       arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
+               }
+
+               arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
+               arvlog.logThrottleBytesSoFar = 0
+               arvlog.logThrottleLinesSoFar = 0
+               arvlog.logThrottleBytesSkipped = 0
+               arvlog.logThrottleIsOpen = true
+       }
+
+       lines := bytes.Split(p, []byte("\n"))
 
-       if err1 != nil || err2 != nil {
-               return 0, fmt.Errorf("%s ; %s", err1, err2)
+       for _, line := range lines {
+               // Short circuit the counting code if we're just going to throw
+               // away the data anyway.
+               if !arvlog.logThrottleIsOpen {
+                       arvlog.logThrottleBytesSkipped += int64(len(line))
+                       continue
+               } else if len(line) == 0 {
+                       continue
+               }
+
+               // check rateLimit
+               logOpen, msg := arvlog.rateLimit(line, now)
+               if logOpen {
+                       arvlog.bufToFlush.WriteString(string(msg) + "\n")
+               }
        }
-       return len(p), nil
+
+       if int64(arvlog.bufToFlush.Len()) > crunchLogBytesPerEvent ||
+               (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) {
+               // write to API
+               lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+                       "object_uuid": arvlog.UUID,
+                       "event_type":  arvlog.loggingStream,
+                       "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
+               err2 := arvlog.ArvClient.Create("logs", lr, nil)
+
+               bytesWritten = arvlog.bufToFlush.Len()
+               arvlog.bufToFlush = bytes.Buffer{}
+               arvlog.bufFlushedAt = now
+
+               if err1 != nil || err2 != nil {
+                       return 0, fmt.Errorf("%s ; %s", err1, err2)
+               }
+       }
+
+       return bytesWritten, nil
 }
 
 // Close the underlying writer
@@ -206,3 +286,116 @@ func (arvlog *ArvLogWriter) Close() (err error) {
        }
        return err
 }
+
+var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
+
+// rateLimit tests one log line against the hard cap on total output and
+// against the per-period byte, line, and partial-line throttles, updating
+// the writer's throttle counters as it goes.
+//
+// It returns whether something should be sent to the output, and the bytes
+// to send: the original line when it is within limits, or a replacement
+// notice when this line is the one that trips a limit (or is the first
+// partial-line segment to be suppressed). When the first result is false
+// the line is returned unchanged and has only been counted as skipped.
+func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
+       message := ""
+       lineSize := int64(len(line))
+
+       if arvlog.logThrottleIsOpen {
+               // Capture everything after the first whitespace-delimited token
+               // (presumably the timestamp prefix -- confirm against the code
+               // that formats log lines).
+               matches := lineRegexp.FindStringSubmatch(string(line))
+
+               if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
+                       // This is a partial line.
+
+                       if arvlog.logThrottleFirstPartialLine {
+                               // Partial should be suppressed.  First time this is happening for this line so provide a message instead.
+                               arvlog.logThrottleFirstPartialLine = false
+                               arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+                               arvlog.logThrottleBytesSkipped += lineSize
+                               return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
+                                       RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
+                       } else if now.After(arvlog.logThrottlePartialLineNextAt) {
+                               // The throttle period has passed.  Update timestamp and let it through.
+                               arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
+                       } else {
+                               // Suppress line.
+                               arvlog.logThrottleBytesSkipped += lineSize
+                               return false, line
+                       }
+               } else {
+                       // Not a partial line so reset.
+                       arvlog.logThrottlePartialLineNextAt = time.Time{}
+                       arvlog.logThrottleFirstPartialLine = true
+               }
+
+               // The line was not suppressed above, so it counts against both
+               // the lifetime total and the current throttle period.
+               arvlog.bytesLogged += lineSize
+               arvlog.logThrottleBytesSoFar += lineSize
+               arvlog.logThrottleLinesSoFar += 1
+
+               if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
+                       message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
+                               RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
+                       // Push the reset time a year out so that a reset keyed on
+                       // logThrottleResetTime does not reopen the throttle.
+                       arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
+                       arvlog.logThrottleIsOpen = false
+
+               } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
+                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
+                       message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
+                               RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
+                       arvlog.logThrottleIsOpen = false
+
+               } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
+                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
+                       message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
+                               RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
+                       arvlog.logThrottleIsOpen = false
+
+               }
+       }
+
+       if !arvlog.logThrottleIsOpen {
+               // Don't log anything if any limit has been exceeded. Just count lossage.
+               arvlog.logThrottleBytesSkipped += lineSize
+       }
+
+       if message != "" {
+               // Yes, write to logs, but use our "rate exceeded" message
+               // instead of the log message that exceeded the limit.
+               message += " A complete log is still being written to Keep, and will be available when the job finishes."
+               return true, []byte(message)
+       } else {
+               return arvlog.logThrottleIsOpen, line
+       }
+}
+
+// load the rate limit discovery config paramters
+func loadLogThrottleParams(clnt IArvadosClient) {
+       param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
+       if err == nil {
+               crunchLimitLogBytesPerJob = int64(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottleBytes")
+       if err == nil {
+               crunchLogThrottleBytes = int64(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottlePeriod")
+       if err == nil {
+               crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogThrottleLines")
+       if err == nil {
+               crunchLogThrottleLines = int64(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
+       if err == nil {
+               crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogBytesPerEvent")
+       if err == nil {
+               crunchLogBytesPerEvent = int64(param.(float64))
+       }
+
+       param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
+       if err == nil {
+               crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
+       }
+}
index ceb8ca87b00ba25a0a9dc1ac2f2f6ca591cdd0b8..477ed3949bfae0430e65c75843ac5a3e44228ff0 100644 (file)
@@ -4,6 +4,7 @@ import (
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        . "gopkg.in/check.v1"
+       "strings"
        "testing"
        "time"
 )
@@ -109,3 +110,45 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
                ". 408672f5b5325f7d20edfbf899faee42+83 0:83:crunch-run.txt\n"+
                ". c556a293010069fa79a6790a931531d5+80 0:80:stdout.txt\n")
 }
+
+// TestWriteLogsWithRateLimitThrottleBytes lowers crunchLogThrottleBytes to
+// 50 and expects the bytes-per-period notice in the API log text.
+func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) {
+       testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
+}
+
+// TestWriteLogsWithRateLimitThrottleLines lowers crunchLogThrottleLines to
+// 1 and expects the lines-per-period notice in the API log text.
+func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleLines(c *C) {
+       testWriteLogsWithRateLimit(c, "crunchLogThrottleLines", 1, 1024, "Exceeded rate 1 lines per 60 seconds")
+}
+
+// TestWriteLogsWithRateLimitThrottleBytesPerEvent lowers
+// crunchLimitLogBytesPerJob to 50 and expects the total-log-limit notice in
+// the API log text. NOTE(review): despite the "BytesPerEvent" name, the
+// parameter exercised here is the per-job byte cap.
+func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytesPerEvent(c *C) {
+       testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
+}
+
+// testWriteLogsWithRateLimit overrides one discovery parameter, writes two
+// log lines through a new ContainerRunner's CrunchLog, and checks that the
+// text sent to the API contains the expected throttle notice while the
+// content stored via the Keep test client still holds both lines.
+//
+// throttleParam is the discoveryMap key to override, throttleValue the
+// temporary value, and throttleDefault the value written back on return.
+func testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
+       // Override before creating the runner, and restore afterwards because
+       // discoveryMap is shared package state.
+       discoveryMap[throttleParam] = float64(throttleValue)
+       defer func() {
+               discoveryMap[throttleParam] = float64(throttleDefault)
+       }()
+
+       api := &ArvTestClient{}
+       kc := &KeepTestClient{}
+       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+
+       cr.CrunchLog.Print("Hello world!")
+       cr.CrunchLog.Print("Goodbye")
+       cr.CrunchLog.Close()
+
+       // Both lines are expected to reach the API in a single call.
+       c.Check(api.Calls, Equals, 1)
+
+       mt, err := cr.LogCollection.ManifestText()
+       c.Check(err, IsNil)
+       c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
+
+       logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
+               "2015-12-29T15:51:45.000000002Z Goodbye\n"
+
+       c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
+       stderrLog := api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]
+       // The API text must carry the throttle notice; the Keep content must
+       // still be the unthrottled log.
+       c.Check(true, Equals, strings.Contains(stderrLog, expected))
+       c.Check(string(kc.Content), Equals, logtext)
+}