From: Tom Clegg Date: Wed, 1 May 2024 13:39:29 +0000 (-0400) Subject: 21611: Remove log-to-logs-table code. X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/5dfd9a57f9d65ea91295e8502cf7f6e748fc4010 21611: Remove log-to-logs-table code. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index f8941162ec..8627045411 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -81,9 +81,6 @@ type IKeepClient interface { SetStorageClasses(sc []string) } -// NewLogWriter is a factory function to create a new log writer. -type NewLogWriter func(name string) (io.WriteCloser, error) - type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error) type MkTempDir func(string, string) (string, error) @@ -125,8 +122,7 @@ type ContainerRunner struct { Container arvados.Container token string ExitCode *int - NewLogWriter NewLogWriter - CrunchLog *ThrottledLogger + CrunchLog *logWriter logUUID string logMtx sync.Mutex LogCollection arvados.CollectionFileSystem @@ -168,7 +164,7 @@ type ContainerRunner struct { enableNetwork string // one of "default" or "always" networkMode string // "none", "host", or "" -- passed through to executor brokenNodeHook string // script to run if node appears to be broken - arvMountLog *ThrottledLogger + arvMountLog io.WriteCloser containerWatchdogInterval time.Duration @@ -303,11 +299,10 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e } c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token) - w, err := runner.NewLogWriter("arv-mount") + runner.arvMountLog, err = runner.openLogFile("arv-mount") if err != nil { return nil, err } - runner.arvMountLog = NewThrottledLogger(w) scanner := logScanner{ Patterns: []string{ "Keep write error", @@ -735,13 +730,13 @@ func (runner *ContainerRunner) stopHoststat() error { } func (runner *ContainerRunner) startHoststat() error { - w, err := runner.NewLogWriter("hoststat") + var err error + runner.hoststatLogger, err = runner.openLogFile("hoststat") if err != nil { return err } - runner.hoststatLogger = NewThrottledLogger(w) runner.hoststatReporter = &crunchstat.Reporter{ - Logger: log.New(runner.hoststatLogger, "", 0), + Logger: newLogWriter(runner.hoststatLogger), // Our own cgroup is the "host" cgroup, in the sense // that it accounts for resource usage outside the // container. It doesn't count _all_ resource usage on @@ -759,15 +754,15 @@ func (runner *ContainerRunner) startHoststat() error { } func (runner *ContainerRunner) startCrunchstat() error { - w, err := runner.NewLogWriter("crunchstat") + var err error + runner.statLogger, err = runner.openLogFile("crunchstat") if err != nil { return err } - runner.statLogger = NewThrottledLogger(w) runner.statReporter = &crunchstat.Reporter{ Pid: runner.executor.Pid, FS: runner.crunchstatFakeFS, - Logger: log.New(runner.statLogger, "", 0), + Logger: newLogWriter(runner.statLogger), MemThresholds: map[string][]crunchstat.Threshold{ "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}), }, @@ -790,7 +785,7 @@ type infoCommand struct { // might differ from what's described in the node record (see // LogNodeRecord). 
func (runner *ContainerRunner) LogHostInfo() (err error) { - w, err := runner.NewLogWriter("node-info") + w, err := runner.openLogFile("node-info") if err != nil { return } @@ -875,13 +870,6 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str if err != nil { return false, err } - w := &ArvLogWriter{ - ArvClient: runner.DispatcherArvClient, - UUID: runner.Container.UUID, - loggingStream: label, - writeCloser: writer, - } - reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params)) if err != nil { return false, fmt.Errorf("error getting %s record: %v", label, err) @@ -901,12 +889,12 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str return false, nil } // Re-encode it using indentation to improve readability - enc := json.NewEncoder(w) + enc := json.NewEncoder(writer) enc.SetIndent("", " ") if err = enc.Encode(items[0]); err != nil { return false, fmt.Errorf("error logging %s record: %v", label, err) } - err = w.Close() + err = writer.Close() if err != nil { return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err) } @@ -974,10 +962,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st return err } stdout = f - } else if w, err := runner.NewLogWriter("stdout"); err != nil { + } else if w, err := runner.openLogFile("stdout"); err != nil { return err } else { - stdout = NewThrottledLogger(w) + stdout = w } if mnt, ok := runner.Container.Mounts["stderr"]; ok { @@ -986,10 +974,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st return err } stderr = f - } else if w, err := runner.NewLogWriter("stderr"); err != nil { + } else if w, err := runner.openLogFile("stderr"); err != nil { return err } else { - stderr = NewThrottledLogger(w) + stderr = w } env := runner.Container.Environment @@ -1442,19 +1430,11 @@ func (runner *ContainerRunner) CommitLogs() error { if runner.arvMountLog != nil { runner.arvMountLog.Close() } - runner.CrunchLog.Close() - - // Closing CrunchLog above allows them to be committed to Keep at this - // point, but re-open crunch log with ArvClient in case there are any - // other further errors (such as failing to write the log to Keep!) - // while shutting down - runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ - ArvClient: runner.DispatcherArvClient, - UUID: runner.Container.UUID, - loggingStream: "crunch-run", - writeCloser: nil, - }) - runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0) + + // From now on just log to stderr, in case there are + // any other further errors (such as failing to write + // the log to Keep!) 
while shutting down + runner.CrunchLog = newLogWriter(newTimestamper(newStringPrefixer(os.Stderr, runner.Container.UUID+" "))) }() if runner.keepstoreLogger != nil { @@ -1614,18 +1594,8 @@ func (runner *ContainerRunner) IsCancelled() bool { return runner.cCancelled } -// NewArvLogWriter creates an ArvLogWriter -func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) { - writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return nil, err - } - return &ArvLogWriter{ - ArvClient: runner.DispatcherArvClient, - UUID: runner.Container.UUID, - loggingStream: name, - writeCloser: writer, - }, nil +func (runner *ContainerRunner) openLogFile(name string) (io.WriteCloser, error) { + return runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666) } // Run the full container lifecycle. @@ -1655,9 +1625,7 @@ func (runner *ContainerRunner) Run() (err error) { defer func() { runner.CleanupDirs() - runner.CrunchLog.Printf("crunch-run finished") - runner.CrunchLog.Close() }() err = runner.fetchContainerRecord() @@ -1851,7 +1819,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client, DispatcherArvClient: dispatcherArvClient, DispatcherKeepClient: dispatcherKeepClient, } - cr.NewLogWriter = cr.NewArvLogWriter cr.RunArvMount = cr.ArvMountCmd cr.MkTempDir = ioutil.TempDir cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) { @@ -1874,14 +1841,12 @@ func NewContainerRunner(dispatcherClient *arvados.Client, return nil, err } cr.Container.UUID = containerUUID - w, err := cr.NewLogWriter("crunch-run") + f, err := cr.openLogFile("crunch-run") if err != nil { return nil, err } - cr.CrunchLog = NewThrottledLogger(w) - cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) + cr.CrunchLog = newLogWriter(newTimestamper(io.MultiWriter(f, newStringPrefixer(os.Stderr, cr.Container.UUID+" ")))) - loadLogThrottleParams(dispatcherArvClient) go cr.updateLogs() return cr, nil @@ -2026,12 +1991,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s keepstoreLogbuf.SetWriter(io.Discard) } else { cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES")) - logwriter, err := cr.NewLogWriter("keepstore") + cr.keepstoreLogger, err = cr.openLogFile("keepstore") if err != nil { log.Print(err) return 1 } - cr.keepstoreLogger = NewThrottledLogger(logwriter) var writer io.WriteCloser = cr.keepstoreLogger if logWhat == "errors" { @@ -2057,13 +2021,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf) default: cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine) - cr.CrunchLog.Close() return 1 } if err != nil { cr.CrunchLog.Printf("%s: %v", containerUUID, err) cr.checkBrokenNode(err) - cr.CrunchLog.Close() return 1 } defer cr.executor.Close() diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go index 5cb982e1bb..58ae1c190c 100644 --- a/lib/crunchrun/crunchrun_test.go +++ b/lib/crunchrun/crunchrun_test.go @@ -14,7 +14,6 @@ import ( "io" "io/fs" "io/ioutil" - "log" "math/rand" "net/http" "net/http/httptest" @@ -121,7 +120,6 @@ type ArvTestClient struct { Content []arvadosclient.Dict arvados.Container secretMounts []byte - Logs map[string]*bytes.Buffer sync.Mutex WasSetRunning bool callraw 
bool @@ -207,14 +205,7 @@ func (client *ArvTestClient) Create(resourceType string, client.Content = append(client.Content, parameters) if resourceType == "logs" { - et := parameters["log"].(arvadosclient.Dict)["event_type"].(string) - if client.Logs == nil { - client.Logs = make(map[string]*bytes.Buffer) - } - if client.Logs[et] == nil { - client.Logs[et] = &bytes.Buffer{} - } - client.Logs[et].Write([]byte(parameters["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"])) + panic("logs.create called") } if resourceType == "collections" && output != nil { @@ -607,29 +598,6 @@ func (*KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename return ErrorReader{}, nil } -type ClosableBuffer struct { - bytes.Buffer -} - -func (*ClosableBuffer) Close() error { - return nil -} - -type TestLogs struct { - Stdout ClosableBuffer - Stderr ClosableBuffer -} - -func (tl *TestLogs) NewTestLoggingWriter(logstr string) (io.WriteCloser, error) { - if logstr == "stdout" { - return &tl.Stdout, nil - } - if logstr == "stderr" { - return &tl.Stderr, nil - } - return nil, errors.New("???") -} - func dockerLog(fd byte, msg string) []byte { by := []byte(msg) header := make([]byte, 8+len(by)) @@ -645,8 +613,6 @@ func (s *TestSuite) TestRunContainer(c *C) { return 0 } - var logs TestLogs - s.runner.NewLogWriter = logs.NewTestLoggingWriter s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH s.runner.Container.Command = []string{"./hw"} s.runner.Container.OutputStorageClasses = []string{"default"} @@ -663,8 +629,8 @@ func (s *TestSuite) TestRunContainer(c *C) { err = s.runner.WaitFinish() c.Assert(err, IsNil) - c.Check(logs.Stdout.String(), Matches, ".*Hello world\n") - c.Check(logs.Stderr.String(), Equals, "") + c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, `Hello world\n`) + c.Check(logFileContent(c, s.runner, "stderr.txt"), Matches, ``) } func (s *TestSuite) TestCommitLogs(c *C) { @@ -673,7 +639,9 @@ func (s *TestSuite) TestCommitLogs(c *C) { defer kc.Close() cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) - cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp + f, err := cr.openLogFile("crunch-run") + c.Assert(err, IsNil) + cr.CrunchLog = newLogWriter(newTestTimestamper(f)) cr.CrunchLog.Print("Hello world!") cr.CrunchLog.Print("Goodbye") @@ -682,10 +650,10 @@ func (s *TestSuite) TestCommitLogs(c *C) { err = cr.CommitLogs() c.Check(err, IsNil) - c.Check(api.Calls, Equals, 2) - c.Check(api.Content[1]["ensure_unique_name"], Equals, true) - c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz") - c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n") + c.Check(api.Calls, Equals, 1) + c.Check(api.Content[0]["ensure_unique_name"], Equals, true) + c.Check(api.Content[0]["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz") + c.Check(api.Content[0]["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 
744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n") c.Check(*cr.LogsPDH, Equals, "63da7bdacf08c40f604daad80c261e9a+60") } @@ -806,10 +774,7 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, fn } if err != nil { - for k, v := range s.api.Logs { - c.Log(k) - c.Log(v.String()) - } + dumpAllLogFiles(c, s.runner) } return s.api, s.runner, realTemp @@ -844,7 +809,7 @@ func (s *TestSuite) TestFullRunHello(c *C) { c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello world\n") + c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, "hello world\n") c.Check(s.testDispatcherKeepClient.StorageClasses, DeepEquals, []string{"default"}) c.Check(s.testContainerKeepClient.StorageClasses, DeepEquals, []string{"default"}) } @@ -935,13 +900,13 @@ func (s *TestSuite) testSpotInterruptionNotice(c *C, failureRate float64) { time.Sleep(time.Second) return 0 }) - c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`) - c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`) + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`) + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`) if failureRate == 1 { - c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`) + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`) } else { text := `Cloud provider scheduled instance stop at ` + stoptime.Load().(time.Time).Format(time.RFC3339) - c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*`+text+`.*`) + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*`+text+`.*`) c.Check(s.api.CalledWith("container.runtime_status.warning", "preemption notice"), NotNil) c.Check(s.api.CalledWith("container.runtime_status.warningDetail", text), NotNil) c.Check(s.api.CalledWith("container.runtime_status.preemptionNotice", text), NotNil) @@ -966,7 +931,7 @@ func (s *TestSuite) TestRunTimeExceeded(c *C) { }) c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) - c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*") + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*maximum run time exceeded.*") } func (s *TestSuite) TestContainerWaitFails(c *C) { @@ -984,7 +949,7 @@ func (s *TestSuite) TestContainerWaitFails(c *C) { }) c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) - c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*") + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*Container is not running.*") } func (s *TestSuite) TestCrunchstat(c *C) { @@ -1007,11 +972,10 @@ func (s *TestSuite) TestCrunchstat(c *C) { c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - c.Assert(s.api.Logs["crunchstat"], NotNil) - c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*mem \d+ swap \d+ pgmajfault \d+ rss.*`) + 
c.Check(logFileContent(c, s.runner, "crunchstat.txt"), Matches, `(?ms).*mem \d+ swap \d+ pgmajfault \d+ rss.*`) // Check that we called (*crunchstat.Reporter)Stop(). - c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Maximum crunch-run memory rss usage was \d+ bytes\n.*`) + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Maximum crunch-run memory rss usage was \d+ bytes\n.*`) } func (s *TestSuite) TestNodeInfoLog(c *C) { @@ -1032,19 +996,16 @@ func (s *TestSuite) TestNodeInfoLog(c *C) { c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - buf, err := fs.ReadFile(arvados.FS(s.runner.LogCollection), "/node.json") - c.Assert(err, IsNil) - json := string(buf) + json := logFileContent(c, s.runner, "node.json") c.Check(json, Matches, `(?ms).*"ProviderType": *"a1\.2xlarge".*`) c.Check(json, Matches, `(?ms).*"Price": *1\.2.*`) - c.Assert(s.api.Logs["node-info"], NotNil) - json = s.api.Logs["node-info"].String() - c.Check(json, Matches, `(?ms).*Host Information.*`) - c.Check(json, Matches, `(?ms).*CPU Information.*`) - c.Check(json, Matches, `(?ms).*Memory Information.*`) - c.Check(json, Matches, `(?ms).*Disk Space.*`) - c.Check(json, Matches, `(?ms).*Disk INodes.*`) + nodeinfo := logFileContent(c, s.runner, "node-info.txt") + c.Check(nodeinfo, Matches, `(?ms).*Host Information.*`) + c.Check(nodeinfo, Matches, `(?ms).*CPU Information.*`) + c.Check(nodeinfo, Matches, `(?ms).*Memory Information.*`) + c.Check(nodeinfo, Matches, `(?ms).*Disk Space.*`) + c.Check(nodeinfo, Matches, `(?ms).*Disk INodes.*`) } func (s *TestSuite) TestLogVersionAndRuntime(c *C) { @@ -1062,11 +1023,10 @@ func (s *TestSuite) TestLogVersionAndRuntime(c *C) { return 0 }) - c.Assert(s.api.Logs["crunch-run"], NotNil) - c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`) - c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`) - c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`) - c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`) + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`) + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`) + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`) + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Using container runtime: stub.*`) } func (s *TestSuite) testLogRSSThresholds(c *C, ram int64, expected []int, notExpected int) { @@ -1082,8 +1042,9 @@ func (s *TestSuite) testLogRSSThresholds(c *C, ram int64, expected []int, notExp "runtime_constraints": {"ram": `+fmt.Sprintf("%d", ram)+`}, "state": "Locked" }`, nil, func() int { return 0 }) - c.Logf("=== crunchstat logs\n%s\n", s.api.Logs["crunchstat"].String()) - logs := s.api.Logs["crunch-run"].String() + logs := logFileContent(c, s.runner, "crunch-run.txt") + c.Log("=== crunchstat logs") + c.Log(logs) pattern := logLineStart + `Container using over %d%% of memory \(rss %d/%d bytes\)` var threshold int for _, threshold = range expected { @@ -1121,7 +1082,7 @@ func (s *TestSuite) TestLogMaximaAfterRun(c *C) { "runtime_constraints": {"ram": 
`+fmt.Sprintf("%d", s.debian12MemoryCurrent*10)+`}, "state": "Locked" }`, nil, func() int { return 0 }) - logs := s.api.Logs["crunch-run"].String() + logs := logFileContent(c, s.runner, "crunch-run.txt") for _, expected := range []string{ `Maximum disk usage was \d+%, \d+/\d+ bytes`, fmt.Sprintf(`Maximum container memory swap usage was %d bytes`, s.debian12SwapCurrent), @@ -1189,8 +1150,7 @@ func (s *TestSuite) TestContainerRecordLog(c *C) { c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - c.Assert(s.api.Logs["container"], NotNil) - c.Check(s.api.Logs["container"].String(), Matches, `(?ms).*container_image.*`) + c.Check(logFileContent(c, s.runner, "container.json"), Matches, `(?ms).*container_image.*`) } func (s *TestSuite) TestFullRunStderr(c *C) { @@ -1215,8 +1175,8 @@ func (s *TestSuite) TestFullRunStderr(c *C) { c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1) c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil) - c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello\n") - c.Check(s.api.Logs["stderr"].String(), Matches, ".*world\n") + c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*hello\n") + c.Check(logFileContent(c, s.runner, "stderr.txt"), Matches, ".*world\n") } func (s *TestSuite) TestFullRunDefaultCwd(c *C) { @@ -1237,8 +1197,7 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) { c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - c.Log(s.api.Logs["stdout"]) - c.Check(s.api.Logs["stdout"].String(), Matches, `.*workdir=""\n`) + c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, `.*workdir=""`) } func (s *TestSuite) TestFullRunSetCwd(c *C) { @@ -1259,7 +1218,7 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) { c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n") + c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*/bin\n") } func (s *TestSuite) TestFullRunSetOutputStorageClasses(c *C) { @@ -1281,7 +1240,7 @@ func (s *TestSuite) TestFullRunSetOutputStorageClasses(c *C) { c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n") + c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, ".*/bin\n") c.Check(s.testDispatcherKeepClient.StorageClasses, DeepEquals, []string{"foo", "bar"}) c.Check(s.testContainerKeepClient.StorageClasses, DeepEquals, []string{"foo", "bar"}) } @@ -1378,14 +1337,11 @@ func (s *TestSuite) testStopContainer(c *C) { case err = <-done: c.Check(err, IsNil) } - for k, v := range s.api.Logs { - c.Log(k) - c.Log(v.String(), "\n") - } + dumpAllLogFiles(c, s.runner) c.Check(s.api.CalledWith("container.log", nil), NotNil) c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) - c.Check(s.api.Logs["stdout"].String(), Matches, "(?ms).*foo\n$") + c.Check(logFileContent(c, s.runner, "stdout.txt"), Matches, "(?ms).*foo\n$") } func (s *TestSuite) TestFullRunSetEnv(c *C) { @@ -1406,7 +1362,7 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) { c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - c.Check(s.api.Logs["stdout"].String(), Matches, `.*map\[FROBIZ:bilbo\]\n`) + c.Check(logFileContent(c, s.runner, 
"stdout.txt"), Matches, `.*map\[FROBIZ:bilbo\]`) } type ArvMountCmdLine struct { @@ -1831,7 +1787,7 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) { }) c.Check(s.api.CalledWith("container.exit_code", 3), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*status code 3\n.*`) + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*status code 3\n.*`) } func (s *TestSuite) TestFullRunSetOutput(c *C) { @@ -1886,7 +1842,7 @@ func (s *TestSuite) TestArvMountRuntimeStatusWarning(c *C) { c.Check(s.api.CalledWith("container.runtime_status.warning", "arv-mount: Keep write error"), NotNil) c.Check(s.api.CalledWith("container.runtime_status.warningDetail", "Test: Keep write error: I am a teapot"), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Container exited with status code 137 \(signal 9, SIGKILL\).*`) + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, `(?ms).*Container exited with status code 137 \(signal 9, SIGKILL\).*`) } func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C) { @@ -2196,13 +2152,14 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) { "state": "Locked" }`, nil, func() int { return 0 }) c.Check(s.api.CalledWith("container.state", nextState), NotNil) - c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*") + logs := logFileContent(c, s.runner, "crunch-run.txt") + c.Check(logs, Matches, "(?ms).*unable to run containers.*") if s.runner.brokenNodeHook != "" { - c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*") - c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*") - c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*") + c.Check(logs, Matches, "(?ms).*Running broken node hook.*") + c.Check(logs, Matches, "(?ms).*killme.*") + c.Check(logs, Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*") } else { - c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*") + c.Check(logs, Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*") } } } @@ -2227,7 +2184,7 @@ func (s *TestSuite) TestBadCommand(c *C) { "state": "Locked" }`, nil, func() int { return 0 }) c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) - c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*") + c.Check(logFileContent(c, s.runner, "crunch-run.txt"), Matches, "(?ms).*Possible causes:.*is missing.*") } } @@ -2337,7 +2294,7 @@ func (s *TestSuite) TestCalculateCost(c *C) { cr := s.runner cr.costStartTime = now.Add(-time.Hour) var logbuf bytes.Buffer - cr.CrunchLog.Immediate = log.New(&logbuf, "", 0) + cr.CrunchLog = newLogWriter(&logbuf) // if there's no InstanceType env var, cost is calculated as 0 os.Unsetenv("InstanceType") @@ -2463,3 +2420,20 @@ type FakeProcess struct { func (fp FakeProcess) CmdlineSlice() ([]string, error) { return fp.cmdLine, nil } + +func logFileContent(c *C, cr *ContainerRunner, fnm string) string { + buf, err := fs.ReadFile(arvados.FS(cr.LogCollection), fnm) + c.Assert(err, IsNil) + return string(buf) +} + +func dumpAllLogFiles(c *C, cr *ContainerRunner) { + d, err := cr.LogCollection.OpenFile("/", os.O_RDONLY, 0) + c.Assert(err, IsNil) + 
fis, err := d.Readdir(-1) + c.Assert(err, IsNil) + for _, fi := range fis { + c.Logf("=== %s", fi.Name()) + c.Log(logFileContent(c, cr, fi.Name())) + } +} diff --git a/lib/crunchrun/cuda.go b/lib/crunchrun/cuda.go index c693dbcb96..f91a5c62cd 100644 --- a/lib/crunchrun/cuda.go +++ b/lib/crunchrun/cuda.go @@ -5,13 +5,15 @@ package crunchrun import ( + "fmt" + "io" "os/exec" ) // nvidiaModprobe makes sure all the nvidia kernel modules and devices // are set up. If we don't have all the modules/devices set up we get // "CUDA_ERROR_UNKNOWN". -func nvidiaModprobe(writer *ThrottledLogger) { +func nvidiaModprobe(writer io.Writer) { // The underlying problem is that when normally running // directly on the host, the CUDA SDK will automatically // detect and set up the devices on demand. However, when @@ -42,7 +44,7 @@ func nvidiaModprobe(writer *ThrottledLogger) { nvidiaSmi.Stderr = writer err := nvidiaSmi.Run() if err != nil { - writer.Printf("Warning %v: %v", nvidiaSmi.Args, err) + fmt.Fprintf(writer, "Warning %v: %v\n", nvidiaSmi.Args, err) } // Load the kernel modules & devices associated with @@ -63,7 +65,7 @@ func nvidiaModprobe(writer *ThrottledLogger) { nvmodprobe.Stderr = writer err = nvmodprobe.Run() if err != nil { - writer.Printf("Warning %v: %v", nvmodprobe.Args, err) + fmt.Fprintf(writer, "Warning %v: %v\n", nvmodprobe.Args, err) } } } diff --git a/lib/crunchrun/logging.go b/lib/crunchrun/logging.go index 91a1b77cf4..04c3724969 100644 --- a/lib/crunchrun/logging.go +++ b/lib/crunchrun/logging.go @@ -5,373 +5,82 @@ package crunchrun import ( - "bufio" "bytes" "encoding/json" - "fmt" "io" "log" - "regexp" - "strings" - "sync" "time" - - "git.arvados.org/arvados.git/sdk/go/arvadosclient" ) -// Timestamper is the signature for a function that takes a timestamp and -// return a formated string value. -type Timestamper func(t time.Time) string - -// Logging plumbing: -// -// ThrottledLogger.Logger -> ThrottledLogger.Write -> -// ThrottledLogger.buf -> ThrottledLogger.flusher -> -// ArvLogWriter.Write -> CollectionFileWriter.Write | Api.Create -// -// For stdout/stderr ReadWriteLines additionally runs as a goroutine to pull -// data from the stdout/stderr Reader and send to the Logger. +// rfc3339NanoFixed is a fixed-width version of time.RFC3339Nano. +const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" -// ThrottledLogger accepts writes, prepends a timestamp to each line of the -// write, and periodically flushes to a downstream writer. It supports the -// "Logger" and "WriteCloser" interfaces. -type ThrottledLogger struct { - *log.Logger - buf *bytes.Buffer - sync.Mutex - writer io.WriteCloser - flush chan struct{} - stopped chan struct{} - stopping chan struct{} - Timestamper - Immediate *log.Logger - pendingFlush bool +// prefixer wraps an io.Writer, inserting a string returned by +// prefixFunc at the beginning of each line. +type prefixer struct { + writer io.Writer + prefixFunc func() string + unfinished bool // true if the most recent write ended with a non-newline char } -// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano. -const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" - -// RFC3339Timestamp formats t as RFC3339NanoFixed. -func RFC3339Timestamp(t time.Time) string { - return t.Format(RFC3339NanoFixed) -} - -// Write prepends a timestamp to each line of the input data and -// appends to the internal buffer. Each line is also logged to -// tl.Immediate, if tl.Immediate is not nil. 
-func (tl *ThrottledLogger) Write(p []byte) (n int, err error) { - tl.Mutex.Lock() - defer tl.Mutex.Unlock() - - if tl.buf == nil { - tl.buf = &bytes.Buffer{} - } - - now := tl.Timestamper(time.Now().UTC()) - sc := bufio.NewScanner(bytes.NewBuffer(p)) - for err == nil && sc.Scan() { - out := fmt.Sprintf("%s %s\n", now, sc.Bytes()) - if tl.Immediate != nil { - tl.Immediate.Print(out[:len(out)-1]) - } - _, err = io.WriteString(tl.buf, out) - } - if err == nil { - err = sc.Err() - if err == nil { - n = len(p) - } - } - - if int64(tl.buf.Len()) >= crunchLogBytesPerEvent { - // Non-blocking send. Try send a flush if it is ready to - // accept it. Otherwise do nothing because a flush is already - // pending. - select { - case tl.flush <- struct{}{}: - default: - } +// newTimestamper wraps an io.Writer, inserting an RFC3339NanoFixed +// timestamp at the beginning of each line. +func newTimestamper(w io.Writer) *prefixer { + return &prefixer{ + writer: w, + prefixFunc: func() string { return time.Now().UTC().Format(rfc3339NanoFixed + " ") }, } - - return } -// Periodically check the current buffer; if not empty, send it on the -// channel to the goWriter goroutine. -func (tl *ThrottledLogger) flusher() { - ticker := time.NewTicker(time.Duration(crunchLogSecondsBetweenEvents)) - defer ticker.Stop() - for stopping := false; !stopping; { - select { - case <-tl.stopping: - // flush tl.buf and exit the loop - stopping = true - case <-tl.flush: - case <-ticker.C: - } - - var ready *bytes.Buffer - - tl.Mutex.Lock() - ready, tl.buf = tl.buf, &bytes.Buffer{} - tl.Mutex.Unlock() - - if ready != nil && ready.Len() > 0 { - tl.writer.Write(ready.Bytes()) - } - } - close(tl.stopped) -} - -// Close the flusher goroutine and wait for it to complete, then close the -// underlying Writer. -func (tl *ThrottledLogger) Close() error { - select { - case <-tl.stopping: - // already stopped - default: - close(tl.stopping) +// newStringPrefixer wraps an io.Writer, inserting the given string at +// the beginning of each line. The given string should include a +// trailing space for readability. +func newStringPrefixer(w io.Writer, s string) *prefixer { + return &prefixer{ + writer: w, + prefixFunc: func() string { return s }, } - <-tl.stopped - return tl.writer.Close() } -const ( - // MaxLogLine is the maximum length of stdout/stderr lines before they are split. - MaxLogLine = 1 << 12 -) - -// ReadWriteLines reads lines from a reader and writes to a Writer, with long -// line splitting. 
-func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) { - reader := bufio.NewReaderSize(in, MaxLogLine) - var prefix string - for { - line, isPrefix, err := reader.ReadLine() - if err == io.EOF { - break - } else if err != nil { - writer.Write([]byte(fmt.Sprintln("error reading container log:", err))) - } - var suffix string - if isPrefix { - suffix = "[...]\n" - } - - if prefix == "" && suffix == "" { - writer.Write(line) - } else { - writer.Write([]byte(fmt.Sprint(prefix, string(line), suffix))) +func (tp *prefixer) Write(p []byte) (n int, err error) { + for len(p) > 0 && err == nil { + if !tp.unfinished { + _, err = io.WriteString(tp.writer, tp.prefixFunc()) + if err != nil { + return + } } - - // Set up prefix for following line - if isPrefix { - prefix = "[...]" + newline := bytes.IndexRune(p, '\n') + var nn int + if newline < 0 { + tp.unfinished = true + nn, err = tp.writer.Write(p) + p = nil } else { - prefix = "" + tp.unfinished = false + nn, err = tp.writer.Write(p[:newline+1]) + p = p[nn:] } + n += nn } - done <- true -} - -// NewThrottledLogger creates a new thottled logger that -// - prepends timestamps to each line, and -// - batches log messages and only calls the underlying Writer -// at most once per "crunchLogSecondsBetweenEvents" seconds. -func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger { - tl := &ThrottledLogger{} - tl.flush = make(chan struct{}, 1) - tl.stopped = make(chan struct{}) - tl.stopping = make(chan struct{}) - tl.writer = writer - tl.Logger = log.New(tl, "", 0) - tl.Timestamper = RFC3339Timestamp - go tl.flusher() - return tl -} - -// Log throttling rate limiting config parameters -var crunchLimitLogBytesPerJob int64 = 67108864 -var crunchLogThrottleBytes int64 = 65536 -var crunchLogThrottlePeriod time.Duration = time.Second * 60 -var crunchLogThrottleLines int64 = 1024 -var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5 -var crunchLogBytesPerEvent int64 = 4096 -var crunchLogSecondsBetweenEvents = time.Second -var crunchLogUpdatePeriod = time.Hour / 2 -var crunchLogUpdateSize = int64(1 << 25) - -// ArvLogWriter is an io.WriteCloser that processes each write by -// writing it through to another io.WriteCloser (typically a -// CollectionFileWriter) and creating an Arvados log entry. 
-type ArvLogWriter struct { - ArvClient IArvadosClient - UUID string - loggingStream string - writeCloser io.WriteCloser - - // for rate limiting - bytesLogged int64 - logThrottleResetTime time.Time - logThrottleLinesSoFar int64 - logThrottleBytesSoFar int64 - logThrottleBytesSkipped int64 - logThrottleIsOpen bool - logThrottlePartialLineNextAt time.Time - logThrottleFirstPartialLine bool - bufToFlush bytes.Buffer - bufFlushedAt time.Time - closing bool + return } -func (arvlog *ArvLogWriter) Write(p []byte) (int, error) { - // Write to the next writer in the chain (a file in Keep) - var err1 error - if arvlog.writeCloser != nil { - _, err1 = arvlog.writeCloser.Write(p) - } - - // write to API after checking rate limit - now := time.Now() - - if now.After(arvlog.logThrottleResetTime) { - // It has been more than throttle_period seconds since the last - // checkpoint; so reset the throttle - if arvlog.logThrottleBytesSkipped > 0 { - arvlog.bufToFlush.WriteString(fmt.Sprintf("%s Skipped %d bytes of log\n", RFC3339Timestamp(now.UTC()), arvlog.logThrottleBytesSkipped)) - } - - arvlog.logThrottleResetTime = now.Add(crunchLogThrottlePeriod) - arvlog.logThrottleBytesSoFar = 0 - arvlog.logThrottleLinesSoFar = 0 - arvlog.logThrottleBytesSkipped = 0 - arvlog.logThrottleIsOpen = true - } - - lines := bytes.Split(p, []byte("\n")) - - for _, line := range lines { - // Short circuit the counting code if we're just going to throw - // away the data anyway. - if !arvlog.logThrottleIsOpen { - arvlog.logThrottleBytesSkipped += int64(len(line)) - continue - } else if len(line) == 0 { - continue - } - - // check rateLimit - logOpen, msg := arvlog.rateLimit(line, now) - if logOpen { - arvlog.bufToFlush.WriteString(string(msg) + "\n") - } - } - - if (int64(arvlog.bufToFlush.Len()) >= crunchLogBytesPerEvent || - (now.Sub(arvlog.bufFlushedAt) >= crunchLogSecondsBetweenEvents) || - arvlog.closing) && (arvlog.bufToFlush.Len() > 0) { - // write to API - lr := arvadosclient.Dict{"log": arvadosclient.Dict{ - "object_uuid": arvlog.UUID, - "event_type": arvlog.loggingStream, - "properties": map[string]string{"text": arvlog.bufToFlush.String()}}} - err2 := arvlog.ArvClient.Create("logs", lr, nil) - - arvlog.bufToFlush = bytes.Buffer{} - arvlog.bufFlushedAt = now - - if err1 != nil || err2 != nil { - return 0, fmt.Errorf("%s ; %s", err1, err2) - } - } - - return len(p), nil +// logWriter adds log.Logger methods to an io.Writer. +type logWriter struct { + io.Writer + *log.Logger } -// Close the underlying writer -func (arvlog *ArvLogWriter) Close() (err error) { - arvlog.closing = true - arvlog.Write([]byte{}) - if arvlog.writeCloser != nil { - err = arvlog.writeCloser.Close() - arvlog.writeCloser = nil +func newLogWriter(w io.Writer) *logWriter { + return &logWriter{ + Writer: w, + Logger: log.New(w, "", 0), } - return err } -var lineRegexp = regexp.MustCompile(`^\S+ (.*)`) - -// Test for hard cap on total output and for log throttling. Returns whether -// the log line should go to output or not. Returns message if limit exceeded. -func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte) { - message := "" - lineSize := int64(len(line)) - - if arvlog.logThrottleIsOpen { - matches := lineRegexp.FindStringSubmatch(string(line)) - - if len(matches) == 2 && strings.HasPrefix(matches[1], "[...]") && strings.HasSuffix(matches[1], "[...]") { - // This is a partial line. - - if arvlog.logThrottleFirstPartialLine { - // Partial should be suppressed. 
First time this is happening for this line so provide a message instead. - arvlog.logThrottleFirstPartialLine = false - arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod) - arvlog.logThrottleBytesSkipped += lineSize - return true, []byte(fmt.Sprintf("%s Rate-limiting partial segments of long lines to one every %d seconds.", - RFC3339Timestamp(now.UTC()), crunchLogPartialLineThrottlePeriod/time.Second)) - } else if now.After(arvlog.logThrottlePartialLineNextAt) { - // The throttle period has passed. Update timestamp and let it through. - arvlog.logThrottlePartialLineNextAt = now.Add(crunchLogPartialLineThrottlePeriod) - } else { - // Suppress line. - arvlog.logThrottleBytesSkipped += lineSize - return false, line - } - } else { - // Not a partial line so reset. - arvlog.logThrottlePartialLineNextAt = time.Time{} - arvlog.logThrottleFirstPartialLine = true - } - - arvlog.bytesLogged += lineSize - arvlog.logThrottleBytesSoFar += lineSize - arvlog.logThrottleLinesSoFar++ - - if arvlog.bytesLogged > crunchLimitLogBytesPerJob { - message = fmt.Sprintf("%s Exceeded log limit %d bytes (crunch_limit_log_bytes_per_job). Log will be truncated.", - RFC3339Timestamp(now.UTC()), crunchLimitLogBytesPerJob) - arvlog.logThrottleResetTime = now.Add(time.Duration(365 * 24 * time.Hour)) - arvlog.logThrottleIsOpen = false - - } else if arvlog.logThrottleBytesSoFar > crunchLogThrottleBytes { - remainingTime := arvlog.logThrottleResetTime.Sub(now) - message = fmt.Sprintf("%s Exceeded rate %d bytes per %d seconds (crunch_log_throttle_bytes). Logging will be silenced for the next %d seconds.", - RFC3339Timestamp(now.UTC()), crunchLogThrottleBytes, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second) - arvlog.logThrottleIsOpen = false - - } else if arvlog.logThrottleLinesSoFar > crunchLogThrottleLines { - remainingTime := arvlog.logThrottleResetTime.Sub(now) - message = fmt.Sprintf("%s Exceeded rate %d lines per %d seconds (crunch_log_throttle_lines), logging will be silenced for the next %d seconds.", - RFC3339Timestamp(now.UTC()), crunchLogThrottleLines, crunchLogThrottlePeriod/time.Second, remainingTime/time.Second) - arvlog.logThrottleIsOpen = false - - } - } - - if !arvlog.logThrottleIsOpen { - // Don't log anything if any limit has been exceeded. Just count lossage. - arvlog.logThrottleBytesSkipped += lineSize - } - - if message != "" { - // Yes, write to logs, but use our "rate exceeded" message - // instead of the log message that exceeded the limit. - message += " A complete log is still being written to Keep, and will be available when the job finishes." 
- return true, []byte(message) - } - return arvlog.logThrottleIsOpen, line -} +var crunchLogUpdatePeriod = time.Hour / 2 +var crunchLogUpdateSize = int64(1 << 25) // load the rate limit discovery config parameters func loadLogThrottleParams(clnt IArvadosClient) { @@ -394,13 +103,6 @@ func loadLogThrottleParams(clnt IArvadosClient) { } } - loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob") - loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes") - loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod") - loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines") - loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod") - loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent") - loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents") loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize") loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod") diff --git a/lib/crunchrun/logging_test.go b/lib/crunchrun/logging_test.go index ee3320c7c3..e747a2069e 100644 --- a/lib/crunchrun/logging_test.go +++ b/lib/crunchrun/logging_test.go @@ -13,26 +13,25 @@ import ( "time" "git.arvados.org/arvados.git/sdk/go/arvados" - "git.arvados.org/arvados.git/sdk/go/arvadosclient" . "gopkg.in/check.v1" check "gopkg.in/check.v1" ) -type LoggingTestSuite struct { - client *arvados.Client -} - -type TestTimestamper struct { - count int +// newTestTimestamper wraps an io.Writer, inserting a predictable +// RFC3339NanoFixed timestamp at the beginning of each line. +func newTestTimestamper(w io.Writer) *prefixer { + count := 0 + return &prefixer{ + writer: w, + prefixFunc: func() string { + count++ + return fmt.Sprintf("2015-12-29T15:51:45.%09dZ ", count) + }, + } } -func (stamper *TestTimestamper) Timestamp(t time.Time) string { - stamper.count++ - t, err := time.ParseInLocation(time.RFC3339Nano, fmt.Sprintf("2015-12-29T15:51:45.%09dZ", stamper.count), t.Location()) - if err != nil { - panic(err) - } - return RFC3339Timestamp(t) +type LoggingTestSuite struct { + client *arvados.Client } // Gocheck boilerplate @@ -48,26 +47,20 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) { api := &ArvTestClient{} kc := &KeepTestClient{} defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz") + cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-dz642-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) - cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp + f, err := cr.openLogFile("crunch-run") + c.Assert(err, IsNil) + cr.CrunchLog = newLogWriter(newTestTimestamper(f)) cr.CrunchLog.Print("Hello world!") cr.CrunchLog.Print("Goodbye") - cr.CrunchLog.Close() - - c.Check(api.Calls, Equals, 1) - - mt, err := cr.LogCollection.MarshalManifest(".") - c.Check(err, IsNil) - c.Check(mt, Equals, ". 
74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n") - logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" + - "2015-12-29T15:51:45.000000002Z Goodbye\n" + c.Check(api.Calls, Equals, 0) - c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run") - c.Check(api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext) - s.checkWroteBlock(c, kc, "74561df9ae65ee9f35d5661d42454264+83", logtext) + logs := logFileContent(c, cr, "crunch-run.txt") + c.Check(logs, Matches, `....-..-..T..:..:..\..........Z Hello world!\n`+ + `....-..-..T..:..:..\..........Z Goodbye\n`) } func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) { @@ -79,59 +72,34 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) { defer kc.Close() cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) - cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp - cr.CrunchLog.Immediate = nil - + f, err := cr.openLogFile("crunch-run") + c.Assert(err, IsNil) + cr.CrunchLog = newLogWriter(newTestTimestamper(f)) for i := 0; i < 2000000; i++ { cr.CrunchLog.Printf("Hello %d", i) } cr.CrunchLog.Print("Goodbye") - cr.CrunchLog.Close() - - c.Check(api.Calls > 0, Equals, true) - c.Check(api.Calls < 2000000, Equals, true) - - mt, err := cr.LogCollection.MarshalManifest(".") - c.Check(err, IsNil) - c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunch-run.txt\n") -} - -func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) { - api := &ArvTestClient{} - kc := &KeepTestClient{} - defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz") - c.Assert(err, IsNil) - ts := &TestTimestamper{} - cr.CrunchLog.Timestamper = ts.Timestamp - w, err := cr.NewLogWriter("stdout") - c.Assert(err, IsNil) - stdout := NewThrottledLogger(w) - stdout.Timestamper = ts.Timestamp - - cr.CrunchLog.Print("Hello world!") - stdout.Print("Doing stuff") - cr.CrunchLog.Print("Goodbye") - stdout.Print("Blurb") - cr.CrunchLog.Close() - stdout.Close() - - logText := make(map[string]string) - for _, content := range api.Content { - log := content["log"].(arvadosclient.Dict) - logText[log["event_type"].(string)] += log["properties"].(map[string]string)["text"] - } - c.Check(logText["crunch-run"], Equals, `2015-12-29T15:51:45.000000001Z Hello world! -2015-12-29T15:51:45.000000003Z Goodbye -`) - c.Check(logText["stdout"], Equals, `2015-12-29T15:51:45.000000002Z Doing stuff -2015-12-29T15:51:45.000000004Z Blurb + logs := logFileContent(c, cr, "crunch-run.txt") + c.Check(strings.Count(logs, "\n"), Equals, 2000001) + // Redact most of the logs except the start/end for the regexp + // match -- otherwise, when the regexp fails, gocheck spams + // the test logs with tens of megabytes of quoted strings. + c.Assert(len(logs) > 10000, Equals, true) + c.Check(logs[:500]+"\n...\n"+logs[len(logs)-500:], Matches, `(?ms)2015-12-29T15:51:45.000000001Z Hello 0 +2015-12-29T15:51:45.000000002Z Hello 1 +2015-12-29T15:51:45.000000003Z Hello 2 +2015-12-29T15:51:45.000000004Z Hello 3 +.* +2015-12-29T15:51:45.001999998Z Hello 1999997 +2015-12-29T15:51:45.001999999Z Hello 1999998 +2015-12-29T15:51:45.002000000Z Hello 1999999 +2015-12-29T15:51:45.002000001Z Goodbye `) mt, err := cr.LogCollection.MarshalManifest(".") c.Check(err, IsNil) - c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n") + c.Check(mt, Equals, ". 
9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunch-run.txt\n") } func (s *LoggingTestSuite) TestLogUpdate(c *C) { @@ -149,14 +117,13 @@ func (s *LoggingTestSuite) TestLogUpdate(c *C) { api := &ArvTestClient{} kc := &KeepTestClient{} defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz") + cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-dz642-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) - ts := &TestTimestamper{} - cr.CrunchLog.Timestamper = ts.Timestamp - w, err := cr.NewLogWriter("stdout") + f, err := cr.openLogFile("crunch-run") + c.Assert(err, IsNil) + cr.CrunchLog = newLogWriter(newTestTimestamper(f)) + stdout, err := cr.openLogFile("stdout") c.Assert(err, IsNil) - stdout := NewThrottledLogger(w) - stdout.Timestamper = ts.Timestamp c.Check(cr.logUUID, Equals, "") cr.CrunchLog.Printf("Hello %1000s", "space") @@ -165,75 +132,18 @@ func (s *LoggingTestSuite) TestLogUpdate(c *C) { } c.Check(cr.logUUID, Not(Equals), "") cr.CrunchLog.Print("Goodbye") - fmt.Fprint(stdout, "Goodbye\n") - cr.CrunchLog.Close() - stdout.Close() - w.Close() + fmt.Fprintln(stdout, "Goodbye") + + c.Check(logFileContent(c, cr, "crunch-run.txt"), Matches, `....-..-..T..:..:............Z Hello {995}space\n`+ + `....-..-..T..:..:............Z Goodbye\n`) + c.Check(logFileContent(c, cr, "stdout.txt"), Matches, `Goodbye\n`) mt, err := cr.LogCollection.MarshalManifest(".") c.Check(err, IsNil) - // Block packing depends on whether there's an update - // between the two Goodbyes -- either way the first - // block will be 4dc76. - c.Check(mt, Matches, `. 4dc76e0a212bfa30c39d76d8c16da0c0\+1038 (afc503bc1b9a828b4bb543cb629e936c\+78|90699dc22545cd74a0664303f70bc05a\+39 276b49339fd5203d15a93ff3de11bfb9\+39) 0:1077:crunch-run.txt 1077:39:stdout.txt\n`) + c.Check(mt, Matches, `. 4dc76e0a212bfa30c39d76d8c16da0c0\+1038 5be52044a8c51e7b62dd62be07872968\+47 0:1077:crunch-run.txt 1077:8:stdout.txt\n`) } } -func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) { - s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds") -} - -func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleLines(c *C) { - s.testWriteLogsWithRateLimit(c, "crunchLogThrottleLines", 1, 1024, "Exceeded rate 1 lines per 60 seconds") -} - -func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytesPerEvent(c *C) { - s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)") -} - -func (s *LoggingTestSuite) TestWriteLogsWithZeroBytesPerJob(c *C) { - s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 0, 67108864, "Exceeded log limit 0 bytes (crunch_limit_log_bytes_per_job)") -} - -func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) { - discoveryMap[throttleParam] = float64(throttleValue) - defer func() { - discoveryMap[throttleParam] = float64(throttleDefault) - }() - - api := &ArvTestClient{} - kc := &KeepTestClient{} - defer kc.Close() - cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzzzzzzzzzzzz") - c.Assert(err, IsNil) - cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp - - cr.CrunchLog.Print("Hello world!") - cr.CrunchLog.Print("Goodbye") - cr.CrunchLog.Close() - - c.Check(api.Calls, Equals, 1) - - mt, err := cr.LogCollection.MarshalManifest(".") - c.Check(err, IsNil) - c.Check(mt, Equals, ". 
74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n") - - logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" + - "2015-12-29T15:51:45.000000002Z Goodbye\n" - - c.Check(api.Content[0]["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run") - stderrLog := api.Content[0]["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"] - c.Check(true, Equals, strings.Contains(stderrLog, expected)) - s.checkWroteBlock(c, kc, "74561df9ae65ee9f35d5661d42454264+83", logtext) -} - -func (s *LoggingTestSuite) checkWroteBlock(c *C, kc *KeepTestClient, locator, expect string) { - buf := make([]byte, len([]byte(expect))+1) - n, err := kc.ReadAt(locator, buf, 0) - c.Check(err, IsNil) - c.Check(string(buf[:n]), Equals, expect) -} - type filterSuite struct{} var _ = Suite(&filterSuite{})
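
For reference, the logging plumbing that replaces the ThrottledLogger/ArvLogWriter pipeline above is just a stack of io.Writer wrappers: a prefixer inserts a timestamp or a fixed string at the start of each line, and logWriter adds log.Logger methods on top. Below is a minimal, self-contained sketch of that composition, assuming nothing beyond the code added in lib/crunchrun/logging.go (the prefixer and logWriter types are re-declared here so the sketch runs outside the crunchrun package; the container UUID and log message are placeholders):

    package main

    import (
        "bytes"
        "io"
        "log"
        "os"
        "time"
    )

    // prefixer inserts the string returned by prefixFunc at the start of
    // each output line (same shape as the type added in logging.go).
    type prefixer struct {
        writer     io.Writer
        prefixFunc func() string
        unfinished bool // true if the previous write ended mid-line
    }

    func (tp *prefixer) Write(p []byte) (n int, err error) {
        for len(p) > 0 && err == nil {
            if !tp.unfinished {
                if _, err = io.WriteString(tp.writer, tp.prefixFunc()); err != nil {
                    return
                }
            }
            var nn int
            if newline := bytes.IndexRune(p, '\n'); newline < 0 {
                tp.unfinished = true
                nn, err = tp.writer.Write(p)
                p = nil
            } else {
                tp.unfinished = false
                nn, err = tp.writer.Write(p[:newline+1])
                p = p[nn:]
            }
            n += nn
        }
        return
    }

    const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

    // newTimestamper prepends an rfc3339NanoFixed timestamp to each line.
    func newTimestamper(w io.Writer) *prefixer {
        return &prefixer{writer: w, prefixFunc: func() string {
            return time.Now().UTC().Format(rfc3339NanoFixed + " ")
        }}
    }

    // newStringPrefixer prepends a fixed string to each line.
    func newStringPrefixer(w io.Writer, s string) *prefixer {
        return &prefixer{writer: w, prefixFunc: func() string { return s }}
    }

    // logWriter adds log.Logger methods to an io.Writer.
    type logWriter struct {
        io.Writer
        *log.Logger
    }

    func newLogWriter(w io.Writer) *logWriter {
        return &logWriter{Writer: w, Logger: log.New(w, "", 0)}
    }

    func main() {
        // Same composition as the CommitLogs fallback in crunchrun.go:
        // timestamp each line, then prefix it with the container UUID
        // before it reaches stderr. The UUID below is a placeholder.
        crunchLog := newLogWriter(newTimestamper(newStringPrefixer(os.Stderr, "zzzzz-dz642-zzzzzzzzzzzzzzz ")))
        // Writes a line like:
        // zzzzz-dz642-zzzzzzzzzzzzzzz 2024-05-01T13:39:29.000000000Z crunch-run finished
        crunchLog.Printf("crunch-run finished")
    }

In NewContainerRunner the same wrappers are combined with io.MultiWriter, so each line goes both to crunch-run.txt in the log collection (timestamp only) and to stderr (UUID plus timestamp).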