21611: Remove log-to-logs-table code.
author Tom Clegg <tom@curii.com>
Wed, 1 May 2024 13:39:29 +0000 (09:39 -0400)
committer Tom Clegg <tom@curii.com>
Fri, 3 May 2024 18:04:34 +0000 (14:04 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
lib/crunchrun/cuda.go
lib/crunchrun/logging.go
lib/crunchrun/logging_test.go

index f8941162ec089515fd8ad2708f32e72162bc771e..8627045411a56dba53a61ba01c3b9e22d721812c 100644 (file)
@@ -81,9 +81,6 @@ type IKeepClient interface {
        SetStorageClasses(sc []string)
 }
 
-// NewLogWriter is a factory function to create a new log writer.
-type NewLogWriter func(name string) (io.WriteCloser, error)
-
 type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
 
 type MkTempDir func(string, string) (string, error)
@@ -125,8 +122,7 @@ type ContainerRunner struct {
        Container     arvados.Container
        token         string
        ExitCode      *int
-       NewLogWriter  NewLogWriter
-       CrunchLog     *ThrottledLogger
+       CrunchLog     *logWriter
        logUUID       string
        logMtx        sync.Mutex
        LogCollection arvados.CollectionFileSystem
@@ -168,7 +164,7 @@ type ContainerRunner struct {
        enableNetwork     string // one of "default" or "always"
        networkMode       string // "none", "host", or "" -- passed through to executor
        brokenNodeHook    string // script to run if node appears to be broken
-       arvMountLog       *ThrottledLogger
+       arvMountLog       io.WriteCloser
 
        containerWatchdogInterval time.Duration
 
@@ -303,11 +299,10 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
        }
        c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
 
-       w, err := runner.NewLogWriter("arv-mount")
+       runner.arvMountLog, err = runner.openLogFile("arv-mount")
        if err != nil {
                return nil, err
        }
-       runner.arvMountLog = NewThrottledLogger(w)
        scanner := logScanner{
                Patterns: []string{
                        "Keep write error",
@@ -735,13 +730,13 @@ func (runner *ContainerRunner) stopHoststat() error {
 }
 
 func (runner *ContainerRunner) startHoststat() error {
-       w, err := runner.NewLogWriter("hoststat")
+       var err error
+       runner.hoststatLogger, err = runner.openLogFile("hoststat")
        if err != nil {
                return err
        }
-       runner.hoststatLogger = NewThrottledLogger(w)
        runner.hoststatReporter = &crunchstat.Reporter{
-               Logger: log.New(runner.hoststatLogger, "", 0),
+               Logger: newLogWriter(runner.hoststatLogger),
                // Our own cgroup is the "host" cgroup, in the sense
                // that it accounts for resource usage outside the
                // container. It doesn't count _all_ resource usage on
@@ -759,15 +754,15 @@ func (runner *ContainerRunner) startHoststat() error {
 }
 
 func (runner *ContainerRunner) startCrunchstat() error {
-       w, err := runner.NewLogWriter("crunchstat")
+       var err error
+       runner.statLogger, err = runner.openLogFile("crunchstat")
        if err != nil {
                return err
        }
-       runner.statLogger = NewThrottledLogger(w)
        runner.statReporter = &crunchstat.Reporter{
                Pid:    runner.executor.Pid,
                FS:     runner.crunchstatFakeFS,
-               Logger: log.New(runner.statLogger, "", 0),
+               Logger: newLogWriter(runner.statLogger),
                MemThresholds: map[string][]crunchstat.Threshold{
                        "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
                },
@@ -790,7 +785,7 @@ type infoCommand struct {
 // might differ from what's described in the node record (see
 // LogNodeRecord).
 func (runner *ContainerRunner) LogHostInfo() (err error) {
-       w, err := runner.NewLogWriter("node-info")
+       w, err := runner.openLogFile("node-info")
        if err != nil {
                return
        }
@@ -875,13 +870,6 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
        if err != nil {
                return false, err
        }
-       w := &ArvLogWriter{
-               ArvClient:     runner.DispatcherArvClient,
-               UUID:          runner.Container.UUID,
-               loggingStream: label,
-               writeCloser:   writer,
-       }
-
        reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
        if err != nil {
                return false, fmt.Errorf("error getting %s record: %v", label, err)
@@ -901,12 +889,12 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
                return false, nil
        }
        // Re-encode it using indentation to improve readability
-       enc := json.NewEncoder(w)
+       enc := json.NewEncoder(writer)
        enc.SetIndent("", "    ")
        if err = enc.Encode(items[0]); err != nil {
                return false, fmt.Errorf("error logging %s record: %v", label, err)
        }
-       err = w.Close()
+       err = writer.Close()
        if err != nil {
                return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
        }
@@ -974,10 +962,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
                        return err
                }
                stdout = f
-       } else if w, err := runner.NewLogWriter("stdout"); err != nil {
+       } else if w, err := runner.openLogFile("stdout"); err != nil {
                return err
        } else {
-               stdout = NewThrottledLogger(w)
+               stdout = w
        }
 
        if mnt, ok := runner.Container.Mounts["stderr"]; ok {
@@ -986,10 +974,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
                        return err
                }
                stderr = f
-       } else if w, err := runner.NewLogWriter("stderr"); err != nil {
+       } else if w, err := runner.openLogFile("stderr"); err != nil {
                return err
        } else {
-               stderr = NewThrottledLogger(w)
+               stderr = w
        }
 
        env := runner.Container.Environment
@@ -1442,19 +1430,11 @@ func (runner *ContainerRunner) CommitLogs() error {
                if runner.arvMountLog != nil {
                        runner.arvMountLog.Close()
                }
-               runner.CrunchLog.Close()
-
-               // Closing CrunchLog above allows them to be committed to Keep at this
-               // point, but re-open crunch log with ArvClient in case there are any
-               // other further errors (such as failing to write the log to Keep!)
-               // while shutting down
-               runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
-                       ArvClient:     runner.DispatcherArvClient,
-                       UUID:          runner.Container.UUID,
-                       loggingStream: "crunch-run",
-                       writeCloser:   nil,
-               })
-               runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+
+               // From now on just log to stderr, in case there are
+               // any other further errors (such as failing to write
+               // the log to Keep!)  while shutting down
+               runner.CrunchLog = newLogWriter(newTimestamper(newStringPrefixer(os.Stderr, runner.Container.UUID+" ")))
        }()
 
        if runner.keepstoreLogger != nil {
@@ -1614,18 +1594,8 @@ func (runner *ContainerRunner) IsCancelled() bool {
        return runner.cCancelled
 }
 
-// NewArvLogWriter creates an ArvLogWriter
-func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
-       writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
-       if err != nil {
-               return nil, err
-       }
-       return &ArvLogWriter{
-               ArvClient:     runner.DispatcherArvClient,
-               UUID:          runner.Container.UUID,
-               loggingStream: name,
-               writeCloser:   writer,
-       }, nil
+func (runner *ContainerRunner) openLogFile(name string) (io.WriteCloser, error) {
+       return runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
 }
 
 // Run the full container lifecycle.
@@ -1655,9 +1625,7 @@ func (runner *ContainerRunner) Run() (err error) {
 
        defer func() {
                runner.CleanupDirs()
-
                runner.CrunchLog.Printf("crunch-run finished")
-               runner.CrunchLog.Close()
        }()
 
        err = runner.fetchContainerRecord()
@@ -1851,7 +1819,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
                DispatcherArvClient:  dispatcherArvClient,
                DispatcherKeepClient: dispatcherKeepClient,
        }
-       cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
        cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
@@ -1874,14 +1841,12 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
                return nil, err
        }
        cr.Container.UUID = containerUUID
-       w, err := cr.NewLogWriter("crunch-run")
+       f, err := cr.openLogFile("crunch-run")
        if err != nil {
                return nil, err
        }
-       cr.CrunchLog = NewThrottledLogger(w)
-       cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+       cr.CrunchLog = newLogWriter(newTimestamper(io.MultiWriter(f, newStringPrefixer(os.Stderr, cr.Container.UUID+" "))))
 
-       loadLogThrottleParams(dispatcherArvClient)
        go cr.updateLogs()
 
        return cr, nil
@@ -2026,12 +1991,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                keepstoreLogbuf.SetWriter(io.Discard)
        } else {
                cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
-               logwriter, err := cr.NewLogWriter("keepstore")
+               cr.keepstoreLogger, err = cr.openLogFile("keepstore")
                if err != nil {
                        log.Print(err)
                        return 1
                }
-               cr.keepstoreLogger = NewThrottledLogger(logwriter)
 
                var writer io.WriteCloser = cr.keepstoreLogger
                if logWhat == "errors" {
@@ -2057,13 +2021,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
        default:
                cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
-               cr.CrunchLog.Close()
                return 1
        }
        if err != nil {
                cr.CrunchLog.Printf("%s: %v", containerUUID, err)
                cr.checkBrokenNode(err)
-               cr.CrunchLog.Close()
                return 1
        }
        defer cr.executor.Close()
index 5cb982e1bbf9e5de77df1903f278878212a4d443..58ae1c190c75d5455f36924963502780427dc169 100644 (file)
@@ -14,7 +14,6 @@ import (
        "io"
        "io/fs"
        "io/ioutil"
-       "log"
        "math/rand"
        "net/http"
        "net/http/httptest"
@@ -121,7 +120,6 @@ type ArvTestClient struct {
        Content []arvadosclient.Dict
        arvados.Container
        secretMounts []byte
-       Logs         map[string]*bytes.Buffer
        sync.Mutex
        WasSetRunning bool
        callraw       bool
@@ -207,14 +205,7 @@ func (client *ArvTestClient) Create(resourceType string,
        client.Content = append(client.Content, parameters)
 
        if resourceType == "logs" {
-               et := parameters["log"].(arvadosclient.Dict)["event_type"].(string)
-               if client.Logs == nil {
-                       client.Logs = make(map[string]*bytes.Buffer)
-               }
-               if client.Logs[et] == nil {
-                       client.Logs[et] = &bytes.Buffer{}
-               }
-               client.Logs[et].Write([]byte(parameters["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]))
+               panic("logs.create called")
        }
 
        if resourceType == "collections" && output != nil {
@@ -607,29 +598,6 @@ func (*KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename
        return ErrorReader{}, nil
 }
 
-type ClosableBuffer struct {
-       bytes.Buffer
-}
-
-func (*ClosableBuffer) Close() error {
-       return nil
-}
-
-type TestLogs struct {
-       Stdout ClosableBuffer
-       Stderr ClosableBuffer
-}
-
-func (tl *TestLogs) NewTestLoggingWriter(logstr string) (io.WriteCloser, error) {
-       if logstr == "stdout" {
-               return &tl.Stdout, nil
-       }
-       if logstr == "stderr" {
-               return &tl.Stderr, nil
-       }
-       return nil, errors.New("???")
-}
-
 func dockerLog(fd byte, msg string) []byte {
        by := []byte(msg)
        header := make([]byte, 8+len(by))
@@ -645,8 +613,6 @@ func (s *TestSuite) TestRunContainer(c *C) {
                return 0
        }
 
-       var logs TestLogs
-       s.runner.NewLogWriter = logs.NewTestLoggingWriter
        s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
        s.runner.Container.Command = []string{"./hw"}
        s.runner.Container.OutputStorageClasses = []string{"default"}
@@ -663,8 +629,8 @@ func (s *TestSuite) TestRunContainer(c *C) {
        err = s.runner.WaitFinish()
        c.Assert(err, IsNil)
 
-       c.Check(logs.Stdout.String(), Matches, ".*Hello world\n")
-       c.Check(logs.Stderr.String(), Equals, "")
+       c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, `Hello world\n`)
+       c.Check(logFileContent(c, s.runner, "stderr.txt"), Matches, ``)
 }
 
 func (s *TestSuite) TestCommitLogs(c *C) {
@@ -673,7 +639,9 @@ func (s *TestSuite) TestCommitLogs(c *C) {
        defer kc.Close()
        cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
-       cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+       f, err := cr.openLogFile("crunch-run")
+       c.Assert(err, IsNil)
+       cr.CrunchLog = newLogWriter(newTestTimestamper(f))
 
        cr.CrunchLog.Print("Hello world!")
        cr.CrunchLog.Print("Goodbye")
@@ -682,10 +650,10 @@ func (s *TestSuite) TestCommitLogs(c *C) {
        err = cr.CommitLogs()
        c.Check(err, IsNil)
 
-       c.Check(api.Calls, Equals, 2)
-       c.Check(api.Content[1]["ensure_unique_name"], Equals, true)
-       c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
+       c.Check(api.Calls, Equals, 1)
+       c.Check(api.Content[0]["ensure_unique_name"], Equals, true)
+       c.Check(api.Content[0]["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Check(api.Content[0]["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
        c.Check(*cr.LogsPDH, Equals, "63da7bdacf08c40f604daad80c261e9a+60")
 }
 
@@ -806,10 +774,7 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, fn
        }
 
        if err != nil {
-               for k, v := range s.api.Logs {
-                       c.Log(k)
-                       c.Log(v.String())
-               }
+               dumpAllLogFiles(c, s.runner)
        }
 
        return s.api, s.runner, realTemp
@@ -844,7 +809,7 @@ func (s *TestSuite) TestFullRunHello(c *C) {
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello world\n")
+       c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, "hello world\n")
        c.Check(s.testDispatcherKeepClient.StorageClasses, DeepEquals, []string{"default"})
        c.Check(s.testContainerKeepClient.StorageClasses, DeepEquals, []string{"default"})
 }
@@ -935,13 +900,13 @@ func (s *TestSuite) testSpotInterruptionNotice(c *C, failureRate float64) {
                time.Sleep(time.Second)
                return 0
        })
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
+       c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
+       c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
        if failureRate == 1 {
-               c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
+               c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
        } else {
                text := `Cloud provider scheduled instance stop at ` + stoptime.Load().(time.Time).Format(time.RFC3339)
-               c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*`+text+`.*`)
+               c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*`+text+`.*`)
                c.Check(s.api.CalledWith("container.runtime_status.warning", "preemption notice"), NotNil)
                c.Check(s.api.CalledWith("container.runtime_status.warningDetail", text), NotNil)
                c.Check(s.api.CalledWith("container.runtime_status.preemptionNotice", text), NotNil)
@@ -966,7 +931,7 @@ func (s *TestSuite) TestRunTimeExceeded(c *C) {
        })
 
        c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
+       c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*maximum run time exceeded.*")
 }
 
 func (s *TestSuite) TestContainerWaitFails(c *C) {
@@ -984,7 +949,7 @@ func (s *TestSuite) TestContainerWaitFails(c *C) {
        })
 
        c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*")
+       c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*Container is not running.*")
 }
 
 func (s *TestSuite) TestCrunchstat(c *C) {
@@ -1007,11 +972,10 @@ func (s *TestSuite) TestCrunchstat(c *C) {
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
 
-       c.Assert(s.api.Logs["crunchstat"], NotNil)
-       c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*mem \d+ swap \d+ pgmajfault \d+ rss.*`)
+       c.Check(logFileContent(c, s.runner, "crunchstat.txt"), Matches, `(?ms).*mem \d+ swap \d+ pgmajfault \d+ rss.*`)
 
        // Check that we called (*crunchstat.Reporter)Stop().
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Maximum crunch-run memory rss usage was \d+ bytes\n.*`)
+       c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Maximum crunch-run memory rss usage was \d+ bytes\n.*`)
 }
 
 func (s *TestSuite) TestNodeInfoLog(c *C) {
@@ -1032,19 +996,16 @@ func (s *TestSuite) TestNodeInfoLog(c *C) {
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
 
-       buf, err := fs.ReadFile(arvados.FS(s.runner.LogCollection), "/node.json")
-       c.Assert(err, IsNil)
-       json := string(buf)
+       json := logFileContent(c, s.runner, "node.json")
        c.Check(json, Matches, `(?ms).*"ProviderType": *"a1\.2xlarge".*`)
        c.Check(json, Matches, `(?ms).*"Price": *1\.2.*`)
 
-       c.Assert(s.api.Logs["node-info"], NotNil)
-       json = s.api.Logs["node-info"].String()
-       c.Check(json, Matches, `(?ms).*Host Information.*`)
-       c.Check(json, Matches, `(?ms).*CPU Information.*`)
-       c.Check(json, Matches, `(?ms).*Memory Information.*`)
-       c.Check(json, Matches, `(?ms).*Disk Space.*`)
-       c.Check(json, Matches, `(?ms).*Disk INodes.*`)
+       nodeinfo := logFileContent(c, s.runner, "node-info.txt")
+       c.Check(nodeinfo, Matches, `(?ms).*Host Information.*`)
+       c.Check(nodeinfo, Matches, `(?ms).*CPU Information.*`)
+       c.Check(nodeinfo, Matches, `(?ms).*Memory Information.*`)
+       c.Check(nodeinfo, Matches, `(?ms).*Disk Space.*`)
+       c.Check(nodeinfo, Matches, `(?ms).*Disk INodes.*`)
 }
 
 func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
@@ -1062,11 +1023,10 @@ func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
                return 0
        })
 
-       c.Assert(s.api.Logs["crunch-run"], NotNil)
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
+       c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
+       c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
+       c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
+       c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Using container runtime: stub.*`)
 }
 
 func (s *TestSuite) testLogRSSThresholds(c *C, ram int64, expected []int, notExpected int) {
@@ -1082,8 +1042,9 @@ func (s *TestSuite) testLogRSSThresholds(c *C, ram int64, expected []int, notExp
                "runtime_constraints": {"ram": `+fmt.Sprintf("%d", ram)+`},
                "state": "Locked"
        }`, nil, func() int { return 0 })
-       c.Logf("=== crunchstat logs\n%s\n", s.api.Logs["crunchstat"].String())
-       logs := s.api.Logs["crunch-run"].String()
+       logs := logFileContent(c, s.runner, "crunch-run.txt")
+       c.Log("=== crunchstat logs")
+       c.Log(logs)
        pattern := logLineStart + `Container using over %d%% of memory \(rss %d/%d bytes\)`
        var threshold int
        for _, threshold = range expected {
@@ -1121,7 +1082,7 @@ func (s *TestSuite) TestLogMaximaAfterRun(c *C) {
         "runtime_constraints": {"ram": `+fmt.Sprintf("%d", s.debian12MemoryCurrent*10)+`},
         "state": "Locked"
     }`, nil, func() int { return 0 })
-       logs := s.api.Logs["crunch-run"].String()
+       logs := logFileContent(c, s.runner, "crunch-run.txt")
        for _, expected := range []string{
                `Maximum disk usage was \d+%, \d+/\d+ bytes`,
                fmt.Sprintf(`Maximum container memory swap usage was %d bytes`, s.debian12SwapCurrent),
@@ -1189,8 +1150,7 @@ func (s *TestSuite) TestContainerRecordLog(c *C) {
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
 
-       c.Assert(s.api.Logs["container"], NotNil)
-       c.Check(s.api.Logs["container"].String(), Matches, `(?ms).*container_image.*`)
+       c.Check(logFileContent(c, s.runner, "container.json"), Matches, `(?ms).*container_image.*`)
 }
 
 func (s *TestSuite) TestFullRunStderr(c *C) {
@@ -1215,8 +1175,8 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
        c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
        c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
 
-       c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello\n")
-       c.Check(s.api.Logs["stderr"].String(), Matches, ".*world\n")
+       c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*hello\n")
+       c.Check(logFileContent(c, s.runner, "stderr.txt"), Matches, ".*world\n")
 }
 
 func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
@@ -1237,8 +1197,7 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-       c.Log(s.api.Logs["stdout"])
-       c.Check(s.api.Logs["stdout"].String(), Matches, `.*workdir=""\n`)
+       c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, `.*workdir=""`)
 }
 
 func (s *TestSuite) TestFullRunSetCwd(c *C) {
@@ -1259,7 +1218,7 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n")
+       c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*/bin\n")
 }
 
 func (s *TestSuite) TestFullRunSetOutputStorageClasses(c *C) {
@@ -1281,7 +1240,7 @@ func (s *TestSuite) TestFullRunSetOutputStorageClasses(c *C) {
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n")
+       c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*/bin\n")
        c.Check(s.testDispatcherKeepClient.StorageClasses, DeepEquals, []string{"foo", "bar"})
        c.Check(s.testContainerKeepClient.StorageClasses, DeepEquals, []string{"foo", "bar"})
 }
@@ -1378,14 +1337,11 @@ func (s *TestSuite) testStopContainer(c *C) {
        case err = <-done:
                c.Check(err, IsNil)
        }
-       for k, v := range s.api.Logs {
-               c.Log(k)
-               c.Log(v.String(), "\n")
-       }
+       dumpAllLogFiles(c, s.runner)
 
        c.Check(s.api.CalledWith("container.log", nil), NotNil)
        c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
-       c.Check(s.api.Logs["stdout"].String(), Matches, "(?ms).*foo\n$")
+       c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, "(?ms).*foo\n$")
 }
 
 func (s *TestSuite) TestFullRunSetEnv(c *C) {
@@ -1406,7 +1362,7 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(s.api.Logs["stdout"].String(), Matches, `.*map\[FROBIZ:bilbo\]\n`)
+       c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, `.*map\[FROBIZ:bilbo\]`)
 }
 
 type ArvMountCmdLine struct {
@@ -1831,7 +1787,7 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
        })
        c.Check(s.api.CalledWith("container.exit_code", 3), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*status code 3\n.*`)
+       c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*status code 3\n.*`)
 }
 
 func (s *TestSuite) TestFullRunSetOutput(c *C) {
@@ -1886,7 +1842,7 @@ func (s *TestSuite) TestArvMountRuntimeStatusWarning(c *C) {
        c.Check(s.api.CalledWith("container.runtime_status.warning", "arv-mount: Keep write error"), NotNil)
        c.Check(s.api.CalledWith("container.runtime_status.warningDetail", "Test: Keep write error: I am a teapot"), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Container exited with status code 137 \(signal 9, SIGKILL\).*`)
+       c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Container exited with status code 137 \(signal 9, SIGKILL\).*`)
 }
 
 func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C) {
@@ -2196,13 +2152,14 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
     "state": "Locked"
 }`, nil, func() int { return 0 })
                c.Check(s.api.CalledWith("container.state", nextState), NotNil)
-               c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+               logs := logFileContent(c, s.runner, "crunch-run.txt")
+               c.Check(logs, Matches, "(?ms).*unable to run containers.*")
                if s.runner.brokenNodeHook != "" {
-                       c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
-                       c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
-                       c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
+                       c.Check(logs, Matches, "(?ms).*Running broken node hook.*")
+                       c.Check(logs, Matches, "(?ms).*killme.*")
+                       c.Check(logs, Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
                } else {
-                       c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
+                       c.Check(logs, Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
                }
        }
 }
@@ -2227,7 +2184,7 @@ func (s *TestSuite) TestBadCommand(c *C) {
     "state": "Locked"
 }`, nil, func() int { return 0 })
                c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
-               c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+               c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*Possible causes:.*is missing.*")
        }
 }
 
@@ -2337,7 +2294,7 @@ func (s *TestSuite) TestCalculateCost(c *C) {
        cr := s.runner
        cr.costStartTime = now.Add(-time.Hour)
        var logbuf bytes.Buffer
-       cr.CrunchLog.Immediate = log.New(&logbuf, "", 0)
+       cr.CrunchLog = newLogWriter(&logbuf)
 
        // if there's no InstanceType env var, cost is calculated as 0
        os.Unsetenv("InstanceType")
@@ -2463,3 +2420,20 @@ type FakeProcess struct {
 func (fp FakeProcess) CmdlineSlice() ([]string, error) {
        return fp.cmdLine, nil
 }
+
+func logFileContent(c *C, cr *ContainerRunner, fnm string) string {
+       buf, err := fs.ReadFile(arvados.FS(cr.LogCollection), fnm)
+       c.Assert(err, IsNil)
+       return string(buf)
+}
+
+func dumpAllLogFiles(c *C, cr *ContainerRunner) {
+       d, err := cr.LogCollection.OpenFile("/", os.O_RDONLY, 0)
+       c.Assert(err, IsNil)
+       fis, err := d.Readdir(-1)
+       c.Assert(err, IsNil)
+       for _, fi := range fis {
+               c.Logf("=== %s", fi.Name())
+               c.Log(logFileContent(c, cr, fi.Name()))
+       }
+}
index c693dbcb960250f1e502e42ae971e5227bb58189..f91a5c62cdc1761da37b08d1889025902867f58d 100644 (file)
@@ -5,13 +5,15 @@
 package crunchrun
 
 import (
+       "fmt"
+       "io"
        "os/exec"
 )
 
 // nvidiaModprobe makes sure all the nvidia kernel modules and devices
 // are set up.  If we don't have all the modules/devices set up we get
 // "CUDA_ERROR_UNKNOWN".
-func nvidiaModprobe(writer *ThrottledLogger) {
+func nvidiaModprobe(writer io.Writer) {
        // The underlying problem is that when normally running
        // directly on the host, the CUDA SDK will automatically
        // detect and set up the devices on demand.  However, when
@@ -42,7 +44,7 @@ func nvidiaModprobe(writer *ThrottledLogger) {
        nvidiaSmi.Stderr = writer
        err := nvidiaSmi.Run()
        if err != nil {
-               writer.Printf("Warning %v: %v", nvidiaSmi.Args, err)
+               fmt.Fprintf(writer, "Warning %v: %v\n", nvidiaSmi.Args, err)
        }
 
        // Load the kernel modules & devices associated with
@@ -63,7 +65,7 @@ func nvidiaModprobe(writer *ThrottledLogger) {
                nvmodprobe.Stderr = writer
                err = nvmodprobe.Run()
                if err != nil {
-                       writer.Printf("Warning %v: %v", nvmodprobe.Args, err)
+                       fmt.Fprintf(writer, "Warning %v: %v\n", nvmodprobe.Args, err)
                }
        }
 }
index 91a1b77cf4fa6ab2d22bf9b476be8a756ea6b0b1..04c37249697bf5cbb0b3fbe81dfa164fb835894d 100644 (file)
 package crunchrun
 
 import (
-       "bufio"
        "bytes"
        "encoding/json"
-       "fmt"
        "io"
        "log"
-       "regexp"
-       "strings"
-       "sync"
        "time"
-
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
 )
 
-// Timestamper is the signature for a function that takes a timestamp and
-// return a formated string value.
-type Timestamper func(t time.Time) string
-
-// Logging plumbing:
-//
-// ThrottledLogger.Logger -> ThrottledLogger.Write ->
-// ThrottledLogger.buf -> ThrottledLogger.flusher ->
-// ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create
-//
-// For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull
-// data from the stdout/stderr Reader and send to the Logger.
+// rfc3339NanoFixed is a fixed-width version of time.RFC3339Nano.
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
 
-// ThrottledLogger accepts writes, prepends a timestamp to each line of the
-// write, and periodically flushes to a downstream writer.  It supports the
-// "Logger" and "WriteCloser" interfaces.
-type ThrottledLogger struct {
-       *log.Logger
-       buf *bytes.Buffer
-       sync.Mutex
-       writer   io.WriteCloser
-       flush    chan struct{}
-       stopped  chan struct{}
-       stopping chan struct{}
-       Timestamper
-       Immediate    *log.Logger
-       pendingFlush bool
+// prefixer wraps an io.Writer, inserting a string returned by
+// prefixFunc at the beginning of each line.
+type prefixer struct {
+       writer     io.Writer
+       prefixFunc func() string
+       unfinished bool // true if the most recent write ended with a non-newline char
 }
 
-// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
-const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-
-// RFC3339Timestamp formats t as RFC3339NanoFixed.
-func RFC3339Timestamp(t time.Time) string {
-       return t.Format(RFC3339NanoFixed)
-}
-
-// Write prepends a timestamp to each line of the input data and
-// appends to the internal buffer. Each line is also logged to
-// tl.Immediate, if tl.Immediate is not nil.
-func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
-       tl.Mutex.Lock()
-       defer tl.Mutex.Unlock()
-
-       if tl.buf == nil {
-               tl.buf = &bytes.Buffer{}
-       }
-
-       now := tl.Timestamper(time.Now().UTC())
-       sc := bufio.NewScanner(bytes.NewBuffer(p))
-       for err == nil && sc.Scan() {
-               out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
-               if tl.Immediate != nil {
-                       tl.Immediate.Print(out[:len(out)-1])
-               }
-               _, err = io.WriteString(tl.buf, out)
-       }
-       if err == nil {
-               err = sc.Err()
-               if err == nil {
-                       n = len(p)
-               }
-       }
-
-       if int64(tl.buf.Len()) >= crunchLogBytesPerEvent {
-               // Non-blocking send.  Try send a flush if it is ready to
-               // accept it.  Otherwise do nothing because a flush is already
-               // pending.
-               select {
-               case tl.flush <- struct{}{}:
-               default:
-               }
+// newTimestamper wraps an io.Writer, inserting an RFC3339NanoFixed
+// timestamp at the beginning of each line.
+func newTimestamper(w io.Writer) *prefixer {
+       return &prefixer{
+               writer:     w,
+               prefixFunc: func() string { return time.Now().UTC().Format(rfc3339NanoFixed + " ") },
        }
-
-       return
 }
 
-// Periodically check the current buffer; if not empty, send it on the
-// channel to the goWriter goroutine.
-func (tl *ThrottledLogger) flusher() {
-       ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents))
-       defer ticker.Stop()
-       for stopping := false; !stopping; {
-               select {
-               case <-tl.stopping:
-                       // flush tl.buf and exit the loop
-                       stopping = true
-               case <-tl.flush:
-               case <-ticker.C:
-               }
-
-               var ready *bytes.Buffer
-
-               tl.Mutex.Lock()
-               ready, tl.buf = tl.buf, &bytes.Buffer{}
-               tl.Mutex.Unlock()
-
-               if ready != nil && ready.Len() > 0 {
-                       tl.writer.Write(ready.Bytes())
-               }
-       }
-       close(tl.stopped)
-}
-
-// Close the flusher goroutine and wait for it to complete, then close the
-// underlying Writer.
-func (tl *ThrottledLogger) Close() error {
-       select {
-       case <-tl.stopping:
-               // already stopped
-       default:
-               close(tl.stopping)
+// newStringPrefixer wraps an io.Writer, inserting the given string at
+// the beginning of each line. The given string should include a
+// trailing space for readability.
+func newStringPrefixer(w io.Writer, s string) *prefixer {
+       return &prefixer{
+               writer:     w,
+               prefixFunc: func() string { return s },
        }
-       <-tl.stopped
-       return tl.writer.Close()
 }
 
-const (
-       // MaxLogLine is the maximum length of stdout/stderr lines before they are split.
-       MaxLogLine = 1 << 12
-)
-
-// ReadWriteLines reads lines from a reader and writes to a Writer, with long
-// line splitting.
-func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
-       reader := bufio.NewReaderSize(in, MaxLogLine)
-       var prefix string
-       for {
-               line, isPrefix, err := reader.ReadLine()
-               if err == io.EOF {
-                       break
-               } else if err != nil {
-                       writer.Write([]byte(fmt.Sprintln("error reading container log:", err)))
-               }
-               var suffix string
-               if isPrefix {
-                       suffix = "[...]\n"
-               }
-
-               if prefix == "" && suffix == "" {
-                       writer.Write(line)
-               } else {
-                       writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix)))
+func (tp *prefixer) Write(p []byte) (n int, err error) {
+       for len(p) > 0 && err == nil {
+               if !tp.unfinished {
+                       _, err = io.WriteString(tp.writer, tp.prefixFunc())
+                       if err != nil {
+                               return
+                       }
                }
-
-               // Set up prefix for following line
-               if isPrefix {
-                       prefix = "[...]"
+               newline := bytes.IndexRune(p, '\n')
+               var nn int
+               if newline < 0 {
+                       tp.unfinished = true
+                       nn, err = tp.writer.Write(p)
+                       p = nil
                } else {
-                       prefix = ""
+                       tp.unfinished = false
+                       nn, err = tp.writer.Write(p[:newline+1])
+                       p = p[nn:]
                }
+               n += nn
        }
-       done <- true
-}
-
-// NewThrottledLogger creates a new thottled logger that
-//   - prepends timestamps to each line, and
-//   - batches log messages and only calls the underlying Writer
-//     at most once per "crunchLogSecondsBetweenEvents" seconds.
-func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
-       tl := &ThrottledLogger{}
-       tl.flush = make(chan struct{}, 1)
-       tl.stopped = make(chan struct{})
-       tl.stopping = make(chan struct{})
-       tl.writer = writer
-       tl.Logger = log.New(tl, "", 0)
-       tl.Timestamper = RFC3339Timestamp
-       go tl.flusher()
-       return tl
-}
-
-// Log throttling rate limiting config parameters
-var crunchLimitLogBytesPerJob int64 = 67108864
-var crunchLogThrottleBytes int64 = 65536
-var crunchLogThrottlePeriod time.Duration = time.Second * 60
-var crunchLogThrottleLines int64 = 1024
-var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
-var crunchLogBytesPerEvent int64 = 4096
-var crunchLogSecondsBetweenEvents = time.Second
-var crunchLogUpdatePeriod = time.Hour / 2
-var crunchLogUpdateSize = int64(1 << 25)
-
-// ArvLogWriter is an io.WriteCloser that processes each write by
-// writing it through to another io.WriteCloser (typically a
-// CollectionFileWriter) and creating an Arvados log entry.
-type ArvLogWriter struct {
-       ArvClient     IArvadosClient
-       UUID          string
-       loggingStream string
-       writeCloser   io.WriteCloser
-
-       // for rate limiting
-       bytesLogged                  int64
-       logThrottleResetTime         time.Time
-       logThrottleLinesSoFar        int64
-       logThrottleBytesSoFar        int64
-       logThrottleBytesSkipped      int64
-       logThrottleIsOpen            bool
-       logThrottlePartialLineNextAt time.Time
-       logThrottleFirstPartialLine  bool
-       bufToFlush                   bytes.Buffer
-       bufFlushedAt                 time.Time
-       closing                      bool
+       return
 }
 
-func (arvlog *ArvLogWriter) Write(p []byte) (int, error) {
-       // Write to the next writer in the chain (a file in Keep)
-       var err1 error
-       if arvlog.writeCloser != nil {
-               _, err1 = arvlog.writeCloser.Write(p)
-       }
-
-       // write to API after checking rate limit
-       now := time.Now()
-
-       if now.After(arvlog.logThrottleResetTime) {
-               // It has been more than throttle_period seconds since the last
-               // checkpoint; so reset the throttle
-               if arvlog.logThrottleBytesSkipped > 0 {
-                       arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped))
-               }
-
-               arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod)
-               arvlog.logThrottleBytesSoFar = 0
-               arvlog.logThrottleLinesSoFar = 0
-               arvlog.logThrottleBytesSkipped = 0
-               arvlog.logThrottleIsOpen = true
-       }
-
-       lines := bytes.Split(p, []byte("\n"))
-
-       for _, line := range lines {
-               // Short circuit the counting code if we're just going to throw
-               // away the data anyway.
-               if !arvlog.logThrottleIsOpen {
-                       arvlog.logThrottleBytesSkipped += int64(len(line))
-                       continue
-               } else if len(line) == 0 {
-                       continue
-               }
-
-               // check rateLimit
-               logOpen, msg := arvlog.rateLimit(line, now)
-               if logOpen {
-                       arvlog.bufToFlush.WriteString(string(msg) + "\n")
-               }
-       }
-
-       if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent ||
-               (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) ||
-               arvlog.closing) && (arvlog.bufToFlush.Len() > 0) {
-               // write to API
-               lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-                       "object_uuid": arvlog.UUID,
-                       "event_type":  arvlog.loggingStream,
-                       "properties":  map[string]string{"text": arvlog.bufToFlush.String()}}}
-               err2 := arvlog.ArvClient.Create("logs", lr, nil)
-
-               arvlog.bufToFlush = bytes.Buffer{}
-               arvlog.bufFlushedAt = now
-
-               if err1 != nil || err2 != nil {
-                       return 0, fmt.Errorf("%s ; %s", err1, err2)
-               }
-       }
-
-       return len(p), nil
+// logWriter adds log.Logger methods to an io.Writer.
+type logWriter struct {
+       io.Writer
+       *log.Logger
 }
 
-// Close the underlying writer
-func (arvlog *ArvLogWriter) Close() (err error) {
-       arvlog.closing = true
-       arvlog.Write([]byte{})
-       if arvlog.writeCloser != nil {
-               err = arvlog.writeCloser.Close()
-               arvlog.writeCloser = nil
+func newLogWriter(w io.Writer) *logWriter {
+       return &logWriter{
+               Writer: w,
+               Logger: log.New(w, "", 0),
        }
-       return err
 }
 
-var lineRegexp = regexp.MustCompile(`^\S+ (.*)`)
-
-// Test for hard cap on total output and for log throttling. Returns whether
-// the log line should go to output or not. Returns message if limit exceeded.
-func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) {
-       message := ""
-       lineSize := int64(len(line))
-
-       if arvlog.logThrottleIsOpen {
-               matches := lineRegexp.FindStringSubmatch(string(line))
-
-               if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") {
-                       // This is a partial line.
-
-                       if arvlog.logThrottleFirstPartialLine {
-                               // Partial should be suppressed.  First time this is happening for this line so provide a message instead.
-                               arvlog.logThrottleFirstPartialLine = false
-                               arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
-                               arvlog.logThrottleBytesSkipped += lineSize
-                               return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.",
-                                       RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second))
-                       } else if now.After(arvlog.logThrottlePartialLineNextAt) {
-                               // The throttle period has passed.  Update timestamp and let it through.
-                               arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod)
-                       } else {
-                               // Suppress line.
-                               arvlog.logThrottleBytesSkipped += lineSize
-                               return false, line
-                       }
-               } else {
-                       // Not a partial line so reset.
-                       arvlog.logThrottlePartialLineNextAt = time.Time{}
-                       arvlog.logThrottleFirstPartialLine = true
-               }
-
-               arvlog.bytesLogged += lineSize
-               arvlog.logThrottleBytesSoFar += lineSize
-               arvlog.logThrottleLinesSoFar++
-
-               if arvlog.bytesLogged > crunchLimitLogBytesPerJob {
-                       message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.",
-                               RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob)
-                       arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour))
-                       arvlog.logThrottleIsOpen = false
-
-               } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes {
-                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
-                       message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.",
-                               RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
-                       arvlog.logThrottleIsOpen = false
-
-               } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines {
-                       remainingTime := arvlog.logThrottleResetTime.Sub(now)
-                       message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.",
-                               RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second)
-                       arvlog.logThrottleIsOpen = false
-
-               }
-       }
-
-       if !arvlog.logThrottleIsOpen {
-               // Don't log anything if any limit has been exceeded. Just count lossage.
-               arvlog.logThrottleBytesSkipped += lineSize
-       }
-
-       if message != "" {
-               // Yes, write to logs, but use our "rate exceeded" message
-               // instead of the log message that exceeded the limit.
-               message += " A complete log is still being written to Keep, and will be available when the job finishes."
-               return true, []byte(message)
-       }
-       return arvlog.logThrottleIsOpen, line
-}
+var crunchLogUpdatePeriod = time.Hour / 2
+var crunchLogUpdateSize = int64(1 << 25)
 
 // load the rate limit discovery config parameters
 func loadLogThrottleParams(clnt IArvadosClient) {
@@ -394,13 +103,6 @@ func loadLogThrottleParams(clnt IArvadosClient) {
                }
        }
 
-       loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
-       loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
-       loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
-       loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
-       loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
-       loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
-       loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
        loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
        loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
 
index ee3320c7c34b0ac0b2fa012ad8b8a5276ccdf176..e747a2069e43888ee7ee06fa444d7cee599c3718 100644 (file)
@@ -13,26 +13,25 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        . "gopkg.in/check.v1"
        check "gopkg.in/check.v1"
 )
 
-type LoggingTestSuite struct {
-       client *arvados.Client
-}
-
-type TestTimestamper struct {
-       count int
+// newTestTimestamper wraps an io.Writer, inserting a predictable
+// RFC3339NanoFixed timestamp at the beginning of each line.
+func newTestTimestamper(w io.Writer) *prefixer {
+       count := 0
+       return &prefixer{
+               writer: w,
+               prefixFunc: func() string {
+                       count++
+                       return fmt.Sprintf("2015-12-29T15:51:45.%09dZ ", count)
+               },
+       }
 }
 
-func (stamper *TestTimestamper) Timestamp(t time.Time) string {
-       stamper.count++
-       t, err := time.ParseInLocation(time.RFC3339Nano, fmt.Sprintf("2015-12-29T15:51:45.%09dZ", stamper.count), t.Location())
-       if err != nil {
-               panic(err)
-       }
-       return RFC3339Timestamp(t)
+type LoggingTestSuite struct {
+       client *arvados.Client
 }
 
 // Gocheck boilerplate
@@ -48,26 +47,20 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-dz642-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
-       cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+       f, err := cr.openLogFile("crunch-run")
+       c.Assert(err, IsNil)
+       cr.CrunchLog = newLogWriter(newTestTimestamper(f))
 
        cr.CrunchLog.Print("Hello world!")
        cr.CrunchLog.Print("Goodbye")
-       cr.CrunchLog.Close()
-
-       c.Check(api.Calls, Equals, 1)
-
-       mt, err := cr.LogCollection.MarshalManifest(".")
-       c.Check(err, IsNil)
-       c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
 
-       logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
-               "2015-12-29T15:51:45.000000002Z Goodbye\n"
+       c.Check(api.Calls, Equals, 0)
 
-       c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
-       c.Check(api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext)
-       s.checkWroteBlock(c, kc, "74561df9ae65ee9f35d5661d42454264+83", logtext)
+       logs := logFileContent(c, cr, "crunch-run.txt")
+       c.Check(logs, Matches, `....-..-..T..:..:..\..........Z Hello world!\n`+
+               `....-..-..T..:..:..\..........Z Goodbye\n`)
 }
 
 func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
@@ -79,59 +72,34 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
        defer kc.Close()
        cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
-       cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
-       cr.CrunchLog.Immediate = nil
-
+       f, err := cr.openLogFile("crunch-run")
+       c.Assert(err, IsNil)
+       cr.CrunchLog = newLogWriter(newTestTimestamper(f))
        for i := 0; i < 2000000; i++ {
                cr.CrunchLog.Printf("Hello %d", i)
        }
        cr.CrunchLog.Print("Goodbye")
-       cr.CrunchLog.Close()
-
-       c.Check(api.Calls > 0, Equals, true)
-       c.Check(api.Calls < 2000000, Equals, true)
-
-       mt, err := cr.LogCollection.MarshalManifest(".")
-       c.Check(err, IsNil)
-       c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunch-run.txt\n")
-}
-
-func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
-       api := &ArvTestClient{}
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
-       c.Assert(err, IsNil)
-       ts := &TestTimestamper{}
-       cr.CrunchLog.Timestamper = ts.Timestamp
-       w, err := cr.NewLogWriter("stdout")
-       c.Assert(err, IsNil)
-       stdout := NewThrottledLogger(w)
-       stdout.Timestamper = ts.Timestamp
-
-       cr.CrunchLog.Print("Hello world!")
-       stdout.Print("Doing stuff")
-       cr.CrunchLog.Print("Goodbye")
-       stdout.Print("Blurb")
-       cr.CrunchLog.Close()
-       stdout.Close()
-
-       logText := make(map[string]string)
-       for _, content := range api.Content {
-               log := content["log"].(arvadosclient.Dict)
-               logText[log["event_type"].(string)] += log["properties"].(map[string]string)["text"]
-       }
 
-       c.Check(logText["crunch-run"], Equals, `2015-12-29T15:51:45.000000001Z Hello world!
-2015-12-29T15:51:45.000000003Z Goodbye
-`)
-       c.Check(logText["stdout"], Equals, `2015-12-29T15:51:45.000000002Z Doing stuff
-2015-12-29T15:51:45.000000004Z Blurb
+       logs := logFileContent(c, cr, "crunch-run.txt")
+       c.Check(strings.Count(logs, "\n"), Equals, 2000001)
+       // Redact most of the logs except the start/end for the regexp
+       // match -- otherwise, when the regexp fails, gocheck spams
+       // the test logs with tens of megabytes of quoted strings.
+       c.Assert(len(logs) > 10000, Equals, true)
+       c.Check(logs[:500]+"\n...\n"+logs[len(logs)-500:], Matches, `(?ms)2015-12-29T15:51:45.000000001Z Hello 0
+2015-12-29T15:51:45.000000002Z Hello 1
+2015-12-29T15:51:45.000000003Z Hello 2
+2015-12-29T15:51:45.000000004Z Hello 3
+.*
+2015-12-29T15:51:45.001999998Z Hello 1999997
+2015-12-29T15:51:45.001999999Z Hello 1999998
+2015-12-29T15:51:45.002000000Z Hello 1999999
+2015-12-29T15:51:45.002000001Z Goodbye
 `)
 
        mt, err := cr.LogCollection.MarshalManifest(".")
        c.Check(err, IsNil)
-       c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n")
+       c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunch-run.txt\n")
 }
 
 func (s *LoggingTestSuite) TestLogUpdate(c *C) {
@@ -149,14 +117,13 @@ func (s *LoggingTestSuite) TestLogUpdate(c *C) {
                api := &ArvTestClient{}
                kc := &KeepTestClient{}
                defer kc.Close()
-               cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
+               cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-dz642-zzzzzzzzzzzzzzz")
                c.Assert(err, IsNil)
-               ts := &TestTimestamper{}
-               cr.CrunchLog.Timestamper = ts.Timestamp
-               w, err := cr.NewLogWriter("stdout")
+               f, err := cr.openLogFile("crunch-run")
+               c.Assert(err, IsNil)
+               cr.CrunchLog = newLogWriter(newTestTimestamper(f))
+               stdout, err := cr.openLogFile("stdout")
                c.Assert(err, IsNil)
-               stdout := NewThrottledLogger(w)
-               stdout.Timestamper = ts.Timestamp
 
                c.Check(cr.logUUID, Equals, "")
                cr.CrunchLog.Printf("Hello %1000s", "space")
@@ -165,75 +132,18 @@ func (s *LoggingTestSuite) TestLogUpdate(c *C) {
                }
                c.Check(cr.logUUID, Not(Equals), "")
                cr.CrunchLog.Print("Goodbye")
-               fmt.Fprint(stdout, "Goodbye\n")
-               cr.CrunchLog.Close()
-               stdout.Close()
-               w.Close()
+               fmt.Fprintln(stdout, "Goodbye")
+
+               c.Check(logFileContent(c, cr, "crunch-run.txt"), Matches, `....-..-..T..:..:............Z Hello  {995}space\n`+
+                       `....-..-..T..:..:............Z Goodbye\n`)
+               c.Check(logFileContent(c, cr, "stdout.txt"), Matches, `Goodbye\n`)
 
                mt, err := cr.LogCollection.MarshalManifest(".")
                c.Check(err, IsNil)
-               // Block packing depends on whether there's an update
-               // between the two Goodbyes -- either way the first
-               // block will be 4dc76.
-               c.Check(mt, Matches, `. 4dc76e0a212bfa30c39d76d8c16da0c0\+1038 (afc503bc1b9a828b4bb543cb629e936c\+78|90699dc22545cd74a0664303f70bc05a\+39 276b49339fd5203d15a93ff3de11bfb9\+39) 0:1077:crunch-run.txt 1077:39:stdout.txt\n`)
+               c.Check(mt, Matches, `. 4dc76e0a212bfa30c39d76d8c16da0c0\+1038 5be52044a8c51e7b62dd62be07872968\+47 0:1077:crunch-run.txt 1077:8:stdout.txt\n`)
        }
 }
 
-func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) {
-       s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
-}
-
-func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleLines(c *C) {
-       s.testWriteLogsWithRateLimit(c, "crunchLogThrottleLines", 1, 1024, "Exceeded rate 1 lines per 60 seconds")
-}
-
-func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytesPerEvent(c *C) {
-       s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
-}
-
-func (s *LoggingTestSuite) TestWriteLogsWithZeroBytesPerJob(c *C) {
-       s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 0, 67108864, "Exceeded log limit 0 bytes (crunch_limit_log_bytes_per_job)")
-}
-
-func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
-       discoveryMap[throttleParam] = float64(throttleValue)
-       defer func() {
-               discoveryMap[throttleParam] = float64(throttleDefault)
-       }()
-
-       api := &ArvTestClient{}
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz")
-       c.Assert(err, IsNil)
-       cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
-
-       cr.CrunchLog.Print("Hello world!")
-       cr.CrunchLog.Print("Goodbye")
-       cr.CrunchLog.Close()
-
-       c.Check(api.Calls, Equals, 1)
-
-       mt, err := cr.LogCollection.MarshalManifest(".")
-       c.Check(err, IsNil)
-       c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
-
-       logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
-               "2015-12-29T15:51:45.000000002Z Goodbye\n"
-
-       c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
-       stderrLog := api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]
-       c.Check(true, Equals, strings.Contains(stderrLog, expected))
-       s.checkWroteBlock(c, kc, "74561df9ae65ee9f35d5661d42454264+83", logtext)
-}
-
-func (s *LoggingTestSuite) checkWroteBlock(c *C, kc *KeepTestClient, locator, expect string) {
-       buf := make([]byte, len([]byte(expect))+1)
-       n, err := kc.ReadAt(locator, buf, 0)
-       c.Check(err, IsNil)
-       c.Check(string(buf[:n]), Equals, expect)
-}
-
 type filterSuite struct{}
 
 var _ = Suite(&filterSuite{})